blob: 7e4d832cb865348f93e492c8f56ba3c44cf15127 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.PartitionLossPolicy.IGNORE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
/**
*
*/
@RunWith(Parameterized.class)
public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTest {
/** */
private static final int PARTS_CNT = 32;
/** */
private boolean client;
/** */
@Parameterized.Parameter(value = 0)
public CacheAtomicityMode atomicityMode;
/** */
@Parameterized.Parameter(value = 1)
public PartitionLossPolicy partLossPlc;
/** */
@Parameterized.Parameter(value = 2)
public int backups;
/** */
@Parameterized.Parameter(value = 3)
public boolean autoAdjust;
/** */
@Parameterized.Parameter(value = 4)
public int nodes;
/** */
@Parameterized.Parameter(value = 5)
public int[] stopNodes;
/** */
@Parameterized.Parameter(value = 6)
public boolean persistence;
/** */
private static final String[] CACHES = new String[]{"cache1", "cache2"};
/** */
@Parameterized.Parameters(name = "{0} {1} {2} {3} {4} {6}")
public static List<Object[]> parameters() {
ArrayList<Object[]> params = new ArrayList<>();
Random r = new Random();
System.out.println("Seed: " + U.field(r, "seed"));
for (CacheAtomicityMode mode : Arrays.asList(TRANSACTIONAL, ATOMIC)) {
// Test always scenarios.
params.add(new Object[]{mode, IGNORE, 0, false, 3, new int[]{2}, false});
params.add(new Object[]{mode, IGNORE, 0, false, 3, new int[]{2}, true});
params.add(new Object[]{mode, READ_ONLY_SAFE, 1, true, 4, new int[]{2, 0}, false});
params.add(new Object[]{mode, IGNORE, 1, false, 4, new int[]{0, 2}, false});
params.add(new Object[]{mode, READ_WRITE_SAFE, 2, true, 5, new int[]{1, 0, 2}, false});
// Random scenarios.
for (Integer backups : Arrays.asList(0, 1, 2)) {
int nodes = backups + 3;
int[] stopIdxs = new int[backups + 1];
List<Integer> tmp = IntStream.range(0, nodes).boxed().collect(Collectors.toList());
Collections.shuffle(tmp, r);
for (int i = 0; i < stopIdxs.length; i++)
stopIdxs[i] = tmp.get(i);
params.add(new Object[]{mode, READ_WRITE_SAFE, backups, false, nodes, stopIdxs, false});
params.add(new Object[]{mode, IGNORE, backups, false, nodes, stopIdxs, false});
params.add(new Object[]{mode, READ_ONLY_SAFE, backups, false, nodes, stopIdxs, false});
params.add(new Object[]{mode, READ_ONLY_ALL, backups, false, nodes, stopIdxs, false});
params.add(new Object[]{mode, READ_WRITE_SAFE, backups, true, nodes, stopIdxs, false});
params.add(new Object[]{mode, IGNORE, backups, true, nodes, stopIdxs, false});
params.add(new Object[]{mode, READ_ONLY_SAFE, backups, true, nodes, stopIdxs, false});
params.add(new Object[]{mode, READ_ONLY_ALL, backups, true, nodes, stopIdxs, false});
boolean ignored = false; // Autoadjust is currently ignored for persistent mode.
params.add(new Object[]{mode, READ_WRITE_SAFE, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, IGNORE, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, READ_ONLY_SAFE, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, READ_ONLY_ALL, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, READ_WRITE_SAFE, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, IGNORE, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, READ_ONLY_SAFE, backups, ignored, nodes, stopIdxs, true});
params.add(new Object[]{mode, READ_ONLY_ALL, backups, ignored, nodes, stopIdxs, true});
}
}
return params;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
cfg.setConsistentId(gridName);
cfg.setClientMode(client);
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setWalMode(WALMode.LOG_ONLY)
.setWalSegmentSize(4 * 1024 * 1024)
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(persistence)
.setMaxSize(100L * 1024 * 1024))
);
CacheConfiguration[] ccfgs = new CacheConfiguration[CACHES.length];
for (int i = 0; i < ccfgs.length; i++) {
ccfgs[i] = new CacheConfiguration(CACHES[i])
.setAtomicityMode(atomicityMode)
.setCacheMode(PARTITIONED)
.setBackups(backups)
.setWriteSynchronizationMode(FULL_SYNC)
.setPartitionLossPolicy(partLossPlc)
.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
}
cfg.setCacheConfiguration(ccfgs);
cfg.setIncludeEventTypes(EventType.EVTS_ALL);
cfg.setActiveOnStart(false);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
cleanPersistenceDir();
}
/**
* @throws Exception if failed.
*/
@Test
public void checkLostPartition() throws Exception {
log.info("Stop sequence: " + IntStream.of(stopNodes).boxed().collect(Collectors.toList()));
boolean safe = persistence || !(partLossPlc == IGNORE && autoAdjust);
String cacheName = CACHES[ThreadLocalRandom.current().nextInt(CACHES.length)];
Map<UUID, Set<Integer>> lostMap = new ConcurrentHashMap<>();
Set<Integer> expLostParts = prepareTopology(nodes, autoAdjust, new P1<Event>() {
@Override public boolean apply(Event evt) {
assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
lostMap.computeIfAbsent(evt.node().id(), k -> Collections.synchronizedSet(new HashSet<>())).add(cacheEvt.partition());
return true;
}
}, stopNodes);
int[] stopNodesSorted = Arrays.copyOf(stopNodes, stopNodes.length);
Arrays.sort(stopNodesSorted);
for (Ignite ig : G.allGrids()) {
if (Arrays.binarySearch(stopNodesSorted, getTestIgniteInstanceIndex(ig.name())) >= 0)
continue;
verifyCacheOps(cacheName, expLostParts, ig, safe);
}
// Check that partition state does not change after we return nodes.
for (int i = 0; i < stopNodes.length; i++) {
int node = stopNodes[i];
IgniteEx grd = startGrid(node);
info("Newly started node: " + grd.cluster().localNode().id());
}
for (int i = 0; i < nodes + 1; i++)
verifyCacheOps(cacheName, expLostParts, grid(i), safe);
if (safe)
ignite(0).resetLostPartitions(Arrays.asList(CACHES));
// Do not wait for evictions because it's not guaranteed in the current implementation.
awaitPartitionMapExchange();
for (Ignite ig : G.allGrids()) {
IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
assertTrue(cache.lostPartitions().isEmpty());
int parts = ig.affinity(cacheName).partitions();
for (int i = 0; i < parts; i++) {
cache.get(i);
cache.put(i, i);
}
}
if (safe) {
for (Ignite ig : G.allGrids()) {
if (Arrays.binarySearch(stopNodesSorted, getTestIgniteInstanceIndex(ig.name())) >= 0)
continue;
Set<Integer> lostParts = lostMap.get(ig.cluster().localNode().id());
assertEquals(expLostParts, lostParts);
}
}
}
/**
* @param cacheName Cache name.
* @param expLostParts Expected lost parts.
* @param ig Ignite.
* @param safe Safe.
*/
private void verifyCacheOps(String cacheName, Set<Integer> expLostParts, Ignite ig, boolean safe) {
boolean readOnly = partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL;
IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
int parts = ig.affinity(cacheName).partitions();
if (!safe)
assertTrue(cache.lostPartitions().isEmpty());
// Check single reads.
for (int p = 0; p < parts; p++) {
try {
Integer actual = cache.get(p);
if (safe) {
assertTrue("Reading from a lost partition should have failed [part=" + p + ']',
!cache.lostPartitions().contains(p));
assertEquals(p, actual.intValue());
}
else
assertEquals(expLostParts.contains(p) ? null : p, actual);
}
catch (CacheException e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
assertTrue("Read exception should only be triggered for a lost partition " +
"[ex=" + X.getFullStackTrace(e) + ", part=" + p + ']', cache.lostPartitions().contains(p));
}
}
// Check single writes.
for (int p = 0; p < parts; p++) {
try {
cache.put(p, p);
if (!safe && expLostParts.contains(p))
cache.remove(p);
if (readOnly) {
assertTrue(!cache.lostPartitions().contains(p));
fail("Writing to a cache containing lost partitions should have failed [part=" + p + ']');
}
if (safe) {
assertTrue("Writing to a lost partition should have failed [part=" + p + ']',
!cache.lostPartitions().contains(p));
}
}
catch (CacheException e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
assertTrue("Write exception should only be triggered for a lost partition or in read-only mode " +
"[ex=" + X.getFullStackTrace(e) + ", part=" + p + ']', readOnly || cache.lostPartitions().contains(p));
}
}
Set<Integer> notLost = IntStream.range(0, parts).boxed().filter(p -> !expLostParts.contains(p)).collect(Collectors.toSet());
try {
Map<Integer, Integer> res = cache.getAll(expLostParts);
assertFalse("Reads from lost partitions should have been allowed only in non-safe mode", safe);
}
catch (CacheException e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
}
try {
Map<Integer, Integer> res = cache.getAll(notLost);
}
catch (Exception e) {
fail("Reads from non lost partitions should have been always allowed");
}
try {
cache.putAll(expLostParts.stream().collect(Collectors.toMap(k -> k, v -> v)));
assertFalse("Writes to lost partitions should have been allowed only in non-safe mode", safe);
cache.removeAll(expLostParts);
}
catch (CacheException e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
}
try {
cache.putAll(notLost.stream().collect(Collectors.toMap(k -> k, v -> v)));
assertTrue("Writes to non-lost partitions should have been allowed only in read-write or non-safe mode",
!safe || !readOnly);
}
catch (CacheException e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
}
// Check queries.
for (int p = 0; p < parts; p++) {
boolean loc = ig.affinity(cacheName).isPrimary(ig.cluster().localNode(), p);
List<?> objects;
try {
objects = runQuery(ig, cacheName, false, p);
assertTrue("Query over lost partition should have failed: safe=" + safe +
", expLost=" + expLostParts + ", p=" + p, !safe || !expLostParts.contains(p));
if (safe)
assertEquals(1, objects.size());
} catch (Exception e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
}
try {
runQuery(ig, cacheName, false, -1);
assertFalse("Query should have failed in safe mode with lost partitions", safe);
} catch (Exception e) {
assertTrue("Query must always work in unsafe mode", safe);
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
}
if (loc) {
try {
objects = runQuery(ig, cacheName, true, p);
assertTrue("Query over lost partition should have failed: safe=" + safe +
", expLost=" + expLostParts + ", p=" + p, !safe || !expLostParts.contains(p));
if (safe)
assertEquals(1, objects.size());
} catch (Exception e) {
assertTrue(X.getFullStackTrace(e), X.hasCause(e, CacheInvalidStateException.class));
}
}
}
}
/**
* @param ig Ignite.
* @param cacheName Cache name.
* @param loc Local.
* @param part Partition.
*/
protected List<?> runQuery(Ignite ig, String cacheName, boolean loc, int part) {
IgniteCache cache = ig.cache(cacheName);
ScanQuery qry = new ScanQuery();
if (part != -1)
qry.setPartition(part);
if (loc)
qry.setLocal(true);
return cache.query(qry).getAll();
}
/**
* @param nodes Nodes.
* @param autoAdjust Auto adjust.
* @param lsnr Listener.
* @param stopNodes Stop nodes.
*/
private Set<Integer> prepareTopology(int nodes, boolean autoAdjust, P1<Event> lsnr, int... stopNodes) throws Exception {
final IgniteEx crd = startGrids(nodes);
crd.cluster().baselineAutoAdjustEnabled(autoAdjust);
crd.cluster().active(true);
Affinity<Object> aff = ignite(0).affinity(CACHES[0]);
for (int i = 0; i < aff.partitions(); i++) {
for (String cacheName0 : CACHES)
ignite(0).cache(cacheName0).put(i, i);
}
client = true;
startGrid(nodes);
client = false;
for (int i = 0; i < nodes; i++)
info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
awaitPartitionMapExchange();
Set<Integer> expLostParts = new HashSet<>();
int[] stopNodesSorted = Arrays.copyOf(stopNodes, stopNodes.length);
Arrays.sort(stopNodesSorted);
// Find partitions not owned by any remaining node.
for (int i = 0; i < PARTS_CNT; i++) {
int c = 0;
for (int idx = 0; idx < nodes; idx++) {
if (Arrays.binarySearch(stopNodesSorted, idx) < 0 && !aff.isPrimary(grid(idx).localNode(), i) && !aff.isBackup(grid(idx).localNode(), i))
c++;
}
if (c == nodes - stopNodes.length)
expLostParts.add(i);
}
assertFalse("Expecting lost partitions for the test scneario", expLostParts.isEmpty());
for (Ignite ignite : G.allGrids()) {
// Prevent rebalancing to bring partitions in owning state.
if (backups > 0) {
TestRecordingCommunicationSpi.spi(ignite).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode clusterNode, Message msg) {
return msg instanceof GridDhtPartitionDemandMessage;
}
});
}
ignite.events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
}
for (int i = 0; i < stopNodes.length; i++)
stopGrid(stopNodes[i], true);
return expLostParts;
}
}