blob: 20951ffc90fc523e62056ac48204d2a69da52706 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.consistency;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.internal.processors.cache.consistency.ReadRepairDataGenerator.InconsistentMapping;
import org.apache.ignite.internal.processors.cache.consistency.ReadRepairDataGenerator.ReadRepairData;
import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteIrreparableConsistencyViolationException;
import org.junit.Test;
/**
 * Base class for Read Repair tests built around {@code get}, {@code getAll}, {@code containsKey} and
 * {@code containsKeys} style reads.
 * <p>
 * Provides reusable read-and-verify actions (the {@code static} consumers below), the post-read verification
 * ({@link #check(ReadRepairData, IgniteIrreparableConsistencyViolationException, boolean)}) and the common
 * {@link #test()} driver. Concrete scenarios are supplied by subclasses through the abstract {@code test*} hooks.
 */
public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTest {
    /**
     * Generator of unique negative keys (starts at zero and only decrements).
     * Such keys are used by {@link #invertKeys(ReadRepairData)} to obtain a {@code null} on read.
     */
    private static final AtomicInteger decrementalKey = new AtomicInteger();

    /**
     * Single-key read through {@link #get(ReadRepairData)}: the value read must be equal to the unwrapped
     * {@code repairedBin} of the key's mapping.
     */
    protected static final Consumer<ReadRepairData> GET_CHECK_AND_REPAIR = rrd -> {
        for (Map.Entry<Object, InconsistentMapping> mapping : rrd.data.entrySet()) // Once.
            assertEqualsArraysAware(unwrapBinaryIfNeeded(mapping.getValue().repairedBin), get(rrd));
    };

    /**
     * Multi-key read through {@link #getAll(ReadRepairData)}: for every key of the data the value read
     * (or {@code null} when the key is missing at the result) must be equal to the unwrapped {@code repairedBin}.
     */
    protected static final Consumer<ReadRepairData> GETALL_CHECK_AND_REPAIR = rrd -> {
        Map<Object, Object> read = getAll(rrd);

        for (Map.Entry<Object, InconsistentMapping> mapping : rrd.data.entrySet())
            assertEqualsArraysAware(unwrapBinaryIfNeeded(mapping.getValue().repairedBin), read.get(mapping.getKey()));
    };

    /**
     * Single-key read of a freshly generated negative key (see {@link #invertKeys(ReadRepairData)}):
     * the result must be {@code null}.
     */
    protected static final Consumer<ReadRepairData> GET_NULL = rrd -> {
        ReadRepairData inverted = invertKeys(rrd);

        assertNull(get(inverted));
    };

    /**
     * Multi-key read of freshly generated negative keys (see {@link #invertKeys(ReadRepairData)}):
     * the resulting map must be empty.
     */
    protected static final Consumer<ReadRepairData> GET_ALL_NULL = rrd -> {
        Map<Object, Object> read = getAll(invertKeys(rrd));

        assertTrue(read.isEmpty());
    };

    /**
     * Single-key {@code containsKey} (or {@code containsKeyAsync} when {@code rrd.async} is set) performed
     * with Read Repair: the answer must be {@code true} exactly when the key's {@code repairedBin} is not {@code null}.
     */
    protected static final Consumer<ReadRepairData> CONTAINS_CHECK_AND_REPAIR = rrd -> {
        Set<Object> keys = rrd.data.keySet();

        assert keys.size() == 1;

        for (Object key : keys) { // Once.
            boolean found;

            if (rrd.async)
                found = rrd.cache.withReadRepair(rrd.strategy).containsKeyAsync(key).get();
            else
                found = rrd.cache.withReadRepair(rrd.strategy).containsKey(key);

            assertEquals(rrd.data.get(key).repairedBin != null, found);
        }
    };

    /**
     * Multi-key {@code containsKeys} (or {@code containsKeysAsync} when {@code rrd.async} is set) performed
     * with Read Repair: the answer must be {@code true} exactly when no mapping has a {@code null} {@code repairedBin}.
     */
    protected static final Consumer<ReadRepairData> CONTAINS_ALL_CHECK_AND_REPAIR = rrd -> {
        Set<Object> keys = rrd.data.keySet();

        assert !keys.isEmpty();

        boolean found;

        if (rrd.async)
            found = rrd.cache.withReadRepair(rrd.strategy).containsKeysAsync(keys).get();
        else
            found = rrd.cache.withReadRepair(rrd.strategy).containsKeys(keys);

        boolean expFound = rrd.data.values().stream().noneMatch(mapping -> mapping.repairedBin == null);

        assertEquals(expFound, found);
    };

    /**
     * Reads the only key of the data with Read Repair enabled.
     * Depending on {@code rrd.raw} the read goes through {@code getEntry} or {@code get},
     * and depending on {@code rrd.async} through the corresponding {@code *Async} variant.
     *
     * @param rrd Data, must hold exactly one key.
     * @return Value read (checked and unwrapped, see {@link #checkAndUnwrapBinaryIfNeeded(ReadRepairData, Object)}),
     *      or {@code null} when a raw read produced no entry.
     */
    private static Object get(ReadRepairData rrd) {
        Set<Object> keys = rrd.data.keySet();

        assert keys.size() == 1;

        Object key = keys.iterator().next();

        Object val;

        if (!rrd.raw) {
            if (rrd.async)
                val = rrd.cache.withReadRepair(rrd.strategy).getAsync(key).get();
            else
                val = rrd.cache.withReadRepair(rrd.strategy).get(key);
        }
        else {
            CacheEntry<Object, Object> cacheEntry;

            if (rrd.async)
                cacheEntry = rrd.cache.withReadRepair(rrd.strategy).getEntryAsync(key).get();
            else
                cacheEntry = rrd.cache.withReadRepair(rrd.strategy).getEntry(key);

            val = cacheEntry == null ? null : cacheEntry.getValue();
        }

        return checkAndUnwrapBinaryIfNeeded(rrd, val);
    }

    /**
     * Reads all keys of the data with Read Repair enabled.
     * Depending on {@code rrd.raw} the read goes through {@code getEntries} or {@code getAll},
     * and depending on {@code rrd.async} through the corresponding {@code *Async} variant.
     *
     * @param rrd Data, must hold at least one key.
     * @return Key to value map, every value is checked and unwrapped
     *      (see {@link #checkAndUnwrapBinaryIfNeeded(ReadRepairData, Object)}).
     */
    private static Map<Object, Object> getAll(ReadRepairData rrd) {
        Set<Object> keys = rrd.data.keySet();

        assert !keys.isEmpty();

        Map<Object, Object> read;

        if (!rrd.raw) {
            if (rrd.async)
                read = rrd.cache.withReadRepair(rrd.strategy).getAllAsync(keys).get();
            else
                read = rrd.cache.withReadRepair(rrd.strategy).getAll(keys);
        }
        else {
            Collection<CacheEntry<Object, Object>> cacheEntries;

            if (rrd.async)
                cacheEntries = rrd.cache.withReadRepair(rrd.strategy).getEntriesAsync(keys).get();
            else
                cacheEntries = rrd.cache.withReadRepair(rrd.strategy).getEntries(keys);

            // Flattening raw entries to the same key-value shape the non-raw read produces.
            read = new HashMap<>();

            for (CacheEntry<Object, Object> cacheEntry : cacheEntries)
                read.put(cacheEntry.getKey(), cacheEntry.getValue());
        }

        return read.entrySet().stream().collect(Collectors.toMap(
            readEntry -> readEntry.getKey(),
            readEntry -> checkAndUnwrapBinaryIfNeeded(rrd, readEntry.getValue())));
    }

    /**
     * Checks that the type of the value read matches the {@code rrd.binary} mode and unwraps it when in binary mode.
     *
     * @param rrd Data.
     * @param res Value read, possibly {@code null}.
     * @return Unwrapped value in binary mode, the value itself otherwise.
     */
    private static Object checkAndUnwrapBinaryIfNeeded(ReadRepairData rrd, Object res) {
        if (!rrd.binary) {
            // Non-binary reads must never expose a binary object.
            assert !(res instanceof BinaryObject) : res.getClass();

            return res;
        }

        // Binary reads may return nothing, a binary object, or one of the listed non-binary types.
        assert res == null ||
            res instanceof BinaryObject ||
            res instanceof Integer ||
            res instanceof int[] ||
            res instanceof Object[] ||
            res instanceof List ||
            res instanceof Set ||
            res instanceof Map : res.getClass();

        return unwrapBinaryIfNeeded(res);
    }

    /**
     * Inverts keys to gain null on get attempt: creates a copy of the data where every mapping
     * is re-keyed with a fresh negative key taken from {@link #decrementalKey}.
     *
     * @param rrd Data.
     * @return Copy of the data with the same cache, mappings, flags and strategy, but with new negative keys.
     */
    private static ReadRepairData invertKeys(ReadRepairData rrd) {
        return new ReadRepairData(
            rrd.cache,
            rrd.data.values().stream().collect(
                Collectors.toMap(
                    mapping -> decrementalKey.decrementAndGet(), // Negative key to gain null.
                    mapping -> mapping,
                    (first, second) -> {
                        throw new IllegalStateException(String.format("Duplicate key %s", first));
                    },
                    HashMap::new)),
            rrd.raw,
            rrd.async,
            rrd.strategy,
            rrd.binary);
    }

    /**
     * Re-reads every key without Read Repair ({@code getEntry} when {@code rrd.raw} is set, {@code get} otherwise)
     * and checks that the value is equal to the unwrapped {@code repairedBin}.
     * Keys listed at the exception (irreparable as well as repairable ones) are skipped.
     * The exception may be {@code null}, in this case every key is checked.
     */
    protected static final BiConsumer<ReadRepairData, IgniteIrreparableConsistencyViolationException> CHECK_REPAIRED =
        (rrd, e) -> {
            for (Map.Entry<Object, InconsistentMapping> mapping : rrd.data.entrySet()) {
                Object key = mapping.getKey();

                // Checking only repaired entries, while entries listed at exception were not repaired.
                boolean listedAtE = e != null &&
                    (e.irreparableKeys().contains(key) ||
                        (e.repairableKeys() != null && e.repairableKeys().contains(key)));

                if (listedAtE)
                    continue;

                Object actual;

                if (rrd.raw) {
                    CacheEntry<Object, Object> cacheEntry = rrd.cache.getEntry(key);

                    actual = cacheEntry == null ? null : cacheEntry.getValue();
                }
                else
                    actual = rrd.cache.get(key);

                assertEqualsArraysAware(
                    unwrapBinaryIfNeeded(mapping.getValue().repairedBin),
                    unwrapBinaryIfNeeded(actual));
            }
        };

    /**
     * Runs the given action and checks that its outcome agrees with {@code rrd.repairable()}:
     * <ul>
     * <li>a normal completion requires the data to be repairable;</li>
     * <li>a {@link RuntimeException} caused by {@link IgniteIrreparableConsistencyViolationException} requires
     * the data to be non-repairable and is followed by
     * {@link #check(ReadRepairData, IgniteIrreparableConsistencyViolationException, boolean)};</li>
     * <li>a {@link RuntimeException} with any other cause (or without a cause) fails the test.</li>
     * </ul>
     */
    protected final BiConsumer<ReadRepairData, Runnable> repairIfRepairable = (rrd, r) -> {
        try {
            r.run();

            assertTrue(rrd.repairable());
        }
        catch (RuntimeException ex) {
            Throwable reason = ex.getCause();

            if (reason == null) {
                ex.printStackTrace();

                fail("Unexpected exception: " + ex.getMessage());
            }

            boolean irreparable = reason instanceof IgniteIrreparableConsistencyViolationException;

            if (!irreparable) {
                reason.printStackTrace();

                fail("Unexpected exception: " + reason.getMessage());
            }

            assertFalse(rrd.repairable());

            check(rrd, (IgniteIrreparableConsistencyViolationException)reason, true);
        }
    };

    /**
     * Verifies the state after a read:
     * <ul>
     * <li>when an exception is given, its repairable keys (if any) must not intersect with the irreparable ones,
     * and the irreparable keys must be exactly the keys whose mapping is not {@code repairable};</li>
     * <li>absence of irreparable keys must agree with {@code rrd.repairable()};</li>
     * <li>the event is checked via {@code checkEvent} or, when not expected, via {@code checkEventMissed};</li>
     * <li>the repaired values are verified by {@link #CHECK_REPAIRED}.</li>
     * </ul>
     *
     * @param rrd Data.
     * @param e Irreparable consistency violation, or {@code null} when the read completed without it.
     * @param evtRecorded Whether the event is expected to be recorded.
     */
    protected void check(ReadRepairData rrd, IgniteIrreparableConsistencyViolationException e, boolean evtRecorded) {
        Collection<Object> irreparableKeys = null;

        if (e != null) {
            irreparableKeys = e.irreparableKeys();

            Collection<Object> repairableKeys = e.repairableKeys();

            if (repairableKeys != null)
                assertTrue(Collections.disjoint(repairableKeys, irreparableKeys));

            Collection<Object> expIrreparableKeys = rrd.data.entrySet().stream()
                .filter(mapping -> !mapping.getValue().repairable)
                .map(mapping -> mapping.getKey())
                .collect(Collectors.toSet());

            assertEqualsCollectionsIgnoringOrder(expIrreparableKeys, irreparableKeys);
        }

        assertEquals(irreparableKeys == null, rrd.repairable());

        if (!evtRecorded)
            checkEventMissed();
        else
            checkEvent(rrd, e);

        CHECK_REPAIRED.accept(rrd, e);
    }

    /**
     * Runs all scenarios from every node listed at {@code clsAwareNodes} with different key counts.
     *
     * @throws Exception If failed.
     */
    @Test
    public void test() throws Exception {
        assertFalse(clsAwareNodes.isEmpty());

        // Key counts checked with "all" flag set:
        // 1 - all keys available at primary,
        // 2 - less than backups,
        // 3 - equals to backups,
        // 4 - equals to backups + primary,
        // 10 - more than backups + primary.
        int[] allCnts = {1, 2, 3, 4, 10};

        for (Ignite initiator : clsAwareNodes) {
            test(initiator, 1, false); // just get

            for (int cnt : allCnts)
                test(initiator, cnt, true);
        }
    }

    /**
     * Runs get, get-null and contains scenarios one after another.
     *
     * @param initiator Node the scenarios are started from.
     * @param cnt Count passed to the scenarios (presumably the keys count).
     * @param all Flag passed to the scenarios (presumably whether the multi-key operations should be used).
     * @throws Exception If failed.
     */
    private void test(Ignite initiator, int cnt, boolean all) throws Exception {
        testGet(initiator, cnt, all);

        testGetNull(initiator, cnt, all);

        testContains(initiator, cnt, all);
    }

    /**
     * Get scenario, the first one started by {@link #test()} for every initiator and count.
     *
     * @param initiator Node the scenario is started from.
     * @param cnt Count (presumably the keys count).
     * @param all Flag (presumably whether the multi-key operation should be used).
     * @throws Exception If failed.
     */
    protected abstract void testGet(Ignite initiator, int cnt, boolean all) throws Exception;

    /**
     * Get-null scenario, started by {@link #test()} right after the get scenario.
     *
     * @param initiator Node the scenario is started from.
     * @param cnt Count (presumably the keys count).
     * @param all Flag (presumably whether the multi-key operation should be used).
     * @throws Exception If failed.
     */
    protected abstract void testGetNull(Ignite initiator, int cnt, boolean all) throws Exception;

    /**
     * Contains scenario, started by {@link #test()} right after the get-null scenario.
     *
     * @param initiator Node the scenario is started from.
     * @param cnt Count (presumably the keys count).
     * @param all Flag (presumably whether the multi-key operation should be used).
     * @throws Exception If failed.
     */
    protected abstract void testContains(Ignite initiator, int cnt, boolean all) throws Exception;
}