blob: 3039c630483d38eea0755801b2e223156f06c412 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
/**
*
*/
public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
/** */
private static final String CACHE_NAME = "cache";
/** */
private static final int NODE_COUNT = 4;
/** */
private boolean disableAutoActivation;
/** */
private Map<String, Object> userAttrs;
/** */
private static final String DATA_NODE = "dataNodeUserAttr";
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
cleanPersistenceDir();
disableAutoActivation = false;
System.clearProperty(IGNITE_WAL_LOG_TX_RECORDS);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setConsistentId(igniteInstanceName);
if (disableAutoActivation)
cfg.setAutoActivationEnabled(false);
cfg.setDataStorageConfiguration(
new DataStorageConfiguration().setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setMaxSize(100L * 1024 * 1024)
.setInitialSize(100L * 1024 * 1024)
)
.setDataRegionConfigurations(
new DataRegionConfiguration()
.setName("memory")
.setPersistenceEnabled(false)
.setMaxSize(100L * 1024 * 1024)
.setInitialSize(100L * 1024 * 1024)
)
.setWalMode(WALMode.LOG_ONLY)
);
if (userAttrs != null)
cfg.setUserAttributes(userAttrs);
return cfg;
}
/**
* Verifies that rebalance on cache with Node Filter happens when BaselineTopology changes.
*
* @throws Exception If failed.
*/
@Test
public void testRebalanceForCacheWithNodeFilter() throws Exception {
try {
final int EMPTY_NODE_IDX = 2;
userAttrs = U.newHashMap(1);
userAttrs.put(DATA_NODE, true);
startGrids(2);
userAttrs.put(DATA_NODE, false);
IgniteEx ignite = startGrid(2);
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().active(true);
awaitPartitionMapExchange();
IgniteCache<Integer, Integer> cache =
ignite.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(CACHE_NAME)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
.setAffinity(new RendezvousAffinityFunction(32, null))
.setNodeFilter(new DataNodeFilter())
);
for (int k = 0; k < 10_000; k++)
cache.put(k, k);
Thread.sleep(500);
printSizesDataNodes(NODE_COUNT - 1, EMPTY_NODE_IDX);
userAttrs.put(DATA_NODE, true);
startGrid(3);
ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
awaitPartitionMapExchange();
Thread.sleep(500);
printSizesDataNodes(NODE_COUNT, EMPTY_NODE_IDX);
}
finally {
userAttrs = null;
}
}
/** */
private void printSizesDataNodes(int nodesCnt, int emptyNodeIdx) {
for (int i = 0; i < nodesCnt; i++) {
IgniteEx ig = grid(i);
int locSize = ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY);
if (i == emptyNodeIdx)
assertEquals("Cache local size on "
+ i
+ " node is expected to be zero", 0, locSize);
else
assertTrue("Cache local size on "
+ i
+ " node is expected to be non zero", locSize > 0);
}
}
/** */
private static class DataNodeFilter implements IgnitePredicate<ClusterNode> {
@Override public boolean apply(ClusterNode clusterNode) {
return clusterNode.attribute(DATA_NODE);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTopologyChangesWithFixedBaseline() throws Exception {
startGrids(NODE_COUNT);
IgniteEx ignite = grid(0);
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().active(true);
awaitPartitionMapExchange();
Map<ClusterNode, Ignite> nodes = new HashMap<>();
for (int i = 0; i < NODE_COUNT; i++) {
Ignite ig = grid(i);
nodes.put(ig.cluster().localNode(), ig);
}
IgniteCache<Integer, Integer> cache =
ignite.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(CACHE_NAME)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
);
int key = -1;
for (int k = 0; k < 100_000; k++) {
if (!ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(k).contains(ignite.localNode())) {
key = k;
break;
}
}
assert key >= 0;
int part = ignite.affinity(CACHE_NAME).partition(key);
Collection<ClusterNode> initialMapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping.size() == 2 : initialMapping;
ignite.cluster().setBaselineTopology(baselineNodes(nodes.keySet()));
awaitPartitionMapExchange();
cache.put(key, 1);
Collection<ClusterNode> mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping.size() == mapping.size() : mapping;
assert initialMapping.containsAll(mapping) : mapping;
IgniteEx newIgnite = startGrid(4);
awaitPartitionMapExchange();
mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping.size() == mapping.size() : mapping;
assert initialMapping.containsAll(mapping) : mapping;
mapping = newIgnite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping.size() == mapping.size() : mapping;
assert initialMapping.containsAll(mapping) : mapping;
Set<String> stoppedNodeNames = new HashSet<>();
ClusterNode node = mapping.iterator().next();
stoppedNodeNames.add(nodes.get(node).name());
nodes.get(node).close();
nodes.remove(node);
awaitPartitionMapExchange(true, true, null);
mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert mapping.size() == 1 : mapping;
assert initialMapping.containsAll(mapping);
node = mapping.iterator().next();
stoppedNodeNames.add(nodes.get(node).name());
nodes.get(node).close();
nodes.remove(node);
awaitPartitionMapExchange();
mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert mapping.isEmpty() : mapping;
GridDhtPartitionTopology topology = ignite.cachex(CACHE_NAME).context().topology();
assert topology.lostPartitions().contains(part);
for (String nodeName : stoppedNodeNames) {
startGrid(nodeName);
}
assert ignite.cluster().nodes().size() == NODE_COUNT + 1;
mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping.size() == mapping.size() : mapping;
for (ClusterNode n1 : initialMapping) {
boolean found = false;
for (ClusterNode n2 : mapping) {
if (n2.consistentId().equals(n1.consistentId())) {
found = true;
break;
}
}
assert found;
}
ignite.resetLostPartitions(Collections.singleton(CACHE_NAME));
cache.put(key, 2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testBaselineTopologyChangesFromServer() throws Exception {
testBaselineTopologyChanges(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testBaselineTopologyChangesFromClient() throws Exception {
testBaselineTopologyChanges(true);
}
/**
* @throws Exception if failed.
*/
@Test
public void testClusterActiveWhileBaselineChanging() throws Exception {
startGrids(NODE_COUNT);
IgniteEx ig = grid(0);
ig.cluster().baselineAutoAdjustEnabled(false);
ig.cluster().active(true);
assertTrue(ig.cluster().active());
startGrid(NODE_COUNT);
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
try {
U.sleep(100);
}
catch (IgniteInterruptedCheckedException e) {
e.printStackTrace();
}
ig.cluster().setBaselineTopology(NODE_COUNT + 1);
});
while (!fut.isDone()) {
assertTrue(grid(0).cluster().active());
assertTrue(grid(0).context().state().publicApiActiveState(false));
assertTrue(grid(NODE_COUNT).cluster().active());
assertTrue(grid(NODE_COUNT).context().state().publicApiActiveState(false));
}
assertNull(String.valueOf(fut.error()), fut.error());
assertEquals(NODE_COUNT + 1, ig.cluster().currentBaselineTopology().size());
}
/**
* @throws Exception If failed.
*/
private void testBaselineTopologyChanges(boolean fromClient) throws Exception {
startGrids(NODE_COUNT);
IgniteEx ignite;
if (fromClient)
ignite = startClientGrid(NODE_COUNT + 10);
else
ignite = grid(0);
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().active(true);
awaitPartitionMapExchange();
Map<ClusterNode, Ignite> nodes = new HashMap<>();
for (int i = 0; i < NODE_COUNT; i++) {
Ignite ig = grid(i);
nodes.put(ig.cluster().localNode(), ig);
}
ignite.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(CACHE_NAME)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
);
manualCacheRebalancing(ignite, CACHE_NAME);
int key = -1;
for (int k = 0; k < 100_000; k++) {
if (!ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(k).contains(ignite.localNode())) {
key = k;
break;
}
}
assert key >= 0;
Collection<ClusterNode> initialMapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping.size() == 2 : initialMapping;
ignite.cluster().setBaselineTopology(baselineNodes(nodes.keySet()));
Set<String> stoppedNodeNames = new HashSet<>();
ClusterNode node = initialMapping.iterator().next();
stoppedNodeNames.add(nodes.get(node).name());
nodes.get(node).close();
nodes.remove(node);
awaitPartitionMapExchange();
Collection<ClusterNode> mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert mapping.size() == 1 : mapping;
assert initialMapping.containsAll(mapping);
Set<ClusterNode> blt2 = new HashSet<>(ignite.cluster().nodes());
ignite.cluster().setBaselineTopology(baselineNodes(
blt2.stream().filter(n -> !n.isClient()).collect(Collectors.toSet())));
awaitPartitionMapExchange();
Collection<ClusterNode> initialMapping2 = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping2.size() == 2 : initialMapping2;
Ignite newIgnite = startGrid(NODE_COUNT);
awaitPartitionMapExchange();
mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert mapping.size() == initialMapping2.size() : mapping;
assert mapping.containsAll(initialMapping2);
assert ignite.affinity(CACHE_NAME).primaryPartitions(newIgnite.cluster().localNode()).length == 0;
Set<ClusterNode> blt3 = new HashSet<>(ignite.cluster().nodes());
ignite.cluster().setBaselineTopology(baselineNodes(
blt3.stream().filter(n -> !n.isClient()).collect(Collectors.toSet())));
awaitPartitionMapExchange();
Collection<ClusterNode> initialMapping3 = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert initialMapping3.size() == 2;
assert ignite.affinity(CACHE_NAME).primaryPartitions(newIgnite.cluster().localNode()).length > 0;
newIgnite = startGrid(NODE_COUNT + 1);
awaitPartitionMapExchange();
mapping = ignite.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert mapping.size() == initialMapping3.size() : mapping;
assert mapping.containsAll(initialMapping3);
assert ignite.affinity(CACHE_NAME).primaryPartitions(newIgnite.cluster().localNode()).length == 0;
ignite.cluster().setBaselineTopology(null);
awaitPartitionMapExchange();
assert ignite.affinity(CACHE_NAME).primaryPartitions(newIgnite.cluster().localNode()).length > 0;
}
/**
* @throws Exception If failed.
*/
@Test
public void testPrimaryLeft() throws Exception {
startGrids(NODE_COUNT);
IgniteEx ig = grid(0);
ig.cluster().active(true);
awaitPartitionMapExchange();
IgniteCache<Integer, Integer> cache =
ig.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(CACHE_NAME)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
.setReadFromBackup(true)
.setWriteSynchronizationMode(FULL_SYNC)
.setRebalanceDelay(-1)
);
int key = 1;
List<ClusterNode> affNodes = (List<ClusterNode>) ig.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert affNodes.size() == 2;
int primaryIdx = -1;
IgniteEx primary = null;
IgniteEx backup = null;
manualCacheRebalancing(ig, CACHE_NAME);
for (int i = 0; i < NODE_COUNT; i++) {
if (grid(i).localNode().equals(affNodes.get(0))) {
primaryIdx = i;
primary = grid(i);
}
else if (grid(i).localNode().equals(affNodes.get(1)))
backup = grid(i);
}
assert primary != null;
assert backup != null;
Integer val1 = 1;
Integer val2 = 2;
cache.put(key, val1);
assertEquals(val1, primary.cache(CACHE_NAME).get(key));
assertEquals(val1, backup.cache(CACHE_NAME).get(key));
if (ig == primary) {
ig = backup;
cache = ig.cache(CACHE_NAME);
}
primary.close();
backup.context().cache().context().exchange().affinityReadyFuture(new AffinityTopologyVersion(5, 0)).get();
assertEquals(backup.localNode(), ig.affinity(CACHE_NAME).mapKeyToNode(key));
cache.put(key, val2);
assertEquals(val2, backup.cache(CACHE_NAME).get(key));
primary = startGrid(primaryIdx);
assertEquals(backup.localNode(), ig.affinity(CACHE_NAME).mapKeyToNode(key));
manualCacheRebalancing(ig, CACHE_NAME);
awaitPartitionMapExchange();
assertEquals(primary.localNode(), ig.affinity(CACHE_NAME).mapKeyToNode(key));
assertEquals(val2, primary.cache(CACHE_NAME).get(key));
assertEquals(val2, backup.cache(CACHE_NAME).get(key));
}
/**
* @throws Exception If failed.
*/
@Test
public void testPrimaryLeftAndClusterRestart() throws Exception {
startGrids(NODE_COUNT);
IgniteEx ig = grid(0);
ig.cluster().active(true);
IgniteCache<Integer, Integer> cache =
ig.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(CACHE_NAME)
.setWriteSynchronizationMode(FULL_SYNC)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
.setReadFromBackup(true)
.setRebalanceDelay(-1)
);
int key = 1;
List<ClusterNode> affNodes = (List<ClusterNode>) ig.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assert affNodes.size() == 2;
int primaryIdx = -1;
int backupIdx = -1;
IgniteEx primary = null;
IgniteEx backup = null;
manualCacheRebalancing(ig, CACHE_NAME);
for (int i = 0; i < NODE_COUNT; i++) {
if (grid(i).localNode().equals(affNodes.get(0))) {
primaryIdx = i;
primary = grid(i);
}
else if (grid(i).localNode().equals(affNodes.get(1))) {
backupIdx = i;
backup = grid(i);
}
}
assert primary != null;
assert backup != null;
Integer val1 = 1;
Integer val2 = 2;
cache.put(key, val1);
assertEquals(val1, primary.cache(CACHE_NAME).get(key));
assertEquals(val1, backup.cache(CACHE_NAME).get(key));
if (ig == primary) {
ig = backup;
cache = ig.cache(CACHE_NAME);
}
stopGrid(primaryIdx, false);
backup.context().cache().context().exchange().affinityReadyFuture(new AffinityTopologyVersion(5, 0)).get();
assertEquals(backup.localNode(), ig.affinity(CACHE_NAME).mapKeyToNode(key));
cache.put(key, val2);
assertEquals(val2, backup.cache(CACHE_NAME).get(key));
stopAllGrids(false);
startGrids(NODE_COUNT);
ig = grid(0);
primary = grid(primaryIdx);
backup = grid(backupIdx);
boolean activated = GridTestUtils.waitForCondition(() -> {
for (int i = 0; i < NODE_COUNT; i++) {
if (!grid(i).cluster().active())
return false;
}
return true;
}, 10_000);
assert activated;
// assertEquals(backup.localNode(), ig.affinity(CACHE_NAME).mapKeyToNode(key));
assertEquals(val2, primary.cache(CACHE_NAME).get(key));
assertEquals(val2, backup.cache(CACHE_NAME).get(key));
manualCacheRebalancing(ig, CACHE_NAME);
awaitPartitionMapExchange();
affNodes = (List<ClusterNode>) ig.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
assertEquals(primary.localNode(), affNodes.get(0));
assertEquals(backup.localNode(), affNodes.get(1));
assertEquals(val2, primary.cache(CACHE_NAME).get(key));
assertEquals(val2, backup.cache(CACHE_NAME).get(key));
}
/**
* @throws Exception if failed.
*/
@Test
public void testMetadataUpdate() throws Exception {
startGrids(5);
Ignite ignite3 = grid(3);
ignite3.cluster().active(true);
CacheConfiguration<Object, Object> repCacheCfg = new CacheConfiguration<>("replicated")
.setCacheMode(CacheMode.REPLICATED)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
IgniteCache<Object, Object> cache = ignite3.getOrCreateCache(repCacheCfg);
stopGrid(0);
stopGrid(1);
stopGrid(2);
stopGrid(4);
for (int i = 0; i < 100; i++)
cache.put(i, new TestValue(i));
stopAllGrids();
startGrids(5);
GridTestUtils.waitForCondition(() -> grid(0).cluster().active(), getTestTimeout());
for (int g = 0; g < 5; g++) {
for (int i = 0; i < 100; i++)
assertEquals(new TestValue(i), grid(g).cache("replicated").get(i));
}
}
/**
* @throws Exception if failed.
*/
@Test
public void testClusterRestoredOnRestart() throws Exception {
startGrids(5);
Ignite ignite3 = grid(3);
ignite3.cluster().active(true);
stopGrid(0);
CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>("unknown_cache");
cacheConfiguration.setBackups(3);
IgniteCache<Object, Object> cache0 = ignite3.getOrCreateCache(cacheConfiguration);
for (int i = 0; i < 2048; i++)
cache0.put(i, 0);
awaitPartitionMapExchange();
stopAllGrids();
startGrid(1); //TODO https://issues.apache.org/jira/browse/IGNITE-8717 (replace with startGrids(5); //after)
startGrid(0);
startGrid(2);
startGrid(3);
startGrid(4);
GridTestUtils.waitForCondition(() -> grid(0).cluster().active(), getTestTimeout());
for (int g = 0; g < 5; g++) {
for (int i = 0; i < 2048; i++)
assertEquals("For key: " + i, 0, grid(g).cache("unknown_cache").get(i));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNonPersistentCachesDontIgnoreBaselineTopology() throws Exception {
Ignite ig = startGrids(4);
ig.cluster().active(true);
IgniteCache persistentCache = ig.createCache(CACHE_NAME);
IgniteCache inMemoryCache = ig.createCache(
new CacheConfiguration<>().setName(CACHE_NAME + 2).setDataRegionName("memory"));
Ignite newNode = startGrid(4);
awaitPartitionMapExchange();
assertEquals(0, ig.affinity(persistentCache.getName()).allPartitions(newNode.cluster().localNode()).length);
assertEquals(0, ig.affinity(inMemoryCache.getName()).allPartitions(newNode.cluster().localNode()).length);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMapTxPrimaryNodes() throws Exception {
checkMapTxNodes(true, false);
}
/**
*
* @throws Exception If failed.
*/
@Test
public void testMapTxBackupNodes() throws Exception {
checkMapTxNodes(false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMapNearTxPrimaryNodes() throws Exception {
checkMapTxNodes(true, true);
}
/**
*
* @throws Exception If failed.
*/
@Test
public void testMapNearTxBackupNodes() throws Exception {
checkMapTxNodes(false, true);
}
/**
* @param primary Whether non-baseline node is primary.
* @param near Whether non-baseline nod is near node.
* @throws Exception If failed.
*/
public void checkMapTxNodes(boolean primary, boolean near) throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true");
int bltNodesCnt = 3;
Ignite ig = startGrids(bltNodesCnt);
ig.cluster().active(true);
ig.createCache(new CacheConfiguration<>()
.setName(CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setBackups(2));
ig.createCache(
new CacheConfiguration<>()
.setName(CACHE_NAME + 1)
.setDataRegionName("memory")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setBackups(2)
);
Ignite nonBltIgnite = startGrid(bltNodesCnt);
awaitPartitionMapExchange();
ClusterNode nonBltNode = nonBltIgnite.cluster().localNode();
Ignite nearIgnite = near ? nonBltIgnite : ig;
IgniteCache<Integer, Integer> persistentCache = nearIgnite.cache(CACHE_NAME);
IgniteCache<Integer, Integer> inMemoryCache = nearIgnite.cache(CACHE_NAME + 1);
assertEquals(0, nearIgnite.affinity(persistentCache.getName()).allPartitions(nonBltNode).length);
assertEquals(0, nearIgnite.affinity(inMemoryCache.getName()).allPartitions(nonBltNode).length);
}
/**
* @throws Exception if failed.
*/
@Test
public void testAffinityAssignmentChangedAfterRestart() throws Exception {
int parts = 32;
final List<Integer> partMapping = new ArrayList<>();
for (int p = 0; p < parts; p++)
partMapping.add(p);
final AffinityFunction affFunc = new TestAffinityFunction(new RendezvousAffinityFunction(false, parts));
TestAffinityFunction.partsAffMapping = partMapping;
String cacheName = CACHE_NAME + 2;
startGrids(4);
IgniteEx ig = grid(0);
ig.cluster().active(true);
IgniteCache<Integer, Integer> cache = ig.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(cacheName)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
.setReadFromBackup(true)
.setWriteSynchronizationMode(FULL_SYNC)
.setRebalanceDelay(-1)
.setAffinity(affFunc));
Map<Integer, String> keyToConsId = new HashMap<>();
for (int k = 0; k < 1000; k++) {
cache.put(k, k);
keyToConsId.put(k, ig.affinity(cacheName).mapKeyToNode(k).consistentId().toString());
}
stopAllGrids();
Collections.shuffle(TestAffinityFunction.partsAffMapping, new Random(1));
/* There is a problem with handling simultaneous auto activation after restart and manual activation.
To properly catch the moment when cluster activation has finished we temporary disable auto activation. */
disableAutoActivation = true;
startGrids(4);
ig = grid(0);
ig.cluster().active(true);
cache = ig.cache(cacheName);
GridDhtPartitionFullMap partMap = ig.cachex(cacheName).context().topology().partitionMap(false);
for (int i = 1; i < 4; i++) {
IgniteEx ig0 = grid(i);
for (int p = 0; p < 32; p++)
assertEqualsCollections(ig.affinity(cacheName).mapPartitionToPrimaryAndBackups(p), ig0.affinity(cacheName).mapPartitionToPrimaryAndBackups(p));
}
for (Map.Entry<Integer, String> e : keyToConsId.entrySet()) {
int p = ig.affinity(cacheName).partition(e.getKey());
assertEquals("p=" + p, GridDhtPartitionState.OWNING, partMap.get(ig.affinity(cacheName).mapKeyToNode(e.getKey()).id()).get(p));
}
for (int k = 0; k < 1000; k++)
assertEquals("k=" + k, Integer.valueOf(k), cache.get(k));
}
/**
* Verify that in case of setting baseline topology with offline node among others
* {@link IgniteException} is thrown.
*
* @throws Exception If failed.
*/
@Test
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
public void testSettingBaselineTopologyWithOfflineNode() throws Exception {
Ignite ignite = startGrids(2);
ignite.cluster().active(true);
ignite(0).createCache(defaultCacheConfiguration().setNodeFilter(
(IgnitePredicate<ClusterNode>)node -> node.attribute("some-attr") != null));
Collection<ClusterNode> nodes = new ArrayList<>(ignite.cluster().nodes());
nodes.add(new DetachedClusterNode("non-existing-node-id", null));
GridTestUtils.assertThrows(log, (Callable<Void>)() -> {
ignite.cluster().setBaselineTopology(nodes);
return null;
}, IgniteException.class, "Check arguments. Node with consistent ID [non-existing-node-id] " +
"not found in server nodes.");
}
/**
* Verify that in case of setting baseline topology with offline node among others {@link IgniteException} is
* thrown.
*
* @throws Exception If failed.
*/
@Test
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
public void testSettingBaselineTopologyWithOfflineNodeFromOldTopology() throws Exception {
Ignite ignite = startGrids(2);
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().active(true);
stopGrid(1);
ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
GridTestUtils.assertThrows(log, (Callable<Void>)() -> {
ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion() - 1);
return null;
}, IgniteException.class, "Check arguments. Node with consistent ID " +
"[distributed.CacheBaselineTopologyTest1] not found in server nodes.");
}
/** */
private Collection<BaselineNode> baselineNodes(Collection<ClusterNode> clNodes) {
Collection<BaselineNode> res = new ArrayList<>(clNodes.size());
res.addAll(clNodes);
return res;
}
/**
*
*/
private static class TestValue {
/** */
int f1;
/** */
int f2;
/** */
int f3;
/** */
int f4;
/**
* @param a Init value.
*/
private TestValue(int a) {
f1 = f2 = f3 = f4 = a;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof TestValue))
return false;
TestValue other = (TestValue)o;
return
f1 == other.f1 &&
f2 == other.f2 &&
f3 == other.f3 &&
f4 == other.f4;
}
/** {@inheritDoc} */
@Override public int hashCode() {
int result = f1;
result = 31 * result + f2;
result = 31 * result + f3;
result = 31 * result + f4;
return result;
}
}
/**
*
*/
private static class TestAffinityFunction implements AffinityFunction {
/** */
private final AffinityFunction delegate;
/** */
private static List<Integer> partsAffMapping;
/** */
public TestAffinityFunction(AffinityFunction delegate) {
this.delegate = delegate;
}
/** {@inheritDoc} */
@Override public void reset() {
delegate.reset();
}
/** {@inheritDoc} */
@Override public int partitions() {
return delegate.partitions();
}
/** {@inheritDoc} */
@Override public int partition(Object key) {
return delegate.partition(key);
}
/** {@inheritDoc} */
@Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
List<List<ClusterNode>> res0 = delegate.assignPartitions(affCtx);
List<List<ClusterNode>> res = new ArrayList<>(res0.size());
for (int p = 0; p < res0.size(); p++)
res.add(p, null);
for (int p = 0; p < res0.size(); p++)
res.set(partsAffMapping.get(p), res0.get(p));
return res;
}
/** {@inheritDoc} */
@Override public void removeNode(UUID nodeId) {
delegate.removeNode(nodeId);
}
}
}