IGNITE-12950 Added check of partition sizes to GridDhtPartitionsStateValidator, even if update counters are different. Fixes #8645
Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
index 8bea474..d1dd063 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import java.util.AbstractMap;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -25,6 +26,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -33,6 +35,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
@@ -40,7 +43,8 @@
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
/**
- * Class to validate partitions update counters and cache sizes during exchange process.
+ * Class to validate partitions update counters and cache sizes during exchange
+ * process.
*/
public class GridDhtPartitionsStateValidator {
/** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
@@ -66,8 +70,9 @@
* @param fut Current exchange future.
* @param top Topology to validate.
* @param messages Single messages received from all nodes.
- * @throws IgniteCheckedException If validation failed. Exception message contains
- * full information about all partitions which update counters or cache sizes are not consistent.
+ * @throws IgniteCheckedException If validation failed.
+ * Exception message contains full information about all partitions which
+ * update counters or cache sizes are not consistent.
*/
public void validatePartitionCountersAndSizes(
GridDhtPartitionsExchangeFuture fut,
@@ -82,13 +87,10 @@
ignoringNodes.add(evt.eventNode().id());
}
- AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+ StringBuilder error = new StringBuilder();
- // Validate update counters.
- Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
-
- if (!result.isEmpty())
- throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+ Map<Integer, Map<UUID, Long>> resUpdCnt = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+ Map<Integer, Map<UUID, Long>> resSize = Collections.emptyMap();
// For sizes validation ignore also nodes which are not able to send cache sizes.
for (UUID id : messages.keySet()) {
@@ -99,15 +101,31 @@
if (!cctx.cache().cacheGroup(top.groupId()).mvccEnabled()) { // TODO: Remove "if" clause in IGNITE-9451.
// Validate cache sizes.
- result = validatePartitionsSizes(top, messages, ignoringNodes);
+ resSize = validatePartitionsSizes(top, messages, ignoringNodes);
+ }
- if (!result.isEmpty())
- throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+ AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+ if (!resUpdCnt.isEmpty() && !resSize.isEmpty()) {
+ error.append("Partitions cache size and update counters are inconsistent for ")
+ .append(fold(topVer, resUpdCnt, resSize));
+ }
+ else if (!resUpdCnt.isEmpty() && resSize.isEmpty())
+ error.append("Partitions update counters are inconsistent for ").append(fold(topVer, resUpdCnt));
+ else if (resUpdCnt.isEmpty() && !resSize.isEmpty())
+ error.append("Partitions cache sizes are inconsistent for ").append(fold(topVer, resSize));
+
+ if (error.length() > 0) {
+ Set<Integer> parts = new HashSet<>(resUpdCnt.keySet());
+ parts.addAll(resSize.keySet());
+
+ throw new IgniteCheckedException(error.toString());
}
}
/**
- * Checks what partitions from given {@code singleMsg} message should be excluded from validation.
+ * Checks what partitions from given {@code singleMsg} message should be
+ * excluded from validation.
*
* @param top Topology to validate.
* @param nodeId Node which sent single message.
@@ -156,7 +174,8 @@
* @param top Topology to validate.
* @param messages Single messages received from all nodes.
* @param ignoringNodes Nodes for what we ignore validation.
- * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+ * @return Invalid partitions map with following structure:
+ * (partId, (nodeId, updateCounter)).
* If map is empty validation is successful.
*/
public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
@@ -216,7 +235,8 @@
* @param top Topology to validate.
* @param messages Single messages received from all nodes.
* @param ignoringNodes Nodes for what we ignore validation.
- * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+ * @return Invalid partitions map with following structure:
+ * (partId, (nodeId, cacheSize)).
* If map is empty validation is successful.
*/
public Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
@@ -271,8 +291,10 @@
}
/**
- * Processes given {@code counter} for partition {@code part} reported by {@code node}.
- * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different.
+ * Processes given {@code counter} for partition {@code part}
+ * reported by {@code node}.
+ * Populates {@code invalidPartitions} map if existing counter
+ * and current {@code counter} are different.
*
* @param invalidPartitions Invalid partitions map.
* @param countersAndNodes Current map of counters and nodes by partitions.
@@ -304,7 +326,8 @@
}
/**
- * Folds given map of invalid partition states to string representation in the following format:
+ * Folds given map of invalid partition states to string representation
+ * in the following format:
* Part [id]: [consistentId=value*]
*
* Value can be both update counter or cache size.
@@ -329,4 +352,70 @@
return sb.toString();
}
+
+ /**
+ * Folds given map of invalid partition states to string representation
+ * in the following format:
+ * Part [id]: [consistentId=value meta=[updCnt=value, size=value]]
+ * @param topVer Topology version.
+ * @param invalidPartitionsCounters Invalid partitions counters map.
+ * @param invalidPartitionsSize Invalid partitions size map.
+ * @return value is String in the following format:
+ * Part [id]: [consistentId=value meta=[updCnt=value, size=value]]
+ */
+ private String fold(
+ AffinityTopologyVersion topVer,
+ Map<Integer, Map<UUID, Long>> invalidPartitionsCounters,
+ Map<Integer, Map<UUID, Long>> invalidPartitionsSize
+ ) {
+ SB sb = new SB();
+
+ NavigableMap<Integer, Map<UUID, IgnitePair<Long>>> sortedAllPartitions = new TreeMap<>();
+
+ Set<Integer> allKeys = new HashSet<>(invalidPartitionsCounters.keySet());
+
+ allKeys.addAll(invalidPartitionsSize.keySet());
+
+ for (Integer p : allKeys) {
+ Map<UUID, IgnitePair<Long>> map = new HashMap<>();
+
+ fillMapForPartition(invalidPartitionsCounters.get(p), map, true);
+ fillMapForPartition(invalidPartitionsSize.get(p), map, false);
+
+ sortedAllPartitions.put(p, map);
+ }
+
+ for (Map.Entry<Integer, Map<UUID, IgnitePair<Long>>> p : sortedAllPartitions.entrySet()) {
+ sb.a("Part ").a(p.getKey()).a(": [");
+ for (Map.Entry<UUID, IgnitePair<Long>> e : p.getValue().entrySet()) {
+ Object consistentId = cctx.discovery().node(topVer, e.getKey()).consistentId();
+ sb.a("consistentId=").a(consistentId).a(" meta=[updCnt=").a(e.getValue().get1())
+ .a(", size=").a(e.getValue().get2()).a("] ");
+ }
+ sb.a("] ");
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Adds pair of counters and size in result map
+ * @param srcMap PartitionCounters or PartitionSize
+ * @param resultMap result map with pair of values
+ */
+ private void fillMapForPartition(
+ Map<UUID, Long> srcMap,
+ Map<UUID, IgnitePair<Long>> resultMap,
+ boolean isFirst
+ ) {
+ if (srcMap != null) {
+ srcMap.forEach((uuid, val) -> {
+ IgnitePair<Long> pair = resultMap.computeIfAbsent(uuid, u -> new IgnitePair<>());
+ if (isFirst)
+ pair.set1(val);
+ else
+ pair.set2(val);
+ });
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsUpdateCountersAndSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsUpdateCountersAndSizeTest.java
new file mode 100644
index 0000000..f7b87d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsUpdateCountersAndSizeTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test correct behaviour of class to validate partitions update counters and
+ * cache sizes during exchange process
+ * {@link org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator}.
+ */
+public class GridCachePartitionsUpdateCountersAndSizeTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE_NAME = "cacheTest";
+
+ /** Listener for parsing patterns in log. */
+ private ListeningTestLogger testLog;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+ .setBackups(2)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ if (igniteInstanceName.endsWith("0"))
+ cfg.setGridLogger(testLog);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ testLog = new ListeningTestLogger(log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ testLog.clearListeners();
+
+ super.afterTest();
+ }
+
+ /**
+ * Method that shows the results of partitions state validation for all possible cases:
+ * @param inconsistentCnt - {@code true} if it is expected that partition counters are inconsistent.
+ * @param inconsistentSize - {@code true} if it is expected that partition sizes are inconsistent.
+ * @throws Exception If failed.
+ */
+ private void startThreeNodesGrid(boolean inconsistentCnt, boolean inconsistentSize) throws Exception {
+ SizeCounterLogListener lsnr = new SizeCounterLogListener();
+
+ IgniteEx ignite = startGrids(3);
+ ignite.cluster().state(ClusterState.ACTIVE);
+ awaitPartitionMapExchange();
+
+ testLog.registerListener(lsnr);
+
+ // Populate cache to increment update counters.
+ for (int i = 0; i < 1000; i++)
+ ignite.cache(CACHE_NAME).put(i, i);
+
+ if (inconsistentCnt) {
+ // Modify update counter for some partition.
+ ignite.cachex(CACHE_NAME).context().topology().localPartitions().get(0).updateCounter(99L);
+ }
+
+ if (inconsistentSize) {
+ // Modify update size for some partition.
+ ignite.cachex(CACHE_NAME).context().topology().localPartitions().get(0).dataStore()
+ .clear(ignite.cachex(CACHE_NAME).context().cacheId());
+ }
+
+ // Trigger exchange.
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ // Nothing should happen (just log error message) and we're still able
+ // to put data to corrupted cache.
+ ignite.cache(CACHE_NAME).put(0, 0);
+
+ if (inconsistentCnt && !inconsistentSize)
+ assertTrue("Counters inconsistent message found", lsnr.checkCnt());
+
+ if (!inconsistentCnt && inconsistentSize)
+ assertTrue("Size inconsistent message found", lsnr.checkSize());
+
+ if (inconsistentCnt && inconsistentSize)
+ assertTrue("Both counters and sizes message found", lsnr.check());
+
+ if (!inconsistentCnt && !inconsistentSize) {
+ assertFalse("Counters and Size inconsistent message found!",
+ lsnr.check() || lsnr.checkCnt() || lsnr.checkSize());
+ }
+ }
+
+ /**
+ * 1. Only partition counters are inconsistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testValidationfPartitionCountersInconsistent() throws Exception {
+ startThreeNodesGrid(true, false);
+ }
+
+ /**
+ * 2. Only partition size are inconsistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testValidationfPartitionSizeInconsistent() throws Exception {
+ startThreeNodesGrid(false, true);
+ }
+
+ /**
+ * 3. Partition counters and size are inconsistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testValidationBothPartitionSizesAndCountersAreInconsistent() throws Exception {
+ startThreeNodesGrid(true, true);
+ }
+
+ /**
+ * 4. No one partition counters and size are inconsistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testValidationBothPatririonSixeAndCountersAreConsistent() throws Exception {
+ startThreeNodesGrid(false, false);
+ }
+
+ /**
+ * Overraided class LogListener for find specific patterns.
+ */
+ private static class SizeCounterLogListener extends LogListener {
+ /** Pattern for Counters inconsistent message.*/
+ final Pattern patCnt = Pattern.compile("(\\d)=(\\d{1,2})");
+
+ /** Pattern for Size inconsistent message.*/
+ final Pattern patSz = Pattern.compile("(\\d)=(\\d{1,2})");
+
+ /** Pattern for Both counters and sizes message*/
+ final Pattern patCntSz = Pattern.compile("consistentId=dht.GridCachePartitionsUpdateCountersAndSizeTest" +
+ "\\d meta=\\[updCnt=(\\d{2}), size=(\\d{1,2})");
+
+ /** if finded substring in log for inconsistent counters.*/
+ boolean cn;
+
+ /** if finded substring in log for inconsistent partition size.*/
+ boolean sz;
+
+ /** return true if inconsistent counters.*/
+ public boolean checkCnt() {
+ return cn;
+ }
+
+ /** return true if inconsistent partition size.*/
+ public boolean checkSize() {
+ return sz;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean check() {
+ return cn && sz;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ //no op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(String s) {
+ HashSet<Long> setCnt = new HashSet<>();
+ HashSet<Long> setSize = new HashSet<>();
+
+ if (s.contains("Partitions update counters are inconsistent for Part 0: ")) {
+ Matcher m = patCnt.matcher(s);
+
+ while (m.find())
+ setCnt.add(Long.parseLong(m.group(2)));
+ }
+
+ else if (s.contains("Partitions cache sizes are inconsistent for Part 0: ")) {
+ Matcher m = patSz.matcher(s);
+ while (m.find())
+ setSize.add(Long.parseLong(m.group(2)));
+ }
+
+ else if (s.contains("Partitions cache size and update counters are inconsistent for Part 0:")) {
+ Matcher m = patCntSz.matcher(s);
+ while (m.find()) {
+ setCnt.add(Long.parseLong(m.group(1)));
+ setSize.add(Long.parseLong(m.group(2)));
+ }
+ }
+
+ if (setCnt.size() == 2 && setCnt.contains(32L) && setCnt.contains(99L))
+ cn = true;
+ if (setSize.size() == 2 && setSize.contains(0L) && setSize.contains(32L))
+ sz = true;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
index e0589f7..326ed80 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
@@ -107,6 +107,7 @@
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsUpdateCountersAndSizeTest;
import org.apache.ignite.internal.processors.cache.expiry.IgniteCacheAtomicLocalExpiryPolicyTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
@@ -215,6 +216,7 @@
ignoredTests.add(GridCacheLifecycleAwareSelfTest.class);
ignoredTests.add(IgniteCacheMessageWriteTimeoutTest.class);
ignoredTests.add(GridCachePartitionsStateValidatorSelfTest.class);
+ ignoredTests.add(GridCachePartitionsUpdateCountersAndSizeTest.class);
ignoredTests.add(IgniteVariousConnectionNumberTest.class);
ignoredTests.add(IgniteIncompleteCacheObjectSelfTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index b66ffaa..d60ef3d 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -160,6 +160,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsUpdateCountersAndSizeTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedStorePutSelfTest;
@@ -329,6 +330,7 @@
GridTestUtils.addTestIfNeeded(suite, CacheDeferredDeleteQueueTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCachePartitionsStateValidatorSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCachePartitionsStateValidationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridCachePartitionsUpdateCountersAndSizeTest.class, ignoredTests);
suite.addAll(IgniteCacheTcpClientDiscoveryTestSuite.suite(ignoredTests));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
index d89afb8..2c6dd72 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
@@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.List;
-
import org.junit.Test;
/**