IGNITE-12671 Ignoring single messages during PME can prevent late affinity switch. - Fixes #7425.

Signed-off-by: Aleksei Scherbakov <ascherbakov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e78ada3..cc65919 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -80,7 +80,6 @@
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
@@ -405,32 +404,8 @@
                             return;
                         }
                     }
-                    else if (exchangeInProgress()) {
-                        if (log.isInfoEnabled())
-                            log.info("Ignore single message without exchange id (there is exchange in progress) [nodeId=" + node.id() + "]");
 
-                        return;
-                    }
-
-                    if (!crdInitFut.isDone() && !msg.restoreState()) {
-                        GridDhtPartitionExchangeId exchId = msg.exchangeId();
-
-                        if (log.isInfoEnabled()) {
-                            log.info("Waiting for coordinator initialization [node=" + node.id() +
-                                ", nodeOrder=" + node.order() +
-                                ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
-                        }
-
-                        crdInitFut.listen(new CI1<IgniteInternalFuture>() {
-                            @Override public void apply(IgniteInternalFuture fut) {
-                                processSinglePartitionUpdate(node, msg);
-                            }
-                        });
-
-                        return;
-                    }
-
-                    processSinglePartitionUpdate(node, msg);
+                    preprocessSingleMessage(node, msg);
                 }
             });
 
@@ -517,6 +492,34 @@
     }
 
     /**
+     * Preprocesses {@code msg} which was sent by {@code node}.
+     *
+     * @param node Cluster node.
+     * @param msg Message.
+     */
+    private void preprocessSingleMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+        if (!crdInitFut.isDone() && !msg.restoreState()) {
+            GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+            if (log.isInfoEnabled()) {
+                log.info("Waiting for coordinator initialization [node=" + node.id() +
+                    ", nodeOrder=" + node.order() +
+                    ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+            }
+
+            crdInitFut.listen(new CI1<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    processSinglePartitionUpdate(node, msg);
+                }
+            });
+
+            return;
+        }
+
+        processSinglePartitionUpdate(node, msg);
+    }
+
+    /**
      *
      */
     public void onCoordinatorInitialized() {
@@ -2795,30 +2798,6 @@
         return exchWorker != null && Thread.currentThread() == exchWorker.runner();
     }
 
-    /**
-     * @return {@code True} If there is any exchange future in progress.
-     */
-    private boolean exchangeInProgress() {
-        if (exchWorker.hasPendingServerExchange())
-            return true;
-
-        GridDhtPartitionsExchangeFuture current = lastTopologyFuture();
-
-        if (current == null)
-            return false;
-
-        GridDhtTopologyFuture finished = lastFinishedFut.get();
-
-        if (finished == null || finished.result().compareTo(current.initialVersion()) < 0) {
-            ClusterNode triggeredBy = current.firstEvent().eventNode();
-
-            if (current.partitionChangesInProgress() && !triggeredBy.isClient())
-                return true;
-       }
-
-        return false;
-    }
-
     /** */
     public boolean affinityChanged(AffinityTopologyVersion from, AffinityTopologyVersion to) {
         if (lastAffinityChangedTopologyVersion(to).compareTo(from) >= 0)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
new file mode 100644
index 0000000..242a8e7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * This test blocks the exchange and waits for rebalance to complete. Once partition rebalance has completed, the
+ * exchange is unblocked and the test waits for the ideal assignment.
+ */
+public class RebalanceCompleteDuringExchangeTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        return super.getConfiguration(name)
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setCacheMode(CacheMode.REPLICATED))
+            .setCommunicationSpi(new TestRecordingCommunicationSpi());
+    }
+
+    /**
+     * Waits for the ideal assignment of the configured cache.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalance() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+
+        ignite0.cluster().active(true);
+
+        IgniteCache cache = ignite0.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 2000; i++)
+            cache.put(i, i);
+
+        IgniteEx ignite1 = startNodeAndBlockRebalance(1);
+
+        TestRecordingCommunicationSpi commSpi2 = startNodeAndBlockExchange(2);
+
+        TestRecordingCommunicationSpi commSpi1 = TestRecordingCommunicationSpi.spi(ignite1);
+
+        commSpi1.waitForRecorded();
+
+        info(getTestIgniteInstanceName(1) + " sent Single message to coordinator.");
+
+        commSpi1.recordedMessages(true);
+
+        commSpi1.record(GridDhtPartitionsSingleMessage.class);
+
+        commSpi2.waitForBlocked();
+
+        info("Exchange is waiting Single message from " + getTestIgniteInstanceName(2));
+
+        commSpi1.waitForBlocked();
+
+        info("Rebalance on " + getTestIgniteInstanceName(1) + " was blocked.");
+
+        commSpi1.stopBlock();
+
+        // Wait until the message reporting rebalance completion has been sent.
+        commSpi1.waitForRecorded();
+
+        info("Rebalance on " + getTestIgniteInstanceName(1) + " was unblocked and completed.");
+
+        commSpi2.stopBlock();
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Starts node and blocks exchange on it.
+     *
+     * @param nodeNum Number of node.
+     * @return Test communication spi.
+     * @throws Exception If failed.
+     */
+    public TestRecordingCommunicationSpi startNodeAndBlockExchange(int nodeNum) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(nodeNum)));
+
+        TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        commSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(0));
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+            try {
+                IgniteEx ignite2 = startGrid(cfg);
+            }
+            catch (Exception e) {
+                log.error("Start clustr exception " + e.getMessage(), e);
+            }
+        });
+
+        return commSpi;
+    }
+
+    /**
+     * Starts node and blocks rebalance.
+     *
+     * @param nodeNum Number of node.
+     * @throws Exception If failed.
+     */
+    public IgniteEx startNodeAndBlockRebalance(int nodeNum) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(nodeNum)));
+
+        TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        commSpi.record((ClusterNode node, Message msg) -> {
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMessage = (GridDhtPartitionsSingleMessage)msg;
+
+                if (singleMessage.exchangeId() == null)
+                    return false;
+
+                return singleMessage.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(3, 0));
+            }
+
+            return false;
+        });
+
+        commSpi.blockMessages((ClusterNode node, Message msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage)msg;
+
+                return CU.cacheId(DEFAULT_CACHE_NAME) == demandMessage.groupId();
+            }
+
+            return false;
+        });
+
+        return startGrid(cfg);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 00e77f9..77492a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
 import org.apache.ignite.cache.ResetLostPartitionTest;
 import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
@@ -81,6 +82,7 @@
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsSpuriousRebalancingOnNodeJoinTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, RebalanceCompleteDuringExchangeTest.class, ignoredTests);
 
         // Page lock tracker tests.
         GridTestUtils.addTestIfNeeded(suite, PageLockTrackerManagerTest.class, ignoredTests);