IGNITE-11797 Fixed partition consistency issues for atomic and mixed tx-atomic cache groups. - Fixes #7315.
Signed-off-by: Aleksei Scherbakov <ascherbakov@apache.org>
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
index 9aa0102..c44d938 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
@@ -20,8 +20,7 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
-import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -39,7 +38,7 @@
import org.openjdk.jmh.runner.options.OptionsBuilder;
/**
- * Benchmarks {@link PartitionTxUpdateCounterImpl} class.
+ * Benchmarks {@link PartitionUpdateCounterTrackingImpl} class.
*/
@State(Scope.Benchmark)
@Fork(1)
@@ -65,7 +64,7 @@
private final AtomicLong reservedCntr = new AtomicLong();
/** Partition update counter. */
- private final PartitionUpdateCounter partCntr = new PartitionTxUpdateCounterImpl();
+ private final PartitionUpdateCounterTrackingImpl partCntr = new PartitionUpdateCounterTrackingImpl(null);
/**
* Setup.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 6082789..fe45ae1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -66,7 +66,6 @@
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -182,9 +181,6 @@
/** Statistics holder to track IO operations for data pages. */
private final IoStatisticsHolder statHolderData;
- /** */
- private volatile boolean hasAtomicCaches;
-
/** Cache group metrics. */
private final CacheGroupMetricsImpl metrics;
@@ -362,9 +358,6 @@
if (!drEnabled && cctx.isDrEnabled())
drEnabled = true;
-
- if (!hasAtomicCaches)
- hasAtomicCaches = cctx.config().getAtomicityMode() == ATOMIC;
}
/**
@@ -1292,13 +1285,6 @@
}
/**
- * @return {@code True} if group has atomic caches.
- */
- public boolean hasAtomicCaches() {
- return hasAtomicCaches;
- }
-
- /**
* @return Metrics.
*/
public CacheGroupMetricsImpl metrics() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 44b15ec..94f6bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1475,14 +1475,13 @@
this.partId = partId;
this.rowStore = rowStore;
this.dataTree = dataTree;
- if (grp.mvccEnabled())
- pCntr = new PartitionMvccTxUpdateCounterImpl();
- else if (grp.hasAtomicCaches() || !grp.persistenceEnabled())
- pCntr = new PartitionAtomicUpdateCounterImpl();
- else {
- pCntr = ctx.logger(PartitionTxUpdateCounterDebugWrapper.class).isDebugEnabled() ?
- new PartitionTxUpdateCounterDebugWrapper(grp, partId) : new PartitionTxUpdateCounterImpl();
- }
+
+ PartitionUpdateCounter delegate = grp.mvccEnabled() ? new PartitionUpdateCounterMvccImpl(grp) :
+ grp.persistenceEnabled() ? new PartitionUpdateCounterTrackingImpl(grp) :
+ new PartitionUpdateCounterVolatileImpl(grp);
+
+ pCntr = ctx.logger(PartitionUpdateCounterDebugWrapper.class).isDebugEnabled() ?
+ new PartitionUpdateCounterDebugWrapper(partId, delegate) : delegate;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index 112a110..b8ac550 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -27,7 +27,7 @@
<ol>
* <li><b>Low water mark (LWM)</b> or update counter - lowest applied sequential update number.</li>
* <li><b>High water mark (HWM)</b> or reservation counter - highest seen but unapplied yet update number.</li>
- * <li>Out-of-order applied updates in range between LWM and HWM.</li>
+ * <li>Out-of-order applied updates in range between LWM and HWM</li>
* </ol>
*/
public interface PartitionUpdateCounter extends Iterable<long[]> {
@@ -133,4 +133,9 @@
* @return Iterator for pairs [start, range] for each out-of-order update in the update counter sequence.
*/
@Override public Iterator<long[]> iterator();
+
+ /**
+ * @return Cache group context.
+ */
+ public CacheGroupContext context();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
similarity index 75%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
index 3bf92aa..bdad73d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Iterator;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.GridLongList;
@@ -24,31 +25,35 @@
import org.jetbrains.annotations.Nullable;
/**
- * Update counter implementation useful for debugging.
+ * Update counter wrapper with logging capabilities.
*/
-public class PartitionTxUpdateCounterDebugWrapper extends PartitionTxUpdateCounterImpl {
+public class PartitionUpdateCounterDebugWrapper implements PartitionUpdateCounter {
/** */
- private IgniteLogger log;
+ private final IgniteLogger log;
/** */
- private int partId;
+ private final int partId;
/** */
- private CacheGroupContext grp;
+ private final CacheGroupContext grp;
+
+ /** */
+ private final PartitionUpdateCounter delegate;
/**
- * @param grp Group.
* @param partId Part id.
+ * @param delegate Delegate.
*/
- public PartitionTxUpdateCounterDebugWrapper(CacheGroupContext grp, int partId) {
- this.log = grp.shared().logger(getClass());
+ public PartitionUpdateCounterDebugWrapper(int partId, PartitionUpdateCounter delegate) {
this.partId = partId;
- this.grp = grp;
+ this.grp = delegate.context();
+ this.log = grp.shared().logger(getClass());
+ this.delegate = delegate;
}
/** {@inheritDoc} */
@Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
- super.init(initUpdCntr, cntrUpdData);
+ delegate.init(initUpdCntr, cntrUpdData);
log.debug("[op=init" +
", grpId=" + grp.groupId() +
@@ -74,7 +79,7 @@
", before=" + toString());
try {
- super.updateInitial(start, delta);
+ delegate.updateInitial(start, delta);
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -92,7 +97,7 @@
", before=" + toString());
try {
- return super.next();
+ return delegate.next();
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -111,7 +116,7 @@
", before=" + toString());
try {
- return super.next();
+ return delegate.next();
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -130,7 +135,7 @@
", before=" + toString());
try {
- super.update(val);
+ delegate.update(val);
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -149,7 +154,7 @@
']');
try {
- return super.finalizeUpdateCounters();
+ return delegate.finalizeUpdateCounters();
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -168,7 +173,7 @@
", before=" + toString());
try {
- return super.reserve(delta);
+ return delegate.reserve(delta);
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -189,7 +194,7 @@
boolean updated = false;
try {
- updated = super.update(start, delta);
+ updated = delegate.update(start, delta);
}
finally {
log.debug(sb.a(", after=" + toString() +
@@ -199,6 +204,7 @@
return updated;
}
+ /** {@inheritDoc} */
@Override public synchronized void reset() {
SB sb = new SB();
@@ -208,11 +214,51 @@
", before=" + toString());
try {
- super.reset();
+ delegate.reset();
}
finally {
log.debug(sb.a(", after=" + toString() +
']').toString());
}
}
+
+ /** {@inheritDoc} */
+ @Override public long initial() {
+ return delegate.initial();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long get() {
+ return delegate.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long reserved() {
+ return delegate.reserved();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public byte[] getBytes() {
+ return delegate.getBytes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sequential() {
+ return delegate.sequential();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<long[]> iterator() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheGroupContext context() {
+ return delegate.context();
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionMvccTxUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
similarity index 83%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionMvccTxUpdateCounterImpl.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
index 2f5d77a..75cfa9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionMvccTxUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
@@ -20,7 +20,14 @@
/**
* Update counter implementation for MVCC mode.
*/
-public class PartitionMvccTxUpdateCounterImpl extends PartitionTxUpdateCounterImpl {
+public class PartitionUpdateCounterMvccImpl extends PartitionUpdateCounterTrackingImpl {
+ /**
+ * @param grp Group.
+ */
+ public PartitionUpdateCounterMvccImpl(CacheGroupContext grp) {
+ super(grp);
+ }
+
/** {@inheritDoc} */
@Override public long reserve(long delta) {
return next(delta);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
similarity index 92%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 06692d9..68318eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -36,14 +36,16 @@
import org.jetbrains.annotations.Nullable;
/**
- * Update counter implementation used for transactional cache groups in persistent mode.
+ * Update counter implementation used for cache groups in persistent mode for both tx and atomic caches.
* <p>
- * Implements new partition update counter flow to avoid situations when:
+ * Implements the partition update counter flow to avoid situations when:
* <ol>
* <li>update counter could be incremented and persisted while corresponding update is not recorded to WAL.</li>
- * <li>update counter could be prematurely incremented causing missed rebalancing.</li>
+ * <li>update counter could be updated out of order.</li>
* </ol>
- * All these situations are sources of partitions desync.
+ * All these situations are sources of partitions desync in case of node failure under load.
+ * <p>
+ * The main idea is to track updates received out-of-order to ensure valid state of the update counter for rebalancing.
* <p>
* Below a short description of new flow:
* <ol>
@@ -56,7 +58,7 @@
* logged to WAL using {@link RollbackRecord} for further recovery purposes.</li>
* </ol>
*/
-public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
+public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounter {
/**
* Max allowed missed updates. Overflow will trigger critical failure handler to prevent OOM.
*/
@@ -77,12 +79,22 @@
/** */
private boolean first = true;
+ /** */
+ protected final CacheGroupContext grp;
+
/**
* Initial counter points to last sequential update after WAL recovery.
* @deprecated TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11794
*/
@Deprecated private long initCntr;
+ /**
+ * @param grp Group.
+ */
+ public PartitionUpdateCounterTrackingImpl(CacheGroupContext grp) {
+ this.grp = grp;
+ }
+
/** {@inheritDoc} */
@Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
cntr.set(initUpdCntr);
@@ -393,7 +405,7 @@
if (start != item.start)
return false;
- return (delta != item.delta);
+ return delta == item.delta;
}
}
@@ -404,7 +416,7 @@
if (o == null || getClass() != o.getClass())
return false;
- PartitionTxUpdateCounterImpl cntr = (PartitionTxUpdateCounterImpl)o;
+ PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl)o;
if (!queue.equals(cntr.queue))
return false;
@@ -432,4 +444,9 @@
return "Counter [lwm=" + get() + ", holes=" + queue +
", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr.get() + ']';
}
+
+ /** {@inheritDoc} */
+ @Override public CacheGroupContext context() {
+ return grp;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionAtomicUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
similarity index 83%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionAtomicUpdateCounterImpl.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
index a672a10..4945358 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionAtomicUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
@@ -24,11 +24,14 @@
import org.jetbrains.annotations.Nullable;
/**
- * Partition update counter for non-tx scenarios without support for tracking missed updates.
- * Currently used for atomic, mixed tx-atomic and in-memory cache groups.
- * TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11797
+ * Partition update counter for volatile caches.
+ * <p>
+ * Doesn't track gaps in update sequence because it's not needed for volatile caches
+ * (because their state is lost on node failure).
+ * <p>
+ * In this mode LWM and HWM are non distinguishable.
*/
-public class PartitionAtomicUpdateCounterImpl implements PartitionUpdateCounter {
+public class PartitionUpdateCounterVolatileImpl implements PartitionUpdateCounter {
/** Counter of applied updates in partition. */
private final AtomicLong cntr = new AtomicLong();
@@ -37,6 +40,16 @@
*/
private long initCntr;
+ /** */
+ private final CacheGroupContext grp;
+
+ /**
+ * @param grp Group.
+ */
+ public PartitionUpdateCounterVolatileImpl(CacheGroupContext grp) {
+ this.grp = grp;
+ }
+
/** {@inheritDoc} */
@Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
cntr.set(initUpdCntr);
@@ -120,7 +133,7 @@
if (o == null || getClass() != o.getClass())
return false;
- PartitionAtomicUpdateCounterImpl cntr = (PartitionAtomicUpdateCounterImpl)o;
+ PartitionUpdateCounterVolatileImpl cntr = (PartitionUpdateCounterVolatileImpl)o;
return this.cntr.get() == cntr.cntr.get();
}
@@ -144,4 +157,9 @@
@Override public String toString() {
return "Counter [init=" + initCntr + ", val=" + get() + ']';
}
+
+ /** {@inheritDoc} */
+ @Override public CacheGroupContext context() {
+ return grp;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index b922c35..c215939 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1364,8 +1364,7 @@
if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
// Partition will be marked as done for current entry on next iteration.
- if (++rebalancedCntrs[idx] == to ||
- entry.partitionCounter() == to && grp.hasAtomicCaches())
+ if (++rebalancedCntrs[idx] == to)
donePart = entry.partitionId();
next = entry;
@@ -1406,8 +1405,10 @@
if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) {
if (log.isDebugEnabled()) {
- log.debug("Partition done [partId=" + donePart +
- " from=" + from + " to=" + to + ']');
+ log.debug("Partition done [grpId=" + grp.groupId() +
+ ", partId=" + donePart +
+ ", from=" + from +
+ ", to=" + to + ']');
}
doneParts.add(rbRec.partitionId()); // Add to done set immediately.
@@ -1416,8 +1417,20 @@
}
}
- assert entryIt != null || doneParts.size() == partMap.size() :
- "Reached end of WAL but not all partitions are done";
+ if (entryIt == null && doneParts.size() != partMap.size()) {
+ for (int i = 0; i < partMap.size(); i++) {
+ int p = partMap.partitionAt(i);
+
+ if (!doneParts.contains(p)) {
+ log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" +
+ (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']');
+
+ doneParts.add(p);
+ }
+ }
+
+ return;
+ }
}
}
}
diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml
index 370a50a..0df622f 100755
--- a/modules/core/src/test/config/log4j-test.xml
+++ b/modules/core/src/test/config/log4j-test.xml
@@ -101,7 +101,7 @@
Uncomment to enable debugging of partition counters.
-->
<!--
- <category name="org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterDebugWrapper">
+ <category name="org.apache.ignite.internal.processors.cache.PartitionUpdateCounterDebugWrapper">
<level value="DEBUG"/>
</category>
-->
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
index b8f4b98..564a728 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -31,7 +31,7 @@
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -146,7 +146,7 @@
assertNotNull(part);
- PartitionTxUpdateCounterImpl cntr0 = (PartitionTxUpdateCounterImpl)part.dataStore().partUpdateCounter();
+ PartitionUpdateCounterTrackingImpl cntr0 = (PartitionUpdateCounterTrackingImpl)part.dataStore().partUpdateCounter();
AtomicLong cntr = U.field(cntr0, "cntr");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionMvccTxUpdateCounterImpl.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
similarity index 60%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionMvccTxUpdateCounterImpl.java
copy to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 2f5d77a..2eff7ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionMvccTxUpdateCounterImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -15,19 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache;
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
/**
- * Update counter implementation for MVCC mode.
+ * Test partitions consistency in various scenarios when all rebalance is historical.
*/
-public class PartitionMvccTxUpdateCounterImpl extends PartitionTxUpdateCounterImpl {
- /** {@inheritDoc} */
- @Override public long reserve(long delta) {
- return next(delta);
- }
-
- /** {@inheritDoc} */
- @Override public long reserved() {
- return get();
- }
+@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+public class AtomicPartitionCounterStateConsistencyHistoryRebalanceTest extends AtomicPartitionCounterStateConsistencyTest {
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyTest.java
new file mode 100644
index 0000000..f3b9553
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.junit.Ignore;
+
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * Test partitions consistency for atomic caches trying to reuse tx scenarios as much as possible.
+ */
+public class AtomicPartitionCounterStateConsistencyTest extends TxPartitionCounterStateConsistencyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration<Object, Object> cacheConfiguration(String name) {
+ return super.cacheConfiguration(name).setAtomicityMode(ATOMIC);
+ }
+
+ /** {@inheritDoc} */
+ @Ignore
+ @Override public void testPartitionConsistencyDuringRebalanceAndConcurrentUpdates_SameAffinityPME() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Ignore
+ @Override public void testPartitionConsistencyDuringRebalanceAndConcurrentUpdates_TxDuringPME() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Ignore
+ @Override public void testPartitionConsistencyDuringRebalanceAndConcurrentUpdates_LateAffinitySwitch() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<?> doRandomUpdates(Random r, Ignite near, List<Integer> primaryKeys,
+ IgniteCache<Object, Object> cache, BooleanSupplier stopClo) throws Exception {
+ LongAdder puts = new LongAdder();
+ LongAdder removes = new LongAdder();
+
+ final int max = 100;
+
+ return multithreadedAsync(() -> {
+ while (!stopClo.getAsBoolean()) {
+ int rangeStart = r.nextInt(primaryKeys.size() - max);
+ int range = 5 + r.nextInt(max - 5);
+
+ List<Integer> keys = primaryKeys.subList(rangeStart, rangeStart + range);
+
+ final boolean batch = r.nextBoolean();
+
+ try {
+ List<Integer> insertedKeys = new ArrayList<>();
+ List<Integer> rmvKeys = new ArrayList<>();
+
+ for (Integer key : keys) {
+ if (!batch)
+ cache.put(key, key);
+
+ insertedKeys.add(key);
+
+ puts.increment();
+
+ boolean rmv = r.nextFloat() < 0.5;
+ if (rmv) {
+ key = insertedKeys.get(r.nextInt(insertedKeys.size()));
+
+ if (!batch)
+ cache.remove(key);
+ else
+ rmvKeys.add(key);
+
+ removes.increment();
+ }
+ }
+
+ if (batch) {
+ cache.putAll(insertedKeys.stream().collect(toMap(k -> k, v -> v, (k, v) -> v, LinkedHashMap::new)));
+ cache.removeAll(new LinkedHashSet<>(rmvKeys));
+ }
+ }
+ catch (Exception e) {
+ assertTrue(X.getFullStackTrace(e), X.hasCause(e, ClusterTopologyException.class));
+ }
+ }
+
+ log.info("ATOMIC: puts=" + puts.sum() + ", removes=" + removes.sum() + ", size=" + cache.size());
+
+ }, Runtime.getRuntime().availableProcessors() * 2, "tx-update-thread");
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
index 3ad0049..688a6aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -39,10 +40,11 @@
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.PartitionAtomicUpdateCounterImpl;
-import org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterVolatileImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -105,7 +107,7 @@
for (int i = 0; i < 100; i++) {
Collections.shuffle(tmp);
- PartitionUpdateCounter pc0 = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc0 = new PartitionUpdateCounterTrackingImpl(null);
for (int[] pair : tmp)
pc0.update(pair[0], pair[1]);
@@ -128,7 +130,7 @@
*/
@Test
public void testStaleUpdate() {
- PartitionUpdateCounter pc = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
assertTrue(pc.update(0, 1));
assertFalse(pc.update(0, 1));
@@ -147,7 +149,7 @@
*/
@Test
public void testMixedModeMultithreaded() throws Exception {
- PartitionUpdateCounter pc = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
AtomicBoolean stop = new AtomicBoolean();
@@ -197,10 +199,10 @@
*/
@Test
public void testMaxGaps() {
- PartitionUpdateCounter pc = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
int i;
- for (i = 1; i <= PartitionTxUpdateCounterImpl.MAX_MISSED_UPDATES; i++)
+ for (i = 1; i <= PartitionUpdateCounterTrackingImpl.MAX_MISSED_UPDATES; i++)
pc.update(i * 3, i * 3 + 1);
i++;
@@ -219,7 +221,7 @@
*/
@Test
public void testFoldIntermediateUpdates() {
- PartitionUpdateCounter pc = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
pc.update(0, 59);
@@ -245,7 +247,7 @@
*/
@Test
public void testOutOfOrderUpdatesIterator() {
- PartitionUpdateCounter pc = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
pc.update(67, 3);
@@ -278,7 +280,7 @@
*/
@Test
public void testOverlap() {
- PartitionUpdateCounter pc = new PartitionTxUpdateCounterImpl();
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
assertTrue(pc.update(13, 3));
@@ -302,7 +304,7 @@
/** */
@Test
public void testAtomicUpdateCounterMultithreaded() throws Exception {
- PartitionUpdateCounter cntr = new PartitionAtomicUpdateCounterImpl();
+ PartitionUpdateCounter cntr = new PartitionUpdateCounterVolatileImpl(null);
AtomicInteger id = new AtomicInteger();
@@ -345,6 +347,29 @@
}
/**
+ *
+ */
+ @Test
+ public void testGapsSerialization() {
+ PartitionUpdateCounter pc = new PartitionUpdateCounterTrackingImpl(null);
+
+ Random r = new Random();
+
+ for (int c = 1; c < 500; c++)
+ pc.update(c * 4, r.nextInt(3) + 1);
+
+ final byte[] bytes = pc.getBytes();
+
+ PartitionUpdateCounter pc2 = new PartitionUpdateCounterTrackingImpl(null);
+ pc2.init(0, bytes);
+
+ NavigableMap q0 = U.field(pc, "queue");
+ NavigableMap q1 = U.field(pc2, "queue");
+
+ assertEquals(q0, q1);
+ }
+
+ /**
* @param mode Mode.
*/
private void testWithPersistentNode(CacheAtomicityMode mode) throws Exception {
@@ -379,18 +404,7 @@
PartitionUpdateCounter cntr = counter(0, grid0.name());
- switch (mode) {
- case ATOMIC:
- assertTrue(cntr instanceof PartitionAtomicUpdateCounterImpl);
- break;
-
- case TRANSACTIONAL:
- assertTrue(cntr instanceof PartitionTxUpdateCounterImpl);
- break;
-
- default:
- fail(mode.toString());
- }
+ assertTrue(cntr instanceof PartitionUpdateCounterTrackingImpl);
assertEquals(cntr.initial(), cntr.get());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
index ce23983..54dece3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -17,111 +17,13 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.IntStream;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.junit.Test;
-import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
/**
- * Test partitions consistency in various scenarios.
+ * Test partitions consistency in various scenarios when all rebalance is historical.
*/
@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
public class TxPartitionCounterStateConsistencyHistoryRebalanceTest extends TxPartitionCounterStateConsistencyTest {
- /** */
- @Test
- public void testConsistencyAfterBaselineNodeStopAndRemoval() throws Exception {
- doTestConsistencyAfterBaselineNodeStopAndRemoval(0);
- }
-
- /** */
- @Test
- public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestart() throws Exception {
- doTestConsistencyAfterBaselineNodeStopAndRemoval(1);
- }
-
- /** */
- @Test
- public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestartAndSkipCheckpoint() throws Exception {
- doTestConsistencyAfterBaselineNodeStopAndRemoval(2);
- }
-
- /**
- * Test a scenario when partition is evicted and owned again with non-zero initial and current counters.
- * Such partition should not be historically rebalanced, otherwise only subset of data will be rebalanced.
- */
- private void doTestConsistencyAfterBaselineNodeStopAndRemoval(int mode) throws Exception {
- backups = 2;
-
- final int srvNodes = SERVER_NODES + 1;
-
- IgniteEx prim = startGrids(srvNodes);
-
- prim.cluster().active(true);
-
- for (int p = 0; p < partitions(); p++) {
- prim.cache(DEFAULT_CACHE_NAME).put(p, p);
- prim.cache(DEFAULT_CACHE_NAME).put(p + PARTS_CNT, p * 2);
- }
-
- forceCheckpoint();
-
- stopGrid(1); // topVer=5,0
-
- awaitPartitionMapExchange();
-
- resetBaselineTopology(); // topVer=5,1
-
- awaitPartitionMapExchange();
-
- forceCheckpoint(); // Will force GridCacheDataStore.exists=true mode after part store re-creation.
-
- startGrid(1); // topVer=6,0
-
- awaitPartitionMapExchange();
-
- resetBaselineTopology(); // topVer=6,1
-
- awaitPartitionMapExchange(true, true, null);
-
- // Create counter difference with evicted partition so it's applicable for historical rebalancing.
- for (int p = 0; p < partitions(); p++)
- prim.cache(DEFAULT_CACHE_NAME).put(p + PARTS_CNT, p * 2 + 1);
-
- stopGrid(1); // topVer=7,0
-
- if (mode > 0) {
- stopGrid(mode == 1, grid(2).name());
- stopGrid(mode == 1, grid(3).name());
-
- startGrid(2);
- startGrid(3);
- }
-
- prim.context().cache().context().exchange().rebalanceDelay(500);
-
- Random r = new Random();
-
- AtomicBoolean stop = new AtomicBoolean();
-
- final IgniteInternalFuture<?> fut = doRandomUpdates(r,
- prim,
- IntStream.range(0, 1000).boxed().collect(toList()),
- prim.cache(DEFAULT_CACHE_NAME),
- stop::get);
-
- resetBaselineTopology(); // topVer=7,1
-
- awaitPartitionMapExchange();
-
- stop.set(true);
- fut.get();
-
- assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 1c8fe94..4c9521e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -38,6 +38,7 @@
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -72,7 +73,11 @@
import org.apache.ignite.transactions.TransactionRollbackException;
import org.junit.Test;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -119,14 +124,17 @@
LinkedList<T2<Integer, GridCacheOperation>> ops = new LinkedList<>();
+ final CacheAtomicityMode mode = atomicityMode(cache);
+ final GridCacheOperation op = mode == ATOMIC ? UPDATE : CREATE;
+
cache.put(keys.get(0), new TestVal(keys.get(0)));
- ops.add(new T2<>(keys.get(0), GridCacheOperation.CREATE));
+ ops.add(new T2<>(keys.get(0), op));
cache.put(keys.get(1), new TestVal(keys.get(1)));
- ops.add(new T2<>(keys.get(1), GridCacheOperation.CREATE));
+ ops.add(new T2<>(keys.get(1), op));
cache.put(keys.get(2), new TestVal(keys.get(2)));
- ops.add(new T2<>(keys.get(2), GridCacheOperation.CREATE));
+ ops.add(new T2<>(keys.get(2), op));
assertCountersSame(PARTITION_ID, false);
@@ -926,6 +934,101 @@
assertEquals(cntr2.toString(), 2, cntr2.reserved());
}
+ /** */
+ @Test
+ public void testConsistencyAfterBaselineNodeStopAndRemoval() throws Exception {
+ doTestConsistencyAfterBaselineNodeStopAndRemoval(0);
+ }
+
+ /** */
+ @Test
+ public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestart() throws Exception {
+ doTestConsistencyAfterBaselineNodeStopAndRemoval(1);
+ }
+
+ /** */
+ @Test
+ public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestartAndSkipCheckpoint() throws Exception {
+ doTestConsistencyAfterBaselineNodeStopAndRemoval(2);
+ }
+
+ /**
+ * Test a scenario when partition is evicted and owned again with non-zero initial and current counters.
+ * When rebalancing is finished no partition desync should happen.
+ */
+ private void doTestConsistencyAfterBaselineNodeStopAndRemoval(int mode) throws Exception {
+ backups = 2;
+
+ final int srvNodes = SERVER_NODES + 1;
+
+ IgniteEx prim = startGrids(srvNodes);
+
+ prim.cluster().active(true);
+
+ for (int p = 0; p < partitions(); p++) {
+ prim.cache(DEFAULT_CACHE_NAME).put(p, p);
+ prim.cache(DEFAULT_CACHE_NAME).put(p + partitions(), p * 2);
+ }
+
+ forceCheckpoint();
+
+ stopGrid(1); // topVer=5,0
+
+ awaitPartitionMapExchange();
+
+ if (persistenceEnabled())
+ resetBaselineTopology(); // topVer=5,1
+
+ awaitPartitionMapExchange();
+
+ forceCheckpoint(); // Will force GridCacheDataStore.exists=true mode after part store re-creation.
+
+ startGrid(1); // topVer=6,0
+
+ awaitPartitionMapExchange();
+
+ if (persistenceEnabled())
+ resetBaselineTopology(); // topVer=6,1
+
+ awaitPartitionMapExchange(true, true, null);
+
+ // Create counter difference with evicted partition so it's applicable for historical rebalancing.
+ for (int p = 0; p < partitions(); p++)
+ prim.cache(DEFAULT_CACHE_NAME).put(p + partitions(), p * 2 + 1);
+
+ stopGrid(1); // topVer=7,0
+
+ if (mode > 0) {
+ stopGrid(mode == 1, grid(2).name());
+ stopGrid(mode == 1, grid(3).name());
+
+ startGrid(2);
+ startGrid(3);
+ }
+
+ prim.context().cache().context().exchange().rebalanceDelay(500);
+
+ Random r = new Random();
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ final IgniteInternalFuture<?> fut = doRandomUpdates(r,
+ prim,
+ IntStream.range(0, 1000).boxed().collect(toList()),
+ prim.cache(DEFAULT_CACHE_NAME),
+ stop::get);
+
+ if (persistenceEnabled())
+ resetBaselineTopology(); // topVer=7,1
+
+ awaitPartitionMapExchange();
+
+ stop.set(true);
+ fut.get();
+
+ assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
+ }
+
/**
* @param ignite Ignite.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
index 00e3256..c2ec2a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
@@ -26,7 +26,7 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
@@ -345,7 +345,7 @@
if (expectAliveNodes == 1) {
IgniteEx node = (IgniteEx)G.allGrids().iterator().next();
- PartitionTxUpdateCounterImpl cntr = (PartitionTxUpdateCounterImpl)counter(PARTITION_ID, node.name());
+ PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl)counter(PARTITION_ID, node.name());
assertTrue(cntr.sequential());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 12d9162..0bfbc4b 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -47,6 +47,8 @@
import org.apache.ignite.internal.processors.cache.eviction.paged.PageEvictionMultinodeMixedRegionsTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.CheckpointBufferDeadlockTest;
+import org.apache.ignite.internal.processors.cache.transactions.AtomicPartitionCounterStateConsistencyHistoryRebalanceTest;
+import org.apache.ignite.internal.processors.cache.transactions.AtomicPartitionCounterStateConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithPrimaryIndexCorruptionTest;
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheMapOnInvalidTopologyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheRemoteMultiplePartitionReservationTest;
@@ -126,6 +128,9 @@
GridTestUtils.addTestIfNeeded(suite, TxRecoveryWithConcurrentRollbackTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, AtomicPartitionCounterStateConsistencyTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.class, ignoredTests);
+
return suite;
}
}