IGNITE-17507 Fixed an issue that could lead to unexpected partition map exchange on client nodes. Fixes #10187
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 3d9bfcc..29aa940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -50,9 +50,8 @@
     }
 
     /** {@inheritDoc} */
-    @Deprecated
     @Override public boolean stopProcess() {
-        return false;
+        return delegate.stopProcess();
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 2309fc0..c3cd0ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.jetbrains.annotations.Nullable;
@@ -92,6 +93,15 @@
     public boolean isMutable();
 
     /**
+     * See {@link DiscoverySpiCustomMessage#stopProcess()}.
+     *
+     * @return {@code True} if message should not be sent to other nodes after it was processed on the coordinator.
+     */
+    public default boolean stopProcess() {
+        return false;
+    }
+
+    /**
      * Creates new discovery cache if message caused topology version change.
      *
      * @param mgr Discovery manager.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index 708e3c1..1330e88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -28,6 +28,7 @@
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -55,8 +56,18 @@
     /** */
     private GridDhtPartitionsFullMessage partsMsg;
 
-    /** */
-    private transient boolean exchangeNeeded;
+    /** If this flag is {@code true} then this message should lead to partition map exchange. */
+    private boolean exchangeNeeded;
+
+    /**
+     * This flag indicates that this message should not be passed to other nodes except the coordinator.
+     * Instead of this message, the message which is returned by {@link #ackMessage()} will be sent.
+     * See {@link DiscoveryCustomMessage#stopProcess()}.
+     *
+     * This flag is used when discovery SPI does not support mutable custom messages.
+     * See {@link DiscoverySpiMutableCustomMessageSupport}.
+     */
+    private transient boolean stopProc;
 
     /**
      * Constructor used when message is created after cache rebalance finished.
@@ -76,7 +87,8 @@
      * @param partsMsg Partitions messages.
      * @param assignmentChange Assignment change.
      */
-    public CacheAffinityChangeMessage(GridDhtPartitionExchangeId exchId,
+    public CacheAffinityChangeMessage(
+        GridDhtPartitionExchangeId exchId,
         GridDhtPartitionsFullMessage partsMsg,
         Map<Integer, Map<Integer, List<UUID>>> assignmentChange) {
         this.exchId = exchId;
@@ -140,17 +152,44 @@
 
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-        return null;
+        if (!stopProc)
+            return null;
+
+        // If stopProc is equal to true, then Discovery SPI does not support mutable custom messages.
+        // Let's return the same message that was mutated on the coordinator node. This message will be sent
+        // to all nodes instead of the original one.
+        return this;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isMutable() {
-        return false;
+        return true;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
-        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+    @Override public boolean stopProcess() {
+        return stopProc;
+    }
+
+    /**
+     * Sets stop processing flag. If this flag is {@code true} then this message is not passed to other nodes after
+     * the coordinator node notified its own listener. If method {@link #ackMessage()} returns non-null ack message,
+     * it is sent to all nodes.
+     * This flag is used when discovery SPI does not support mutable custom messages.
+     * See {@link DiscoverySpiMutableCustomMessageSupport}.
+     *
+     * @param stopProc If {@code true} then this message is not passed to other nodes.
+     */
+    public void stopProcess(boolean stopProc) {
+        this.stopProc = stopProc;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoCache createDiscoCache(
+        GridDiscoveryManager mgr,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
+    ) {
         return discoCache.copy(topVer, null);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f6f4a95..fda6598 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -77,6 +77,7 @@
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -219,11 +220,21 @@
             return false;
         }
 
+        boolean isClient = cctx.discovery().localNode().isClient();
+
+        if (cctx.kernalContext().config().getDiscoverySpi() instanceof TcpDiscoverySpi)
+            isClient &= !((TcpDiscoverySpi)cctx.kernalContext().config().getDiscoverySpi()).isForceServerMode();
+
         // Skip message if affinity was already recalculated.
-        boolean exchangeNeeded = lastAffVer == null || lastAffVer.equals(msg.topologyVersion());
+        // Client node should just accept the flag from the mutated message.
+        boolean exchangeNeeded = (isClient) ? msg.exchangeNeeded() :
+            lastAffVer == null || lastAffVer.equals(msg.topologyVersion());
 
         msg.exchangeNeeded(exchangeNeeded);
 
+        if (!cctx.discovery().mutableCustomMessages() && !isClient)
+            msg.stopProcess(true);
+
         if (exchangeNeeded) {
             if (log.isDebugEnabled()) {
                 log.debug("Need process affinity change message [lastAffVer=" + lastAffVer +
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index afd7093..6e11673 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -45,10 +45,9 @@
      * then message is not passed to others nodes, if after this method {@link #ackMessage()} returns non-null ack
      * message, it is sent to all nodes.
      *
-     * @return {@code True} if message should not be sent to all nodes.
+     * Note: this method is used if and only if the ZooKeeper discovery is configured.
      *
-     * @deprecated Is not used anymore and will be removed at 3.0.
+     * @return {@code True} if message should not be sent to all nodes.
      */
-    @Deprecated
     public boolean stopProcess();
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 0309447..d50c2c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -105,7 +105,10 @@
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.blockSingleExhangeMessage;
 import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  *
@@ -1352,7 +1355,7 @@
             Ignite ignite = grids.get(i);
 
             if (!blocked.contains(ignite.name())) {
-                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                waitForCondition(new GridAbsPredicate() {
                     @Override public boolean apply() {
                         return fut.isDone();
                     }
@@ -1527,7 +1530,7 @@
             });
         }
 
-        IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>() {
+        IgniteInternalFuture<?> stopFut = runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 for (int j = 1; j < NODES; j++) {
                     TestRecordingCommunicationSpi spi =
@@ -1561,6 +1564,106 @@
     }
 
     /**
+     * Wait for rebalance, send affinity change message, but affinity already changed
+     * (new nodes joined: server + client). Checks that there is no race that could lead to
+     * unexpected partition map exchange on the client node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDelayAssignmentAffinityChangedUnexpectedPME() throws Exception {
+        Ignite ignite0 = startServer(0, 1);
+
+        for (int i = 0; i < 1024; i++)
+            ignite0.cache(CACHE_NAME1).put(i, i);
+
+        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+        ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
+
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+        // Starting a new client node should not lead to a rebalance obviously.
+        // So, it is expected that data distribution is ideal (ideal assignment).
+        startClient(1, 2);
+
+        checkAffinity(2, topVer(2, 0), true);
+
+        // Block late affinity assignment. (*)
+        lsnr.blockCustomEvent(CacheAffinityChangeMessage.class);
+
+        // Starting a new server node triggers data rebalancing.
+        // [3, 0] - is not ideal (expected)
+        startServer(2, 3);
+
+        checkAffinity(3, topVer(3, 0), false);
+
+        // Wait for sending late affinity assignment message (1) from the coordinator node.
+        // This message will be blocked (*)
+        lsnr.waitCustomEvent();
+
+        // Block rebalance messages.
+        blockSupplySend(commSpi0, CACHE_NAME1);
+
+        // Starting a new server node means that the late affinity assignment message (1) should be skipped.
+        startServer(3, 4);
+
+        TestRecordingCommunicationSpi clientSpi = new TestRecordingCommunicationSpi();
+        clientSpi.blockMessages(blockSingleExhangeMessage());
+        spiC = igniteInstanceName -> clientSpi;
+
+        IgniteInternalFuture<?> startClientFut = runAsync(() -> {
+            startClient(4, 5);
+        });
+
+        clientSpi.waitForBlocked();
+
+        // Unblock the late affinity assignment message (1).
+        lsnr.stopBlockCustomEvents();
+
+        clientSpi.stopBlock();
+
+        startClientFut.get(15_000);
+
+        // [5, 0] - is not ideal (expected)
+        checkAffinity(5, topVer(5, 0), false);
+
+        // Rebalance is blocked at this moment, so [5, 1] is not ready.
+        checkNoExchange(5, topVer(5, 1));
+
+        // Unblock rebalancing.
+        // The late affinity assignments message (2) should be fired after all.
+        commSpi0.stopBlock();
+
+        // [5, 1] should be ideal
+        checkAffinity(5, topVer(5, 1), true);
+
+        // The following output demonstrates the issue.
+        // The coordinator node and client initiate PME on the same topology version,
+        // but they rely on different custom messages.
+        // client:
+        //      Started exchange init [
+        //          topVer=AffinityTopologyVersion [topVer=5, minorTopVer=1],
+        //          crd=false,
+        //          evt=DISCOVERY_CUSTOM_EVT, evtNode=00ac9434-fd34-4aae-95d3-ceb477700000,
+        //          customEvt=CacheAffinityChangeMessage [
+        //              id=3ccc8984181-ea41279c-71cb-4b8c-8b48-1dee1baa6fe0,                       <<< (1)
+        //              topVer=AffinityTopologyVersion [topVer=3, minorTopVer=0], ...]             <<< !!!
+        // coordinator:
+        //      Started exchange init
+        //          [topVer=AffinityTopologyVersion [topVer=5, minorTopVer=1],
+        //          crd=true,
+        //          evt=DISCOVERY_CUSTOM_EVT, evtNode=00ac9434-fd34-4aae-95d3-ceb477700000,
+        //          customEvt=CacheAffinityChangeMessage [
+        //              id=d2ec8984181-ea41279c-71cb-4b8c-8b48-1dee1baa6fe0,                        <<< (2)
+        //              topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], ...]              <<< !!!
+        awaitPartitionMapExchange(true, true, null, false);
+
+        assertPartitionsSame(idleVerify(grid(0), CACHE_NAME1));
+    }
+
+    /**
      * Wait for rebalance, send affinity change message, but affinity already changed (new node joined).
      *
      * @throws Exception If failed.
@@ -1657,7 +1760,7 @@
 
             lsnr.stopBlockCustomEvents();
 
-            boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            boolean started = waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
                     return startedFuture.isDone();
                 }
@@ -2174,7 +2277,7 @@
             //Ensure exchanges merge.
             spiC = igniteInstanceName -> testSpis[getTestIgniteInstanceIndex(igniteInstanceName)];
 
-            GridTestUtils.runAsync(() -> {
+            runAsync(() -> {
                 try {
                     for (int j = 1; j < NODES; j++)
                         testSpis[j].waitForBlocked();
@@ -2321,7 +2424,7 @@
             }
         }, NODES, "update-thread");
 
-        IgniteInternalFuture<?> srvRestartFut = GridTestUtils.runAsync(new Callable<Void>() {
+        IgniteInternalFuture<?> srvRestartFut = runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 while (!fail.get() && System.currentTimeMillis() < stopTime) {
                     Ignite node = startGrid(NODES);
@@ -2462,7 +2565,7 @@
 
             final ClusterNode srvcNode = affinity.get(part).get(0);
 
-            boolean wait = GridTestUtils.waitForCondition(new PA() {
+            boolean wait = waitForCondition(new PA() {
                 @Override public boolean apply() {
                     TestService srvc = grid(srvcNode).services().service(srvcName);
 
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 41a4761..9b7476c 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -58,7 +58,6 @@
     }
 
     /** {@inheritDoc} */
-    @Deprecated
     @Override public boolean stopProcess() {
         return false;
     }
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index 4c8d996..0c79c36 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -50,7 +50,6 @@
     }
 
     /** {@inheritDoc} */
-    @Deprecated
     @Override public boolean stopProcess() {
         return false;
     }
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index bd6e1e8..de7291c 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -54,7 +54,6 @@
     }
 
     /** {@inheritDoc} */
-    @Deprecated
     @Override public boolean stopProcess() {
         return false;
     }
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
index 957310d..626fe74 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -39,7 +39,6 @@
     }
 
     /** {@inheritDoc} */
-    @Deprecated
     @Override public boolean stopProcess() {
         return false;
     }
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 97f7b86..5225c9a 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2588,11 +2588,39 @@
         if (log.isDebugEnabled())
             log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
 
+        boolean fastStopProcess = false;
+
         if (msg instanceof ZkInternalMessage)
             processInternalMessage(evtData, (ZkInternalMessage)msg);
-        else
+        else {
             notifyCustomEvent(evtData, msg);
 
+            if (msg.stopProcess()) {
+                if (log.isDebugEnabled())
+                    log.debug("Fast stop process custom event [evt=" + evtData + ", msg=" + msg + ']');
+
+                fastStopProcess = true;
+
+                // No need to process this event on other nodes, skip this event.
+                evtsData.evts.remove(evtData.eventId());
+
+                evtsData.evtIdGen--;
+
+                DiscoverySpiCustomMessage ack = msg.ackMessage();
+
+                if (ack != null) {
+                    evtData = createAckEvent(ack, evtData);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Generated CUSTOM event (ack for fast stop process) [evt=" + evtData + ", msg=" + msg + ']');
+
+                    notifyCustomEvent(evtData, ack);
+                }
+                else
+                    evtData = null;
+            }
+        }
+
         if (evtData != null) {
             evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
 
@@ -2600,6 +2628,9 @@
 
             saveAndProcessNewEvents();
 
+            if (fastStopProcess)
+                deleteCustomEventDataAsync(zkClient, evtPath);
+
             if (failedNode != null) {
                 deleteAliveNode(failedNode.internalId());