IGNITE-12605: Reset initial update counter value before clearing a partition (#7341)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 51fd9cf..47d2b7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1091,6 +1091,11 @@
         void resetUpdateCounter();
 
         /**
+         * Reset the initial value of the partition counter.
+         */
+        void resetInitialUpdateCounter();
+
+        /**
          * Partition storage.
          */
         public PartitionMetaStorage<SimpleDataRow> partStorage();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 94f6bad..8241b69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -3043,6 +3043,11 @@
         }
 
         /** {@inheritDoc} */
+        @Override public void resetInitialUpdateCounter() {
+            pCntr.resetInitialCounter();
+        }
+
+        /** {@inheritDoc} */
         @Override public PartitionMetaStorage<SimpleDataRow> partStorage() {
             return null;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index b8ac550..d0f0502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -103,6 +103,11 @@
     public void reset();
 
     /**
+     * Reset the initial counter value to zero.
+     */
+    public void resetInitialCounter();
+
+    /**
      * @param start Counter.
      * @param delta Delta.
      * @deprecated TODO https://ggsystems.atlassian.net/browse/GG-17396
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
index bdad73d..5ce351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
@@ -223,6 +223,11 @@
     }
 
     /** {@inheritDoc} */
+    @Override public void resetInitialCounter() {
+        delegate.resetInitialCounter();
+    }
+
+    /** {@inheritDoc} */
     @Override public long initial() {
         return delegate.initial();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 68318eb..9a5f1b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -86,7 +86,7 @@
      * Initial counter points to last sequential update after WAL recovery.
      * @deprecated TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11794
      */
-    @Deprecated private long initCntr;
+    @Deprecated private volatile long initCntr;
 
     /**
      * @param grp Group.
@@ -346,6 +346,11 @@
         queue.clear();
     }
 
+    /** {@inheritDoc} */
+    @Override public void resetInitialCounter() {
+        initCntr = 0;
+    }
+
     /**
      * Update counter task. Update from start value by delta value.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
index 4945358..1545ae76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
@@ -38,7 +38,7 @@
     /**
      * Initial counter is set to update with max sequence number after WAL recovery.
      */
-    private long initCntr;
+    private volatile long initCntr;
 
     /** */
     private final CacheGroupContext grp;
@@ -127,6 +127,11 @@
     }
 
     /** {@inheritDoc} */
+    @Override public void resetInitialCounter() {
+        initCntr = 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)
             return true;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 13b7811..d767bfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -711,6 +711,10 @@
         if (!reinitialized)
             return;
 
+        // Reset the initial update counter value to prevent historical rebalancing on this partition.
+        if (grp.persistenceEnabled())
+            store.resetInitialUpdateCounter();
+
         // Make sure current rebalance future is finished before start clearing
         // to avoid clearing currently rebalancing partition (except "initial" dummy rebalance).
         if (clearingRequested) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c215939..33fc753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2792,6 +2792,21 @@
             }
         }
 
+        /** {@inheritDoc} */
+        @Override public void resetInitialUpdateCounter() {
+            try {
+                CacheDataStore delegate0 = init0(true);
+
+                if (delegate0 == null)
+                    return;
+
+                delegate0.resetInitialUpdateCounter();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
         @Override public PartitionMetaStorage partStorage() {
             return partStorage;
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 087ac6f..597ff2b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -36,6 +36,7 @@
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -55,6 +56,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -498,6 +500,71 @@
     }
 
     /**
+     * Check that historical rebalance doesn't start on the cleared partition when some cluster node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceRestartWithNodeBlinking() throws Exception {
+        int entryCnt = PARTS_CNT * 200;
+
+        // Start 3 nodes cluster:
+        //  node0 - coordinator (main supplier for historical rebalance)
+        //  node1 - some node that will generate NODE_LEFT/NODE_JOINED events
+        //  node2 - historical rebalance demander
+        IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
+
+        crd.cluster().state(ClusterState.ACTIVE);
+        crd.cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteCache<Integer, String> cache0 = crd.cache(CACHE_NAME);
+
+        for (int i = 0; i < entryCnt / 2; i++)
+            cache0.put(i, String.valueOf(i));
+
+        forceCheckpoint();
+
+        stopGrid(2);
+
+        for (int i = entryCnt / 2; i < entryCnt; i++)
+            cache0.put(i, String.valueOf(i));
+
+        blockMessagePredicate = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                return msg0.groupId() == CU.cacheId(CACHE_NAME) && msg0.partitions().size() == PARTS_CNT;
+            }
+
+            return false;
+        };
+
+        startGrid(2);
+
+        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
+
+        // Wait until node2 starts historical rebalancning.
+        spi2.waitForBlocked(1);
+
+        // Interruption of rebalancing by NODE_LEFT event, historical supplier should not be provided.
+        stopGrid(1);
+
+        // Wait until the full rebalance begins.
+        spi2.waitForBlocked(2);
+
+        // Interrupting it again by NODE_JOINED and get a historical supplier again.
+        startGrid(1);
+
+        spi2.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        // Verify data on demander node.
+        for (int i = 0; i < entryCnt; i++)
+            assertEquals(String.valueOf(i), grid(2).cache(CACHE_NAME).get(i));
+    }
+
+    /**
      *
      */
     private static class IndexedObject {