Merge remote-tracking branch 'remotes/origin/master' into ignite-10044
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 07fbef1..f6c2de8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -499,7 +499,11 @@
null);
}
+ grp.topology().detectLostPartitions(topVer, null);
+
assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion());
+
+ topFut.validate(grp, discoCache.allNodes());
}
}
else if (!fetchFuts.containsKey(grp.groupId())) {
@@ -559,6 +563,8 @@
grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null, null);
+ grp.topology().detectLostPartitions(topVer, null);
+
topFut.validate(grp, discoCache.allNodes());
}
catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0a0e709..8428cc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -856,7 +856,7 @@
aff.partitions());
GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
- top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), affKey));
+ top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), ccfg.getPartitionLossPolicy(), affKey));
return old != null ? old : top;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 89e03a2..e429bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1795,7 +1795,7 @@
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
- resetLostPartitions(caches);
+ resetLostPartitions(caches, false);
}
if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) {
@@ -2056,7 +2056,7 @@
}
if (serverNodeDiscoveryEvent() || localJoinExchange())
- detectLostPartitions(res);
+ detectLostPartitions(res, false);
Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
@@ -2971,8 +2971,9 @@
* Detect lost partitions.
*
* @param resTopVer Result topology version.
+ * @param crd {@code True} if run on coordinator.
*/
- private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
+ private void detectLostPartitions(AffinityTopologyVersion resTopVer, boolean crd) {
boolean detected = false;
long time = System.currentTimeMillis();
@@ -2991,6 +2992,11 @@
detected |= detectedOnGrp;
}
}
+
+ if (crd) {
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+ top.detectLostPartitions(resTopVer, null);
+ }
}
if (detected) {
@@ -3007,24 +3013,29 @@
/**
* @param cacheNames Cache names.
+ * @param crd {@code True} if run on coordinator.
*/
- private void resetLostPartitions(Collection<String> cacheNames) {
+ private void resetLostPartitions(Collection<String> cacheNames, boolean crd) {
assert !exchCtx.mergeExchanges();
synchronized (cctx.exchange().interruptLock()) {
if (Thread.currentThread().isInterrupted())
return;
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
+ for (String cacheName : cacheNames) {
+ DynamicCacheDescriptor cacheDesc = cctx.affinity().caches().get(CU.cacheId(cacheName));
+
+ if (cacheDesc == null || cacheDesc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL)
continue;
- for (String cacheName : cacheNames) {
- if (grp.hasCache(cacheName)) {
- grp.topology().resetLostPartitions(initialVersion());
+ CacheGroupContext grp = cctx.cache().cacheGroup(cacheDesc.groupId());
- break;
- }
+ if (grp != null)
+ grp.topology().resetLostPartitions(initialVersion());
+ else if (crd) {
+ GridDhtPartitionTopology top = cctx.exchange().clientTopology(cacheDesc.groupId(), context().events().discoveryCache());
+
+ top.resetLostPartitions(initialVersion());
}
}
}
@@ -3235,6 +3246,7 @@
if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert firstDiscoEvt instanceof DiscoveryCustomEvent;
+ assert !events().hasServerJoin() && !events().hasServerLeft();
if (activateCluster() || changedBaseline())
assignPartitionsStates();
@@ -3243,12 +3255,12 @@
if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) {
if (exchActions != null) {
- assignPartitionsStates();
-
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
- resetLostPartitions(caches);
+ resetLostPartitions(caches, true);
+
+ assignPartitionsStates();
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
@@ -3256,11 +3268,11 @@
assignPartitionsStates();
}
else {
+ if (exchCtx.events().hasServerLeft())
+ detectLostPartitions(resTopVer, true);
+
if (exchCtx.events().hasServerJoin())
assignPartitionsStates();
-
- if (exchCtx.events().hasServerLeft())
- detectLostPartitions(resTopVer);
}
// Recalculate new affinity based on partitions availability.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 9fb6825..710dc42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -26,11 +26,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -59,6 +61,7 @@
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
@@ -95,7 +98,7 @@
private AffinityTopologyVersion lastExchangeVer;
/** */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ private AffinityTopologyVersion topVer;
/** */
private volatile boolean stopping;
@@ -124,11 +127,18 @@
/** */
private volatile Map<Integer, Long> globalPartSizes;
+ /** */
+ private final PartitionLossPolicy partLossPlc;
+
+ /** */
+ private TreeSet<Integer> lostParts;
+
/**
* @param cctx Context.
* @param discoCache Discovery data cache.
* @param grpId Group ID.
* @param parts Number of partitions in the group.
+ * @param partLossPlc Partition loss policy.
* @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
@@ -136,6 +146,7 @@
DiscoCache discoCache,
int grpId,
int parts,
+ PartitionLossPolicy partLossPlc,
Object similarAffKey
) {
this.cctx = cctx;
@@ -143,6 +154,7 @@
this.grpId = grpId;
this.similarAffKey = similarAffKey;
this.parts = parts;
+ this.partLossPlc = partLossPlc;
topVer = AffinityTopologyVersion.NONE;
@@ -939,6 +951,15 @@
if (cur == null || !cur.equals(parts))
changed = true;
+ if (lostParts != null && partLossPlc != PartitionLossPolicy.IGNORE) {
+ for (Integer lostPart : lostParts) {
+ GridDhtPartitionState state0 = parts.get(lostPart);
+
+ if (state0 != null && state0.active())
+ parts.put(lostPart, LOST);
+ }
+ }
+
node2part.put(parts.nodeId(), parts);
// Add new mappings.
@@ -991,14 +1012,56 @@
/** {@inheritDoc} */
@Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
- assert false : "detectLostPartitions should never be called on client topology";
+ if (partLossPlc == PartitionLossPolicy.IGNORE)
+ return false;
- return false;
+ lock.writeLock().lock();
+
+
+ boolean changed = false;
+
+ try {
+ for (int part = 0; part < parts; part++) {
+ boolean lost = F.contains(lostParts, part);
+
+ if (!lost) {
+ boolean hasOwner = false;
+
+ for (GridDhtPartitionMap partMap : node2part.values()) {
+ if (partMap.get(part) == OWNING) {
+ hasOwner = true;
+ break;
+ }
+ }
+
+ if (!hasOwner) {
+ if (lostParts == null)
+ lostParts = new TreeSet<>();
+
+ changed = true;
+
+ lostParts.add(part);
+ }
+ }
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ return changed;
}
/** {@inheritDoc} */
@Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
- assert false : "resetLostPartitions should never be called on client topology";
+ lock.writeLock().lock();
+
+ try {
+ lostParts = null;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index c90d40f..e55ab2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -1807,6 +1807,15 @@
if (cur == null || !cur.equals(parts))
changed = true;
+ if (lostParts != null && grp.config().getPartitionLossPolicy() != PartitionLossPolicy.IGNORE) {
+ for (Integer lostPart : lostParts) {
+ GridDhtPartitionState state0 = parts.get(lostPart);
+
+ if (state0 != null && state0.active())
+ parts.put(lostPart, LOST);
+ }
+ }
+
node2part.put(parts.nodeId(), parts);
// During exchange diff is calculated after all messages are received and affinity initialized.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index 226ae22..100cc74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -74,6 +75,12 @@
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
+ private static final String CACHE_EXCLUDE_ON_CRD = "cache-exclude-crd";
+
+ /** */
+ private static final String[] CACHE_NAMES = {DEFAULT_CACHE_NAME, CACHE_EXCLUDE_ON_CRD};
+
+ /** */
private boolean client;
/** */
@@ -87,11 +94,17 @@
/** */
private final TopologyChanger killSingleNode = new TopologyChanger(
- false, singletonList(3), asList(0, 1, 2, 4), 0);
+ 4, false, singletonList(3), asList(0, 1, 2, 4), 0);
/** */
private boolean isPersistenceEnabled;
+ /** */
+ private boolean cacheInCfg = true;
+
+ /** */
+ private boolean clientCacheOnCrd;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -109,7 +122,16 @@
cfg.setClientMode(client);
- cfg.setCacheConfiguration(cacheConfiguration());
+ if (cacheInCfg) {
+ CacheConfiguration[] ccfgs;
+
+ if (gridName.equals(getTestIgniteInstanceName(0)))
+ ccfgs = new CacheConfiguration[]{cacheConfiguration(DEFAULT_CACHE_NAME)};
+ else
+ ccfgs = new CacheConfiguration[]{cacheConfiguration(DEFAULT_CACHE_NAME), cacheConfiguration(CACHE_EXCLUDE_ON_CRD)};
+
+ cfg.setCacheConfiguration(ccfgs);
+ }
cfg.setConsistentId(gridName);
@@ -124,10 +146,20 @@
}
/**
+ * @param node Node.
+ * @param cacheName Cache name.
+ * @return {@code True} if the cache should not be accessed on the given node.
+ */
+ private boolean skipCache(Ignite node, String cacheName) {
+ return cacheName.equals(CACHE_EXCLUDE_ON_CRD) && getTestIgniteInstanceName(0).equals(node.name());
+ }
+
+ /**
+ * @param cacheName Cache name.
* @return Cache configuration.
*/
- protected CacheConfiguration<Integer, Integer> cacheConfiguration() {
- CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+ protected CacheConfiguration<Integer, Integer> cacheConfiguration(String cacheName) {
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(cacheName);
cacheCfg.setCacheMode(PARTITIONED);
cacheCfg.setBackups(backups);
@@ -135,6 +167,9 @@
cacheCfg.setPartitionLossPolicy(partLossPlc);
cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+ if (CACHE_EXCLUDE_ON_CRD.equals(cacheName))
+ cacheCfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
return cacheCfg;
}
@@ -173,11 +208,9 @@
* @throws Exception if failed.
*/
public void testReadOnlySafeWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
-
isPersistenceEnabled = true;
- checkLostPartition(false, true, killSingleNode);
+ testReadOnlySafe();
}
/**
@@ -195,11 +228,9 @@
public void testReadOnlyAllWithPersistence() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-10041");
- partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
-
isPersistenceEnabled = true;
- checkLostPartition(false, false, killSingleNode);
+ testReadOnlyAll();
}
/**
@@ -215,11 +246,9 @@
* @throws Exception if failed.
*/
public void testReadWriteSafeWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, killSingleNode);
+ testReadWriteSafe();
}
/**
@@ -237,11 +266,9 @@
public void testReadWriteAllWithPersistence() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-10041");
- partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
-
isPersistenceEnabled = true;
- checkLostPartition(true, false, killSingleNode);
+ testReadWriteAll();
}
/**
@@ -250,18 +277,16 @@
public void testReadWriteSafeAfterKillTwoNodes() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
- checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(4, false, asList(3, 2), asList(0, 1, 4), 0));
}
/**
* @throws Exception if failed.
*/
public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 0));
+ testReadWriteSafeAfterKillTwoNodes();
}
/**
@@ -270,18 +295,16 @@
public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
- checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 20));
+ checkLostPartition(true, true, new TopologyChanger(4, false, asList(3, 2), asList(0, 1, 4), 20));
}
/**
* @throws Exception if failed.
*/
public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 20));
+ testReadWriteSafeAfterKillTwoNodesWithDelay();
}
/**
@@ -292,20 +315,16 @@
backups = 1;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2, 1), asList(0, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(4, true, asList(3, 2, 1), asList(0, 4), 0));
}
/**
* @throws Exception if failed.
*/
public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- backups = 1;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2, 1), asList(0, 4), 0));
+ testReadWriteSafeWithBackupsAfterKillThreeNodes();
}
/**
@@ -314,18 +333,16 @@
public void testReadWriteSafeAfterKillCrd() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(4, true, asList(3, 0), asList(1, 2, 4), 0));
}
/**
* @throws Exception if failed.
*/
public void testReadWriteSafeAfterKillCrdWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
+ testReadWriteSafeAfterKillCrd();
}
/**
@@ -336,20 +353,16 @@
backups = 1;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2), asList(0, 1, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(4, true, asList(3, 2), asList(0, 1, 4), 0));
}
/**
* @throws Exception if failed.
*/
public void testReadWriteSafeWithBackupsWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- backups = 1;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2), asList(0, 1, 4), 0));
+ testReadWriteSafeWithBackups();
}
/**
@@ -360,20 +373,16 @@
backups = 1;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(5, true, asList(3, 2, 0), asList(1, 4, 5), 0));
}
/**
* @throws Exception if failed.
*/
public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- backups = 1;
-
isPersistenceEnabled = true;
- checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
+ testReadWriteSafeWithBackupsAfterKillCrd();
}
/**
@@ -395,11 +404,9 @@
fail("https://issues.apache.org/jira/browse/IGNITE-10041");
- partLossPlc = PartitionLossPolicy.IGNORE;
-
isPersistenceEnabled = true;
- checkIgnore(killSingleNode);
+ testIgnore();
}
/**
@@ -410,8 +417,8 @@
// TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
// TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
- // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
- TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, asList(1, 2, 3), singletonList(0), 0);
+ // TopologyChanger onlyCrdIsAlive = new TopologyChanger(4, false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
+ TopologyChanger onlyCrdIsAlive = new TopologyChanger(4, false, asList(1, 2, 3), singletonList(0), 0);
checkIgnore(onlyCrdIsAlive);
}
@@ -422,16 +429,27 @@
public void testIgnoreKillThreeNodesWithPersistence() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-10041");
- partLossPlc = PartitionLossPolicy.IGNORE;
-
isPersistenceEnabled = true;
- // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
- // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
- // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
- TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, asList(1, 2, 3), singletonList(0), 0);
+ testIgnoreKillThreeNodes();
+ }
- checkIgnore(onlyCrdIsAlive);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartClientCacheOnCoordinator() throws Exception {
+ clientCacheOnCrd = true;
+
+ testReadWriteSafe();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartClientCacheOnCoordinatorWithPersistence() throws Exception {
+ isPersistenceEnabled = true;
+
+ testStartClientCacheOnCoordinator();
}
/**
@@ -441,19 +459,27 @@
private void checkIgnore(TopologyChanger topChanger) throws Exception {
topChanger.changeTopology();
- for (Ignite ig : G.allGrids()) {
- IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+ for (String cacheName : CACHE_NAMES) {
+ for (Ignite ig : G.allGrids()) {
+ if (skipCache(ig, cacheName))
+ continue;
- Collection<Integer> lost = cache.lostPartitions();
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
- assertTrue("[grid=" + ig.name() + ", lost=" + lost.toString() + ']', lost.isEmpty());
+ Collection<Integer> lost = cache.lostPartitions();
- int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
+ assertTrue("[grid=" + ig.name() + ", lost=" + lost.toString() + ']', lost.isEmpty());
- for (int i = 0; i < parts; i++) {
- cache.get(i);
+ int parts = ig.affinity(cacheName).partitions();
- cache.put(i, i);
+ for (int i = 0; i < parts; i++) {
+ if (cacheName.equals(CACHE_EXCLUDE_ON_CRD) && ig.affinity(cacheName).mapPartitionToNode(i) == null)
+ continue;
+
+ cache.get(i);
+
+ cache.put(i, i);
+ }
}
}
}
@@ -467,7 +493,7 @@
private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception {
assert partLossPlc != null;
- List<Integer> lostParts = topChanger.changeTopology();
+ Map<String, List<Integer>> lostPartsMap = topChanger.changeTopology();
// Wait for all grids (servers and client) have same topology version
// to make sure that all nodes received map with lost partition.
@@ -488,36 +514,51 @@
assertTrue("Failed to wait for new topology", success);
for (Ignite ig : G.allGrids()) {
- info("Checking node: " + ig.cluster().localNode().id());
+ for (String cacheName : CACHE_NAMES) {
+ if (!clientCacheOnCrd && skipCache(ig, cacheName))
+ continue;
- IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+ info("Checking node [cache=" + cacheName + ", node=" + ig.cluster().localNode().id() + ']');
- verifyLostPartitions(ig, lostParts);
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
- verifyCacheOps(canWrite, safe, ig);
+ verifyLostPartitions(ig, cacheName, lostPartsMap.get(cacheName));
- validateQuery(safe, ig);
+ verifyCacheOps(cacheName, canWrite, safe, ig);
- // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041.
- if (!isPersistenceEnabled) {
- // Check we can read and write to lost partition in recovery mode.
- IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
+ validateQuery(safe, ig, cacheName);
- for (int lostPart : recoverCache.lostPartitions()) {
- recoverCache.get(lostPart);
- recoverCache.put(lostPart, lostPart);
+ if (cacheName.equals(CACHE_EXCLUDE_ON_CRD) && lostPartsMap.get(cacheName).size() == ig.affinity(cacheName).partitions())
+ continue;
+
+ // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041.
+ if (!isPersistenceEnabled) {
+ // Check we can read and write to lost partition in recovery mode.
+ IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
+
+ for (int lostPart : recoverCache.lostPartitions()) {
+ recoverCache.get(lostPart);
+ recoverCache.put(lostPart, lostPart);
+ }
+
+ // Check that writing in recover mode does not clear partition state.
+ verifyLostPartitions(ig, cacheName, lostPartsMap.get(cacheName));
+
+ verifyCacheOps(cacheName, canWrite, safe, ig);
+
+ validateQuery(safe, ig, cacheName);
}
-
- // Check that writing in recover mode does not clear partition state.
- verifyLostPartitions(ig, lostParts);
-
- verifyCacheOps(canWrite, safe, ig);
-
- validateQuery(safe, ig);
}
}
checkNewNode(true, canWrite, safe);
+
+ cacheInCfg = false; // Check case when client cache is started dynamically.
+
+ checkNewNode(true, canWrite, safe);
+
+ cacheInCfg = true;
+
checkNewNode(false, canWrite, safe);
// Bring all nodes back.
@@ -526,11 +567,18 @@
info("Newly started node: " + grd.cluster().localNode().id());
- // Check that partition state does not change after we start each node.
- // TODO With persistence enabled LOST partitions become OWNING after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044.
- if (!isPersistenceEnabled) {
+ for (String cacheName : CACHE_NAMES) {
+ if (clientCacheOnCrd || !skipCache(grd, cacheName))
+ verifyLostPartitions(grd, cacheName, lostPartsMap.get(cacheName));
+
+ // Check that partition state does not change after we start each node.
for (Ignite ig : G.allGrids()) {
- verifyCacheOps(canWrite, safe, ig);
+ if (!clientCacheOnCrd && skipCache(ig, cacheName))
+ continue;
+
+ verifyLostPartitions(ig, cacheName, lostPartsMap.get(cacheName));
+
+ verifyCacheOps(cacheName, canWrite, safe, ig);
// TODO Query effectively waits for rebalance due to https://issues.apache.org/jira/browse/IGNITE-10057
// TODO and after resetLostPartition there is another OWNING copy in the cluster due to https://issues.apache.org/jira/browse/IGNITE-10058.
@@ -540,32 +588,63 @@
}
}
- ignite(4).resetLostPartitions(singletonList(DEFAULT_CACHE_NAME));
+ // Make sure cache did not really start on coordinator.
+ if (!clientCacheOnCrd && topChanger.aliveNodes.contains(0))
+ assertNull(((IgniteEx)ignite(0)).context().cache().cacheGroup(CU.cacheId(CACHE_EXCLUDE_ON_CRD)));
+
+ ignite(topChanger.srvNodes).resetLostPartitions(Arrays.asList(CACHE_NAMES));
awaitPartitionMapExchange(true, true, null);
for (Ignite ig : G.allGrids()) {
- IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+ for (String cacheName : CACHE_NAMES) {
+ if (!clientCacheOnCrd && skipCache(ig, cacheName))
+ continue;
- assertTrue(cache.lostPartitions().isEmpty());
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
- int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
+ assertTrue(cache.lostPartitions().isEmpty());
- for (int i = 0; i < parts; i++) {
- cache.get(i);
+ int parts = ig.affinity(cacheName).partitions();
- cache.put(i, i);
+ for (int i = 0; i < parts; i++) {
+ cache.get(i);
+
+ cache.put(i, i);
+ }
+
+ for (int i = 0; i < parts; i++) {
+ checkQueryPasses(ig, false, cacheName, i);
+
+ if (shouldExecuteLocalQuery(ig, cacheName, i))
+ checkQueryPasses(ig, true, cacheName, i);
+
+ }
+
+ checkQueryPasses(ig, false, cacheName);
}
+ }
- for (int i = 0; i < parts; i++) {
- checkQueryPasses(ig, false, i);
+ // Make sure cache did not really start on coordinator.
+ if (!clientCacheOnCrd && topChanger.aliveNodes.contains(0))
+ assertNull(((IgniteEx)ignite(0)).context().cache().cacheGroup(CU.cacheId(CACHE_EXCLUDE_ON_CRD)));
- if (shouldExecuteLocalQuery(ig, i))
- checkQueryPasses(ig, true, i);
+ // Start new nodes after lost partitions reset.
+ startGrid(topChanger.srvNodes + 1);
+ client = true;
+
+ startGrid(topChanger.srvNodes + 2);
+
+ for (Ignite ig : G.allGrids()) {
+ for (String cacheName : CACHE_NAMES) {
+ if (!clientCacheOnCrd && skipCache(ig, cacheName))
+ continue;
+
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
+
+ assertTrue(cache.lostPartitions().isEmpty());
}
-
- checkQueryPasses(ig, false);
}
}
@@ -583,15 +662,19 @@
this.client = client;
try {
- IgniteEx cl = (IgniteEx)startGrid("newNode");
+ IgniteEx cl = startGrid("newNode");
- CacheGroupContext grpCtx = cl.context().cache().cacheGroup(CU.cacheId(DEFAULT_CACHE_NAME));
+ for (String cacheName : CACHE_NAMES) {
+ cl.cache(cacheName); // Make sure cache is started on node.
- assertTrue(grpCtx.needsRecovery());
+ CacheGroupContext grpCtx = cl.context().cache().cacheGroup(CU.cacheId(cacheName));
- verifyCacheOps(canWrite, safe, cl);
+ assertTrue(grpCtx.needsRecovery());
- validateQuery(safe, cl);
+ verifyCacheOps(cacheName, canWrite, safe, cl);
+
+ validateQuery(safe, cl, cacheName);
+ }
}
finally {
stopGrid("newNode", false);
@@ -602,10 +685,11 @@
/**
* @param node Node.
+ * @param cacheName Cache name.
* @param lostParts Lost partition IDs.
*/
- private void verifyLostPartitions(Ignite node, List<Integer> lostParts) {
- IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+ private void verifyLostPartitions(Ignite node, String cacheName, List<Integer> lostParts) {
+ IgniteCache<Integer, Integer> cache = node.cache(cacheName);
Set<Integer> actualSortedLostParts = new TreeSet<>(cache.lostPartitions());
Set<Integer> expSortedLostParts = new TreeSet<>(lostParts);
@@ -614,14 +698,15 @@
}
/**
+ * @param cacheName Cache name.
* @param canWrite {@code True} if writes are allowed.
* @param safe {@code True} if lost partition should trigger exception.
* @param ig Ignite instance.
*/
- private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) {
- IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+ private void verifyCacheOps(String cacheName, boolean canWrite, boolean safe, Ignite ig) {
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
- int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
+ int parts = ig.affinity(cacheName).partitions();
// Check read.
for (int i = 0; i < parts; i++) {
@@ -665,11 +750,12 @@
}
/**
+ * @param cacheName Cache name.
* @param nodes List of nodes to find partition.
* @return List of partitions that aren't primary or backup for specified nodes.
*/
- private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
- Affinity<Object> aff = ignite(4).affinity(DEFAULT_CACHE_NAME);
+ private List<Integer> noPrimaryOrBackupPartition(String cacheName, List<Integer> nodes) {
+ Affinity<Object> aff = ignite(4).affinity(cacheName);
List<Integer> parts = new ArrayList<>();
@@ -698,10 +784,11 @@
*
* @param safe Safe flag.
* @param node Node.
+ * @param cacheName Cache name.
*/
- private void validateQuery(boolean safe, Ignite node) {
+ private void validateQuery(boolean safe, Ignite node, String cacheName) {
// Get node lost and remaining partitions.
- IgniteCache<?, ?> cache = node.cache(DEFAULT_CACHE_NAME);
+ IgniteCache<?, ?> cache = node.cache(cacheName);
Collection<Integer> lostParts = cache.lostPartitions();
@@ -709,7 +796,7 @@
Integer remainingPart = null;
- for (int i = 0; i < node.affinity(DEFAULT_CACHE_NAME).partitions(); i++) {
+ for (int i = 0; i < node.affinity(cacheName).partitions(); i++) {
if (lostParts.contains(i))
continue;
@@ -718,22 +805,25 @@
break;
}
- assertNotNull("Failed to find a partition that isn't lost", remainingPart);
+ if (remainingPart == null && cacheName.equals(CACHE_EXCLUDE_ON_CRD))
+ return;
+
+ assertNotNull("Failed to find a partition that isn't lost for cache: " + cacheName, remainingPart);
// 1. Check query against all partitions.
- validateQuery0(safe, node);
+ validateQuery0(safe, node, cacheName);
// 2. Check query against LOST partition.
- validateQuery0(safe, node, part);
+ validateQuery0(safe, node, cacheName, part);
// 3. Check query on remaining partition.
- checkQueryPasses(node, false, remainingPart);
+ checkQueryPasses(node, false, cacheName, remainingPart);
- if (shouldExecuteLocalQuery(node, remainingPart))
- checkQueryPasses(node, true, remainingPart);
+ if (shouldExecuteLocalQuery(node, cacheName, remainingPart))
+ checkQueryPasses(node, true, cacheName, remainingPart);
// 4. Check query over two partitions - normal and LOST.
- validateQuery0(safe, node, part, remainingPart);
+ validateQuery0(safe, node, cacheName, part, remainingPart);
}
/**
@@ -741,32 +831,36 @@
*
* @param safe Safe flag.
* @param node Node.
+ * @param cacheName Cache name.
* @param parts Partitions.
*/
- private void validateQuery0(boolean safe, Ignite node, int... parts) {
+ private void validateQuery0(boolean safe, Ignite node, String cacheName, int... parts) {
if (safe)
- checkQueryFails(node, false, parts);
+ checkQueryFails(node, false, cacheName, parts);
else
- checkQueryPasses(node, false, parts);
+ checkQueryPasses(node, false, cacheName, parts);
- if (shouldExecuteLocalQuery(node, parts)) {
+ if (shouldExecuteLocalQuery(node, cacheName, parts)) {
if (safe)
- checkQueryFails(node, true, parts);
+ checkQueryFails(node, true, cacheName, parts);
else
- checkQueryPasses(node, true, parts);
+ checkQueryPasses(node, true, cacheName, parts);
}
}
/**
+ * @param node Node.
+ * @param cacheName Cache name.
+ * @param parts Partitions.
* @return true if the given node is primary for all given partitions.
*/
- private boolean shouldExecuteLocalQuery(Ignite node, int... parts) {
+ private boolean shouldExecuteLocalQuery(Ignite node, String cacheName, int... parts) {
if (parts == null || parts.length == 0)
return false;
int numOfPrimaryParts = 0;
- for (int nodePrimaryPart : node.affinity(DEFAULT_CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
+ for (int nodePrimaryPart : node.affinity(cacheName).primaryPartitions(node.cluster().localNode())) {
for (int part : parts) {
if (part == nodePrimaryPart)
numOfPrimaryParts++;
@@ -779,9 +873,10 @@
/**
* @param node Node.
* @param loc Local flag.
+ * @param cacheName Cache name.
* @param parts Partitions.
*/
- protected void checkQueryPasses(Ignite node, boolean loc, int... parts) {
+ protected void checkQueryPasses(Ignite node, boolean loc, String cacheName, int... parts) {
// Scan queries don't support multiple partitions.
if (parts != null && parts.length > 1)
return;
@@ -790,7 +885,7 @@
if (loc)
return;
- IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+ IgniteCache cache = node.cache(cacheName);
ScanQuery qry = new ScanQuery();
@@ -806,9 +901,10 @@
/**
* @param node Node.
* @param loc Local flag.
+ * @param cacheName Cache name.
* @param parts Partitions.
*/
- protected void checkQueryFails(Ignite node, boolean loc, int... parts) {
+ protected void checkQueryFails(Ignite node, boolean loc, String cacheName, int... parts) {
// TODO Scan queries never fail due to partition loss - https://issues.apache.org/jira/browse/IGNITE-9902.
// TODO Need to add an actual check after https://issues.apache.org/jira/browse/IGNITE-9902 is fixed.
// No-op.
@@ -816,6 +912,9 @@
/** */
private class TopologyChanger {
+ /** Number of server nodes to start. */
+ private int srvNodes;
+
/** Flag to delay partition exchange */
private boolean delayExchange;
@@ -829,13 +928,19 @@
private long stopDelay;
/**
+ * @param srvNodes Number of server nodes to start.
* @param delayExchange Flag for delay partition exchange.
* @param killNodes List of nodes to kill.
* @param aliveNodes List of nodes to be alive.
* @param stopDelay Delay between stopping nodes.
*/
- private TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes,
+ private TopologyChanger(
+ int srvNodes,
+ boolean delayExchange,
+ List<Integer> killNodes,
+ List<Integer> aliveNodes,
long stopDelay) {
+ this.srvNodes = srvNodes;
this.delayExchange = delayExchange;
this.killNodes = killNodes;
this.aliveNodes = aliveNodes;
@@ -846,57 +951,74 @@
- * @return Lost partition ID.
+ * @return Lost partitions mapped by cache name.
* @throws Exception If failed.
*/
- private List<Integer> changeTopology() throws Exception {
- startGrids(4);
+ private Map<String, List<Integer>> changeTopology() throws Exception {
+ startGrids(srvNodes);
if (isPersistenceEnabled)
grid(0).cluster().active(true);
- Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
+ for (String cacheName : CACHE_NAMES) {
+ Affinity<Object> aff = ignite(1).affinity(cacheName);
- for (int i = 0; i < aff.partitions(); i++)
- ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i);
+ for (int i = 0; i < aff.partitions(); i++)
+ ignite(1).cache(cacheName).put(i, i);
+ }
client = true;
- startGrid(4);
+ startGrid(srvNodes);
client = false;
- for (int i = 0; i < 5; i++)
+ for (int i = 0; i < srvNodes + 1; i++)
info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
awaitPartitionMapExchange();
- final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
+ Map<String, List<Integer>> lostParts = new HashMap<>();
- if (parts.isEmpty())
- throw new IllegalStateException("No partition on nodes: " + killNodes);
+ Map<String, List<Map<Integer, Semaphore>>> lostMaps = new HashMap<>();
- final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
+ for (String cacheName : CACHE_NAMES) {
+ final List<Integer> parts = noPrimaryOrBackupPartition(cacheName, aliveNodes);
- for (int i : aliveNodes) {
- HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
+ if (parts.isEmpty())
+ throw new IllegalStateException("No lost partitions [cache=" + cacheName + ", killNodes=" + killNodes + ']');
- for (Integer part : parts)
- semaphoreMap.put(part, new Semaphore(0));
+ lostParts.put(cacheName, parts);
- lostMap.add(semaphoreMap);
+ final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
- grid(i).events().localListen(new P1<Event>() {
- @Override public boolean apply(Event evt) {
- assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+ lostMaps.put(cacheName, lostMap);
- CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
+ for (int i : aliveNodes) {
+ Ignite node = grid(i);
- if (F.eq(DEFAULT_CACHE_NAME, cacheEvt.cacheName())) {
- if (semaphoreMap.containsKey(cacheEvt.partition()))
- semaphoreMap.get(cacheEvt.partition()).release();
+ if (skipCache(node, cacheName))
+ continue;
+
+ HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
+
+ for (Integer part : parts)
+ semaphoreMap.put(part, new Semaphore(0));
+
+ lostMap.add(semaphoreMap);
+
+ node.events().localListen(new P1<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+
+ CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
+
+ if (F.eq(cacheName, cacheEvt.cacheName())) {
+ if (semaphoreMap.containsKey(cacheEvt.partition()))
+ semaphoreMap.get(cacheEvt.partition()).release();
+ }
+
+ return true;
}
-
- return true;
- }
- }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+ }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+ }
}
if (delayExchange)
@@ -920,17 +1042,23 @@
Thread.sleep(5_000L);
- for (Map<Integer, Semaphore> map : lostMap) {
- for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertTrue("Failed to wait for partition LOST event for partition: " + entry.getKey(), entry.getValue().tryAcquire(1));
+ for (String cacheName : CACHE_NAMES) {
+ List<Map<Integer, Semaphore>> lostMap = lostMaps.get(cacheName);
+
+ for (Map<Integer, Semaphore> map : lostMap) {
+ for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+ assertTrue("Failed to wait for partition LOST event [cache=" + cacheName + ", part=" + entry.getKey() + ']',
+ entry.getValue().tryAcquire(1));
+ }
+
+ for (Map<Integer, Semaphore> map : lostMap) {
+ for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+ assertFalse("Partition LOST event raised twice for partition [cache=" + cacheName + ", part=" + entry.getKey() + ']',
+ entry.getValue().tryAcquire(1));
+ }
}
- for (Map<Integer, Semaphore> map : lostMap) {
- for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertFalse("Partition LOST event raised twice for partition: " + entry.getKey(), entry.getValue().tryAcquire(1));
- }
-
- return parts;
+ return lostParts;
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 0910292..bc2a025 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -647,6 +647,9 @@
long start = 0;
for (int i = 0; ; i++) {
+ if (c.lostPartitions().contains(p))
+ break;
+
boolean match = false;
GridCachePartitionExchangeManager<?, ?> exchMgr = dht.context().shared().exchange();
@@ -701,6 +704,8 @@
", affNodes=" + F.nodeIds(affNodes) +
", owners=" + F.nodeIds(owners) +
", topFut=" + topFut +
+ ", lostParts=" + c.lostPartitions() +
+ ", locState=" + (loc != null ? loc.state() : null) +
", locNode=" + g.cluster().localNode() + ']');
}
else
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
index 7007499..2c47392 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
@@ -28,8 +28,8 @@
*/
public class IndexingCachePartitionLossPolicySelfTest extends IgniteCachePartitionLossPolicySelfTest {
/** {@inheritDoc} */
- @Override protected CacheConfiguration<Integer, Integer> cacheConfiguration() {
- CacheConfiguration<Integer, Integer> ccfg = super.cacheConfiguration();
+ @Override protected CacheConfiguration<Integer, Integer> cacheConfiguration(String cacheName) {
+ CacheConfiguration<Integer, Integer> ccfg = super.cacheConfiguration(cacheName);
ccfg.setIndexedTypes(Integer.class, Integer.class);
@@ -37,18 +37,18 @@
}
/** {@inheritDoc} */
- protected void checkQueryPasses(Ignite node, boolean loc, int... parts) {
- executeQuery(node, loc, parts);
+ @Override protected void checkQueryPasses(Ignite node, boolean loc, String cacheName, int... parts) {
+ executeQuery(node, loc, cacheName, parts);
}
/** {@inheritDoc} */
- protected void checkQueryFails(Ignite node, boolean loc, int... parts) {
+ @Override protected void checkQueryFails(Ignite node, boolean loc, String cacheName, int... parts) {
// TODO: Local queries ignore partition loss, see https://issues.apache.org/jira/browse/IGNITE-7039.
if (loc)
return;
try {
- executeQuery(node, loc, parts);
+ executeQuery(node, loc, cacheName, parts);
fail("Exception is not thrown.");
}
@@ -64,12 +64,13 @@
/**
* Execute SQL query on a given node.
*
- * @param parts Partitions.
* @param node Node.
* @param loc Local flag.
+ * @param cacheName Cache name.
+ * @param parts Partitions.
*/
- private static void executeQuery(Ignite node, boolean loc, int... parts) {
- IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+ private static void executeQuery(Ignite node, boolean loc, String cacheName, int... parts) {
+ IgniteCache cache = node.cache(cacheName);
SqlFieldsQuery qry = new SqlFieldsQuery("SELECT * FROM Integer");