IGNITE-12605: Reset initial update counter value before clearing a partition (#7341)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 51fd9cf..47d2b7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1091,6 +1091,11 @@
void resetUpdateCounter();
/**
+ * Reset the initial value of the partition update counter.
+ */
+ void resetInitialUpdateCounter();
+
+ /**
* Partition storage.
*/
public PartitionMetaStorage<SimpleDataRow> partStorage();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 94f6bad..8241b69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -3043,6 +3043,11 @@
}
/** {@inheritDoc} */
+ @Override public void resetInitialUpdateCounter() {
+ pCntr.resetInitialCounter();
+ }
+
+ /** {@inheritDoc} */
@Override public PartitionMetaStorage<SimpleDataRow> partStorage() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index b8ac550..d0f0502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -103,6 +103,11 @@
public void reset();
/**
+ * Reset the initial counter value to zero.
+ */
+ public void resetInitialCounter();
+
+ /**
* @param start Counter.
* @param delta Delta.
* @deprecated TODO https://ggsystems.atlassian.net/browse/GG-17396
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
index bdad73d..5ce351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
@@ -223,6 +223,11 @@
}
/** {@inheritDoc} */
+ @Override public void resetInitialCounter() {
+ delegate.resetInitialCounter();
+ }
+
+ /** {@inheritDoc} */
@Override public long initial() {
return delegate.initial();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 68318eb..9a5f1b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -86,7 +86,7 @@
* Initial counter points to last sequential update after WAL recovery.
* @deprecated TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11794
*/
- @Deprecated private long initCntr;
+ @Deprecated private volatile long initCntr;
/**
* @param grp Group.
@@ -346,6 +346,11 @@
queue.clear();
}
+ /** {@inheritDoc} */
+ @Override public void resetInitialCounter() {
+ initCntr = 0;
+ }
+
/**
* Update counter task. Update from start value by delta value.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
index 4945358..1545ae76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
@@ -38,7 +38,7 @@
/**
* Initial counter is set to update with max sequence number after WAL recovery.
*/
- private long initCntr;
+ private volatile long initCntr;
/** */
private final CacheGroupContext grp;
@@ -127,6 +127,11 @@
}
/** {@inheritDoc} */
+ @Override public void resetInitialCounter() {
+ initCntr = 0;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 13b7811..d767bfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -711,6 +711,10 @@
if (!reinitialized)
return;
+ // Reset the initial update counter value to prevent historical rebalancing on this partition.
+ if (grp.persistenceEnabled())
+ store.resetInitialUpdateCounter();
+
// Make sure current rebalance future is finished before start clearing
// to avoid clearing currently rebalancing partition (except "initial" dummy rebalance).
if (clearingRequested) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c215939..33fc753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2792,6 +2792,21 @@
}
}
+ /** {@inheritDoc} */
+ @Override public void resetInitialUpdateCounter() {
+ try {
+ CacheDataStore delegate0 = init0(true);
+
+ if (delegate0 == null)
+ return;
+
+ delegate0.resetInitialUpdateCounter();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
@Override public PartitionMetaStorage partStorage() {
return partStorage;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 087ac6f..597ff2b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -36,6 +36,7 @@
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -55,6 +56,7 @@
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
@@ -498,6 +500,71 @@
}
/**
+ * Check that historical rebalance doesn't start on the cleared partition when some cluster node restarts.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceRestartWithNodeBlinking() throws Exception {
+ int entryCnt = PARTS_CNT * 200;
+
+ // Start a 3-node cluster:
+ // node0 - coordinator (main supplier for historical rebalance)
+ // node1 - some node that will generate NODE_LEFT/NODE_JOINED events
+ // node2 - historical rebalance demander
+ IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+ crd.cluster().baselineAutoAdjustEnabled(false);
+
+ IgniteCache<Integer, String> cache0 = crd.cache(CACHE_NAME);
+
+ for (int i = 0; i < entryCnt / 2; i++)
+ cache0.put(i, String.valueOf(i));
+
+ forceCheckpoint();
+
+ stopGrid(2);
+
+ for (int i = entryCnt / 2; i < entryCnt; i++)
+ cache0.put(i, String.valueOf(i));
+
+ blockMessagePredicate = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(CACHE_NAME) && msg0.partitions().size() == PARTS_CNT;
+ }
+
+ return false;
+ };
+
+ startGrid(2);
+
+ TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
+
+ // Wait until node2 starts historical rebalancing.
+ spi2.waitForBlocked(1);
+
+ // Interrupt rebalancing with a NODE_LEFT event; a historical supplier should not be provided.
+ stopGrid(1);
+
+ // Wait until the full rebalance begins.
+ spi2.waitForBlocked(2);
+
+ // Interrupt it again with a NODE_JOINED event so that a historical supplier is provided again.
+ startGrid(1);
+
+ spi2.stopBlock();
+
+ awaitPartitionMapExchange();
+
+ // Verify data on demander node.
+ for (int i = 0; i < entryCnt; i++)
+ assertEquals(String.valueOf(i), grid(2).cache(CACHE_NAME).get(i));
+ }
+
+ /**
*
*/
private static class IndexedObject {