blob: 6755f9495c2b7e8d1508b6490a18eece86193f02 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.consistency;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheConsistencyViolationEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.processors.cache.consistency.ReadRepairDataGenerator.InconsistentMapping;
import org.apache.ignite.internal.processors.cache.consistency.ReadRepairDataGenerator.ReadRepairData;
import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteIrreparableConsistencyViolationException;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
/**
*
*/
public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
/** Partitions. */
protected static final int PARTITIONS = 32;
/** Events. */
private static final ConcurrentLinkedDeque<CacheConsistencyViolationEvent> evtDeq = new ConcurrentLinkedDeque<>();
/** External class loader. */
private static final ClassLoader extClsLdr = getExternalClassLoader();
/** Use external class loader. */
private static volatile boolean useExtClsLdr;
/** Nodes aware of the entry value class. */
protected static volatile List<Ignite> clsAwareNodes;
/** Generator. */
protected static volatile ReadRepairDataGenerator gen;
/** Backups count. */
protected Integer backupsCount() {
return 3;
}
/** Cache mode. */
protected CacheMode cacheMode() {
return PARTITIONED;
}
/** Atomicy mode. */
protected CacheAtomicityMode atomicityMode() {
return TRANSACTIONAL;
}
/** Persistence enabled. */
protected boolean persistenceEnabled() {
return false;
}
/** Server nodes count. */
private int serverNodesCount() {
return backupsCount() + 1/*primary*/ + 1/*not an owner*/;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
if (persistenceEnabled())
cleanPersistenceDir();
Ignite ignite = startGrids(serverNodesCount() / 2); // Server nodes.
useExtClsLdr = true;
List<Ignite> extClsLdrNodes = new ArrayList<>();
while (G.allGrids().size() < serverNodesCount())
extClsLdrNodes.add(startGrid(G.allGrids().size())); // Server nodes.
extClsLdrNodes.add(startClientGrid(G.allGrids().size())); // Client node 1.
clsAwareNodes = extClsLdrNodes;
useExtClsLdr = false;
startClientGrid(G.allGrids().size()); // Client node 2.
final IgniteEvents evts = ignite.events();
evts.remoteListen(null,
(IgnitePredicate<Event>)e -> {
assert e instanceof CacheConsistencyViolationEvent;
evtDeq.add((CacheConsistencyViolationEvent)e);
return true;
},
EVT_CONSISTENCY_VIOLATION);
if (persistenceEnabled())
ignite.cluster().state(ClusterState.ACTIVE);
ignite.getOrCreateCache(cacheConfiguration());
awaitPartitionMapExchange();
gen = new ReadRepairDataGenerator(
DEFAULT_CACHE_NAME,
clsAwareNodes,
extClsLdr,
this::primaryNode,
this::backupNodes,
this::serverNodesCount,
this::backupsCount);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
log.info("Checked " + gen.generated() + " keys");
stopAllGrids();
if (persistenceEnabled())
cleanPersistenceDir();
}
/**
*
*/
protected CacheConfiguration<Integer, Object> cacheConfiguration() {
CacheConfiguration<Integer, Object> cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
cfg.setCacheMode(cacheMode());
cfg.setAtomicityMode(atomicityMode());
cfg.setBackups(backupsCount());
cfg.setAffinity(new RendezvousAffinityFunction().setPartitions(PARTITIONS));
return cfg;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setIncludeEventTypes(EventType.EVTS_ALL);
if (persistenceEnabled()) {
cfg.setDataStorageConfiguration(new DataStorageConfiguration());
cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setMaxSize(100 * 1024 * 1024);
cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
}
if (useExtClsLdr)
cfg.setClassLoader(extClsLdr);
return cfg;
}
/**
*
*/
protected void checkEventMissed() {
assertTrue(evtDeq.isEmpty());
}
/**
*
*/
protected void checkEvent(ReadRepairData rrd, IgniteIrreparableConsistencyViolationException e) {
Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> evtEntries = new HashMap<>();
Map<Object, Object> evtRepaired = new HashMap<>();
Map<Object, InconsistentMapping> inconsistent = rrd.data.entrySet().stream()
.filter(entry -> !entry.getValue().consistent)
.collect(Collectors.toMap(
entry -> gen.toBinary(entry.getKey()), // Events are always contain binary keys, converting expectations.
Map.Entry::getValue));
while (!evtEntries.keySet().equals(inconsistent.keySet())) {
if (!evtDeq.isEmpty()) {
CacheConsistencyViolationEvent evt = evtDeq.remove();
assertEquals(rrd.strategy, evt.getStrategy());
// Optimistic and read committed transactions produce per key repair.
for (Map.Entry<Object, CacheConsistencyViolationEvent.EntriesInfo> entries : evt.getEntries().entrySet()) {
assertFalse(evtEntries.containsKey(entries.getKey())); // Checking that duplicates are absent.
evtEntries.put(entries.getKey(), entries.getValue().getMapping());
}
evtRepaired.putAll(evt.getRepairedEntries());
}
}
for (Map.Entry<Object, InconsistentMapping> mapping : inconsistent.entrySet()) {
Object key = mapping.getKey();
Object repairedBin = mapping.getValue().repairedBin;
Object primaryBin = mapping.getValue().primaryBin;
boolean repairable = mapping.getValue().repairable;
if (!repairable)
assertNotNull(e);
if (e == null) {
assertTrue(repairable);
assertTrue(evtRepaired.containsKey(key));
assertEqualsArraysAware(repairedBin, evtRepaired.get(key));
}
// Repairable but not repaired (because of irreparable entry at the same tx) entries.
else if (e.irreparableKeys().contains(key) || (e.repairableKeys() != null && e.repairableKeys().contains(key)))
assertFalse(evtRepaired.containsKey(key));
Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> evtEntryInfos = evtEntries.get(key);
if (evtEntryInfos != null)
for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> evtEntryInfo : evtEntryInfos.entrySet()) {
ClusterNode node = evtEntryInfo.getKey();
CacheConsistencyViolationEvent.EntryInfo info = evtEntryInfo.getValue();
Object val = info.getValue();
if (info.isCorrect())
assertEqualsArraysAware(repairedBin, val);
if (info.isPrimary()) {
assertEqualsArraysAware(primaryBin, val);
assertEquals(node, primaryNode(key, DEFAULT_CACHE_NAME).cluster().localNode());
}
}
}
int irreparableCnt = 0;
int repairableCnt = 0;
if (e != null) {
irreparableCnt = e.irreparableKeys().size();
repairableCnt = e.repairableKeys() != null ? e.repairableKeys().size() : 0;
}
if (repairableCnt > 0)
// Mentioned when pessimistic tx read-repair get contains irreparable entries,
// and it's impossible to repair repairable entries during this call.
assertEquals(TRANSACTIONAL, atomicityMode());
int expectedRepairedCnt = inconsistent.size() - (irreparableCnt + repairableCnt);
assertEquals(expectedRepairedCnt, evtRepaired.size());
assertTrue(evtDeq.isEmpty());
}
/**
* @param obj Object.
*/
protected static Object unwrapBinaryIfNeeded(Object obj) {
return gen.unwrapBinaryIfNeeded(obj);
}
/**
*
*/
protected void generateAndCheck(
Ignite initiator,
int cnt,
boolean raw,
boolean async,
boolean misses,
boolean nulls,
boolean binary,
Consumer<ReadRepairData> c) throws Exception {
gen.generateAndCheck(
initiator,
cnt,
raw,
async,
misses,
nulls,
binary,
null,
c);
}
}