IGNITE-12950 Added check of partition sizes to GridDhtPartitionsStateValidator, even if update counters are different. Fixes #8645

Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
index 8bea474..d1dd063 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
 
 import java.util.AbstractMap;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -25,6 +26,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -33,6 +35,7 @@
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
@@ -40,7 +43,8 @@
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 
 /**
- * Class to validate partitions update counters and cache sizes during exchange process.
+ * Class to validate partitions update counters and cache sizes during exchange
+ * process.
  */
 public class GridDhtPartitionsStateValidator {
     /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
@@ -66,8 +70,9 @@
      * @param fut Current exchange future.
      * @param top Topology to validate.
      * @param messages Single messages received from all nodes.
-     * @throws IgniteCheckedException If validation failed. Exception message contains
-     * full information about all partitions which update counters or cache sizes are not consistent.
+     * @throws IgniteCheckedException If validation failed.
+     * Exception message contains full information about all partitions which
+     * update counters or cache sizes are not consistent.
      */
     public void validatePartitionCountersAndSizes(
         GridDhtPartitionsExchangeFuture fut,
@@ -82,13 +87,10 @@
                 ignoringNodes.add(evt.eventNode().id());
         }
 
-        AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+        StringBuilder error = new StringBuilder();
 
-        // Validate update counters.
-        Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
-
-        if (!result.isEmpty())
-            throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+        Map<Integer, Map<UUID, Long>> resUpdCnt = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+        Map<Integer, Map<UUID, Long>> resSize = Collections.emptyMap();
 
         // For sizes validation ignore also nodes which are not able to send cache sizes.
         for (UUID id : messages.keySet()) {
@@ -99,15 +101,31 @@
 
         if (!cctx.cache().cacheGroup(top.groupId()).mvccEnabled()) { // TODO: Remove "if" clause in IGNITE-9451.
             // Validate cache sizes.
-            result = validatePartitionsSizes(top, messages, ignoringNodes);
+            resSize = validatePartitionsSizes(top, messages, ignoringNodes);
+        }
 
-            if (!result.isEmpty())
-                throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+        AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+        if (!resUpdCnt.isEmpty() && !resSize.isEmpty()) {
+            error.append("Partitions cache size and update counters are inconsistent for ")
+                .append(fold(topVer, resUpdCnt, resSize));
+        }
+        else if (!resUpdCnt.isEmpty() && resSize.isEmpty())
+            error.append("Partitions update counters are inconsistent for ").append(fold(topVer, resUpdCnt));
+        else if (resUpdCnt.isEmpty() && !resSize.isEmpty())
+            error.append("Partitions cache sizes are inconsistent for ").append(fold(topVer, resSize));
+
+        if (error.length() > 0) {
+            Set<Integer> parts = new HashSet<>(resUpdCnt.keySet());
+            parts.addAll(resSize.keySet());
+
+            throw new IgniteCheckedException(error.toString());
         }
     }
 
     /**
-     * Checks what partitions from given {@code singleMsg} message should be excluded from validation.
+     * Checks what partitions from given {@code singleMsg} message should be
+     * excluded from validation.
      *
      * @param top Topology to validate.
      * @param nodeId Node which sent single message.
@@ -156,7 +174,8 @@
      * @param top Topology to validate.
      * @param messages Single messages received from all nodes.
      * @param ignoringNodes Nodes for what we ignore validation.
-     * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+     * @return Invalid partitions map with following structure:
+     * (partId, (nodeId, updateCounter)).
      * If map is empty validation is successful.
      */
     public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
@@ -216,7 +235,8 @@
      * @param top Topology to validate.
      * @param messages Single messages received from all nodes.
      * @param ignoringNodes Nodes for what we ignore validation.
-     * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+     * @return Invalid partitions map with following structure:
+     * (partId, (nodeId, cacheSize)).
      * If map is empty validation is successful.
      */
     public Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
@@ -271,8 +291,10 @@
     }
 
     /**
-     * Processes given {@code counter} for partition {@code part} reported by {@code node}.
-     * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different.
+     * Processes given {@code counter} for partition {@code part}
+     * reported by {@code node}.
+     * Populates {@code invalidPartitions} map if existing counter
+     * and current {@code counter} are different.
      *
      * @param invalidPartitions Invalid partitions map.
      * @param countersAndNodes Current map of counters and nodes by partitions.
@@ -304,7 +326,8 @@
     }
 
     /**
-     * Folds given map of invalid partition states to string representation in the following format:
+     * Folds given map of invalid partition states to string representation
+     * in the following format:
      * Part [id]: [consistentId=value*]
      *
      * Value can be both update counter or cache size.
@@ -329,4 +352,70 @@
 
         return sb.toString();
     }
+
+    /**
+     * Folds given map of invalid partition states to string representation
+     * in the following format:
+     * Part [id]: [consistentId=value meta=[updCnt=value, size=value]]
+     * @param topVer Topology version.
+     * @param invalidPartitionsCounters Invalid partitions counters map.
+     * @param invalidPartitionsSize Invalid partitions size map.
+     * @return value is String in the following format:
+     *     Part [id]: [consistentId=value meta=[updCnt=value, size=value]]
+     */
+    private String fold(
+        AffinityTopologyVersion topVer,
+        Map<Integer, Map<UUID, Long>> invalidPartitionsCounters,
+        Map<Integer, Map<UUID, Long>> invalidPartitionsSize
+    ) {
+        SB sb = new SB();
+
+        NavigableMap<Integer, Map<UUID, IgnitePair<Long>>> sortedAllPartitions = new TreeMap<>();
+
+        Set<Integer> allKeys = new HashSet<>(invalidPartitionsCounters.keySet());
+
+        allKeys.addAll(invalidPartitionsSize.keySet());
+
+        for (Integer p : allKeys) {
+            Map<UUID, IgnitePair<Long>> map = new HashMap<>();
+
+            fillMapForPartition(invalidPartitionsCounters.get(p), map, true);
+            fillMapForPartition(invalidPartitionsSize.get(p), map, false);
+
+            sortedAllPartitions.put(p, map);
+        }
+
+        for (Map.Entry<Integer, Map<UUID, IgnitePair<Long>>> p : sortedAllPartitions.entrySet()) {
+            sb.a("Part ").a(p.getKey()).a(": [");
+            for (Map.Entry<UUID, IgnitePair<Long>> e : p.getValue().entrySet()) {
+                Object consistentId = cctx.discovery().node(topVer, e.getKey()).consistentId();
+                sb.a("consistentId=").a(consistentId).a(" meta=[updCnt=").a(e.getValue().get1())
+                    .a(", size=").a(e.getValue().get2()).a("] ");
+            }
+            sb.a("] ");
+        }
+
+        return sb.toString();
+    }
+
+    /**
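+     * Adds values from the source map to the result map of (update counter, size) pairs.
+     * @param srcMap Partition update counters or partition sizes by node ID, may be {@code null}.
+     * @param resultMap Result map; {@code isFirst} selects the pair slot to set (first or second).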
+     */
+    private void fillMapForPartition(
+        Map<UUID, Long> srcMap,
+        Map<UUID, IgnitePair<Long>> resultMap,
+        boolean isFirst
+    ) {
+        if (srcMap != null) {
+            srcMap.forEach((uuid, val) -> {
+                IgnitePair<Long> pair = resultMap.computeIfAbsent(uuid, u -> new IgnitePair<>());
+                if (isFirst)
+                    pair.set1(val);
+                else
+                    pair.set2(val);
+            });
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsUpdateCountersAndSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsUpdateCountersAndSizeTest.java
new file mode 100644
index 0000000..f7b87d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsUpdateCountersAndSizeTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test correct behaviour of class to validate partitions update counters and
+ * cache sizes during exchange process
+ * {@link org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator}.
+ */
+public class GridCachePartitionsUpdateCountersAndSizeTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cacheTest";
+
+    /** Listener for parsing patterns in log. */
+    private ListeningTestLogger testLog;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        if (igniteInstanceName.endsWith("0"))
+            cfg.setGridLogger(testLog);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        testLog = new ListeningTestLogger(log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        testLog.clearListeners();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks the results of partitions state validation for the given combination of inconsistencies.
+     * @param inconsistentCnt {@code true} if it is expected that partition counters are inconsistent.
+     * @param inconsistentSize {@code true} if it is expected that partition sizes are inconsistent.
+     * @throws Exception If failed.
+     */
+    private void startThreeNodesGrid(boolean inconsistentCnt, boolean inconsistentSize) throws Exception {
+        SizeCounterLogListener lsnr = new SizeCounterLogListener();
+
+        IgniteEx ignite = startGrids(3);
+        ignite.cluster().state(ClusterState.ACTIVE);
+        awaitPartitionMapExchange();
+
+        testLog.registerListener(lsnr);
+
+        // Populate cache to increment update counters.
+        for (int i = 0; i < 1000; i++)
+            ignite.cache(CACHE_NAME).put(i, i);
+
+        if (inconsistentCnt) {
+            // Modify update counter for some partition.
+            ignite.cachex(CACHE_NAME).context().topology().localPartitions().get(0).updateCounter(99L);
+        }
+
+        if (inconsistentSize) {
+            // Modify cache size for some partition.
+            ignite.cachex(CACHE_NAME).context().topology().localPartitions().get(0).dataStore()
+                .clear(ignite.cachex(CACHE_NAME).context().cacheId());
+        }
+
+        // Trigger exchange.
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        // Nothing should happen (just log error message) and we're still able
+        // to put data to corrupted cache.
+        ignite.cache(CACHE_NAME).put(0, 0);
+
+        if (inconsistentCnt && !inconsistentSize)
+            assertTrue("Counters inconsistent message found", lsnr.checkCnt());
+
+        if (!inconsistentCnt && inconsistentSize)
+            assertTrue("Size inconsistent message found", lsnr.checkSize());
+
+        if (inconsistentCnt && inconsistentSize)
+            assertTrue("Both counters and sizes message found", lsnr.check());
+
+        if (!inconsistentCnt && !inconsistentSize) {
+            assertFalse("Counters and Size inconsistent message found!",
+                lsnr.check() || lsnr.checkCnt() || lsnr.checkSize());
+        }
+    }
+
+    /**
+     *  1. Only partition counters are inconsistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testValidationfPartitionCountersInconsistent() throws Exception {
+        startThreeNodesGrid(true, false);
+    }
+
+    /**
+     *  2. Only partition sizes are inconsistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testValidationfPartitionSizeInconsistent() throws Exception {
+        startThreeNodesGrid(false, true);
+    }
+
+    /**
+     *  3. Partition counters and size are inconsistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testValidationBothPartitionSizesAndCountersAreInconsistent() throws Exception {
+        startThreeNodesGrid(true, true);
+    }
+
+    /**
+     *  4. Neither partition counters nor sizes are inconsistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testValidationBothPatririonSixeAndCountersAreConsistent() throws Exception {
+        startThreeNodesGrid(false, false);
+    }
+
+    /**
+     * Overridden {@link LogListener} that looks for specific patterns in the log.
+     */
+    private static class SizeCounterLogListener extends LogListener {
+        /** Pattern for Counters inconsistent message.*/
+        final Pattern patCnt = Pattern.compile("(\\d)=(\\d{1,2})");
+
+        /** Pattern for Size inconsistent message.*/
+        final Pattern patSz = Pattern.compile("(\\d)=(\\d{1,2})");
+
+        /** Pattern for the message about both counters and sizes. */
+        final Pattern patCntSz = Pattern.compile("consistentId=dht.GridCachePartitionsUpdateCountersAndSizeTest" +
+            "\\d meta=\\[updCnt=(\\d{2}), size=(\\d{1,2})");
+
+        /** {@code true} if the substring for inconsistent counters was found in the log. */
+        boolean cn;
+
+        /** {@code true} if the substring for inconsistent partition size was found in the log. */
+        boolean sz;
+
+        /** Returns {@code true} if inconsistent counters were found in the log. */
+        public boolean checkCnt() {
+                return cn;
+        }
+
+        /** Returns {@code true} if inconsistent partition size was found in the log. */
+        public boolean checkSize() {
+                return sz;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean check() {
+            return cn && sz;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            //no op
+        }
+
+        /** {@inheritDoc} */
+        @Override public void accept(String s) {
+            HashSet<Long> setCnt = new HashSet<>();
+            HashSet<Long> setSize = new HashSet<>();
+
+            if (s.contains("Partitions update counters are inconsistent for Part 0: ")) {
+                Matcher m = patCnt.matcher(s);
+
+                while (m.find())
+                    setCnt.add(Long.parseLong(m.group(2)));
+            }
+
+            else if (s.contains("Partitions cache sizes are inconsistent for Part 0: ")) {
+                Matcher m = patSz.matcher(s);
+                while (m.find())
+                    setSize.add(Long.parseLong(m.group(2)));
+            }
+
+            else if (s.contains("Partitions cache size and update counters are inconsistent for Part 0:")) {
+                Matcher m = patCntSz.matcher(s);
+                while (m.find()) {
+                    setCnt.add(Long.parseLong(m.group(1)));
+                    setSize.add(Long.parseLong(m.group(2)));
+                }
+            }
+
+            if (setCnt.size() == 2 && setCnt.contains(32L) && setCnt.contains(99L))
+                cn = true;
+            if (setSize.size() == 2 && setSize.contains(0L) && setSize.contains(32L))
+                sz = true;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
index e0589f7..326ed80 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
@@ -107,6 +107,7 @@
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsUpdateCountersAndSizeTest;
 import org.apache.ignite.internal.processors.cache.expiry.IgniteCacheAtomicLocalExpiryPolicyTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
@@ -215,6 +216,7 @@
         ignoredTests.add(GridCacheLifecycleAwareSelfTest.class);
         ignoredTests.add(IgniteCacheMessageWriteTimeoutTest.class);
         ignoredTests.add(GridCachePartitionsStateValidatorSelfTest.class);
+        ignoredTests.add(GridCachePartitionsUpdateCountersAndSizeTest.class);
         ignoredTests.add(IgniteVariousConnectionNumberTest.class);
         ignoredTests.add(IgniteIncompleteCacheObjectSelfTest.class);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index b66ffaa..d60ef3d 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -160,6 +160,7 @@
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsUpdateCountersAndSizeTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedStorePutSelfTest;
@@ -329,6 +330,7 @@
         GridTestUtils.addTestIfNeeded(suite, CacheDeferredDeleteQueueTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCachePartitionsStateValidatorSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCachePartitionsStateValidationTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, GridCachePartitionsUpdateCountersAndSizeTest.class, ignoredTests);
 
         suite.addAll(IgniteCacheTcpClientDiscoveryTestSuite.suite(ignoredTests));
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
index d89afb8..2c6dd72 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
@@ -19,7 +19,6 @@
 
 import java.util.Arrays;
 import java.util.List;
-
 import org.junit.Test;
 
 /**