IGNITE-12671 Ignoring single messages during PME can prevent late affinity switch. - Fixes #7425.
Signed-off-by: Aleksei Scherbakov <ascherbakov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e78ada3..cc65919 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -80,7 +80,6 @@
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
@@ -405,32 +404,8 @@
return;
}
}
- else if (exchangeInProgress()) {
- if (log.isInfoEnabled())
- log.info("Ignore single message without exchange id (there is exchange in progress) [nodeId=" + node.id() + "]");
- return;
- }
-
- if (!crdInitFut.isDone() && !msg.restoreState()) {
- GridDhtPartitionExchangeId exchId = msg.exchangeId();
-
- if (log.isInfoEnabled()) {
- log.info("Waiting for coordinator initialization [node=" + node.id() +
- ", nodeOrder=" + node.order() +
- ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
- }
-
- crdInitFut.listen(new CI1<IgniteInternalFuture>() {
- @Override public void apply(IgniteInternalFuture fut) {
- processSinglePartitionUpdate(node, msg);
- }
- });
-
- return;
- }
-
- processSinglePartitionUpdate(node, msg);
+ preprocessSingleMessage(node, msg);
}
});
@@ -517,6 +492,34 @@
}
/**
+ * Preprocess {@code msg} which was sended by {@code node}.
+ *
+ * @param node Cluster node.
+ * @param msg Message.
+ */
+ private void preprocessSingleMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ if (!crdInitFut.isDone() && !msg.restoreState()) {
+ GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+ if (log.isInfoEnabled()) {
+ log.info("Waiting for coordinator initialization [node=" + node.id() +
+ ", nodeOrder=" + node.order() +
+ ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+ }
+
+ crdInitFut.listen(new CI1<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ processSinglePartitionUpdate(node, msg);
+ }
+ });
+
+ return;
+ }
+
+ processSinglePartitionUpdate(node, msg);
+ }
+
+ /**
*
*/
public void onCoordinatorInitialized() {
@@ -2795,30 +2798,6 @@
return exchWorker != null && Thread.currentThread() == exchWorker.runner();
}
- /**
- * @return {@code True} If there is any exchange future in progress.
- */
- private boolean exchangeInProgress() {
- if (exchWorker.hasPendingServerExchange())
- return true;
-
- GridDhtPartitionsExchangeFuture current = lastTopologyFuture();
-
- if (current == null)
- return false;
-
- GridDhtTopologyFuture finished = lastFinishedFut.get();
-
- if (finished == null || finished.result().compareTo(current.initialVersion()) < 0) {
- ClusterNode triggeredBy = current.firstEvent().eventNode();
-
- if (current.partitionChangesInProgress() && !triggeredBy.isClient())
- return true;
- }
-
- return false;
- }
-
/** */
public boolean affinityChanged(AffinityTopologyVersion from, AffinityTopologyVersion to) {
if (lastAffinityChangedTopologyVersion(to).compareTo(from) >= 0)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
new file mode 100644
index 0000000..242a8e7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * This test hangs exchange and waits rebalance complete. After partitions rebalance completed exchange will unlock and
+ * test will wait ideal assignment.
+ */
+public class RebalanceCompleteDuringExchangeTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ return super.getConfiguration(name)
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setCacheMode(CacheMode.REPLICATED))
+ .setCommunicationSpi(new TestRecordingCommunicationSpi());
+ }
+
+ /**
+ * Waits ideal assignment for configured cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalance() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+
+ ignite0.cluster().active(true);
+
+ IgniteCache cache = ignite0.cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 2000; i++)
+ cache.put(i, i);
+
+ IgniteEx ignite1 = startNodeAndBlockRebalance(1);
+
+ TestRecordingCommunicationSpi commSpi2 = startNodeAndBlockExchange(2);
+
+ TestRecordingCommunicationSpi commSpi1 = TestRecordingCommunicationSpi.spi(ignite1);
+
+ commSpi1.waitForRecorded();
+
+ info(getTestIgniteInstanceName(1) + " sent Single message to coordinator.");
+
+ commSpi1.recordedMessages(true);
+
+ commSpi1.record(GridDhtPartitionsSingleMessage.class);
+
+ commSpi2.waitForBlocked();
+
+ info("Exchange is waiting Single message from " + getTestIgniteInstanceName(2));
+
+ commSpi1.waitForBlocked();
+
+ info("Rebalance on " + getTestIgniteInstanceName(1) + " was blocked.");
+
+ commSpi1.stopBlock();
+
+ //Test is waiting sent information about rebalance complete.
+ commSpi1.waitForRecorded();
+
+ info("Rebalance on " + getTestIgniteInstanceName(1) + " was unblocked and completed.");
+
+ commSpi2.stopBlock();
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Starts node and blocks exchange on it.
+ *
+ * @param nodeNum Number of node.
+ * @return Test communication spi.
+ * @throws Exception If failed.
+ */
+ public TestRecordingCommunicationSpi startNodeAndBlockExchange(int nodeNum) throws Exception {
+ IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(nodeNum)));
+
+ TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ commSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(0));
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+ try {
+ IgniteEx ignite2 = startGrid(cfg);
+ }
+ catch (Exception e) {
+ log.error("Start clustr exception " + e.getMessage(), e);
+ }
+ });
+
+ return commSpi;
+ }
+
+ /**
+ * Starts node and blocks rebalance.
+ *
+ * @param nodeNum Number of node.
+ * @throws Exception If failed.
+ */
+ public IgniteEx startNodeAndBlockRebalance(int nodeNum) throws Exception {
+ IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(nodeNum)));
+
+ TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ commSpi.record((ClusterNode node, Message msg) -> {
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMessage = (GridDhtPartitionsSingleMessage)msg;
+
+ if (singleMessage.exchangeId() == null)
+ return false;
+
+ return singleMessage.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(3, 0));
+ }
+
+ return false;
+ });
+
+ commSpi.blockMessages((ClusterNode node, Message msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage)msg;
+
+ return CU.cacheId(DEFAULT_CACHE_NAME) == demandMessage.groupId();
+ }
+
+ return false;
+ });
+
+ return startGrid(cfg);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 00e77f9..77492a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
import org.apache.ignite.cache.ResetLostPartitionTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
@@ -81,6 +82,7 @@
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsSpuriousRebalancingOnNodeJoinTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RebalanceCompleteDuringExchangeTest.class, ignoredTests);
// Page lock tracker tests.
GridTestUtils.addTestIfNeeded(suite, PageLockTrackerManagerTest.class, ignoredTests);