blob: a055be5f0b2d322e40a568315f7cb1bcfd29cef1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.consistency;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteConsistencyViolationException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Ideally, this test should be equal to {@link ImplicitTransactionalReadRepairTest} except atomicy mode
* configuration.
*
* Implement repair on atomic caches to make this happen :)
*/
@RunWith(Parameterized.class)
public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
/**
*
*/
private static final Consumer<ReadRepairData> GET_CHECK_AND_FAIL = (data) -> {
IgniteCache<Integer, Integer> cache = data.cache;
Set<Integer> keys = data.data.keySet();
boolean raw = data.raw;
boolean async = data.async;
assert keys.size() == 1;
for (Map.Entry<Integer, InconsistentMapping> entry : data.data.entrySet()) { // Once.
try {
Integer key = entry.getKey();
Integer res =
raw ?
async ?
cache.withReadRepair().getEntryAsync(key).get().getValue() :
cache.withReadRepair().getEntry(key).getValue() :
async ?
cache.withReadRepair().getAsync(key).get() :
cache.withReadRepair().get(key);
fail("Should not happen.");
}
catch (CacheException e) {
assertTrue(e.getCause() instanceof IgniteConsistencyViolationException);
}
}
};
/**
*
*/
private static final Consumer<ReadRepairData> GETALL_CHECK_AND_FAIL = (data) -> {
IgniteCache<Integer, Integer> cache = data.cache;
Set<Integer> keys = data.data.keySet();
boolean raw = data.raw;
boolean async = data.async;
assert !keys.isEmpty();
try {
if (raw) {
Collection<CacheEntry<Integer, Integer>> res =
async ?
cache.withReadRepair().getEntriesAsync(keys).get() :
cache.withReadRepair().getEntries(keys);
for (CacheEntry<Integer, Integer> entry : res)
assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
}
else {
Map<Integer, Integer> res =
async ?
cache.withReadRepair().getAllAsync(keys).get() :
cache.withReadRepair().getAll(keys);
for (Map.Entry<Integer, Integer> entry : res.entrySet())
assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
}
fail("Should not happen.");
}
catch (CacheException e) {
assertTrue(e.getCause() instanceof IgniteConsistencyViolationException);
}
};
/**
*
*/
private static final Consumer<ReadRepairData> CONTAINS_CHECK_AND_FAIL = (data) -> {
IgniteCache<Integer, Integer> cache = data.cache;
Set<Integer> keys = data.data.keySet();
boolean async = data.async;
assert keys.size() == 1;
for (Map.Entry<Integer, InconsistentMapping> entry : data.data.entrySet()) { // Once.
try {
Integer key = entry.getKey();
boolean res = async ?
cache.withReadRepair().containsKeyAsync(key).get() :
cache.withReadRepair().containsKey(key);
fail("Should not happen.");
}
catch (Exception e) {
assertTrue(e.getCause() instanceof IgniteConsistencyViolationException);
}
}
};
/**
*
*/
private static final Consumer<ReadRepairData> CONTAINS_ALL_CHECK_AND_FAIL = (data) -> {
IgniteCache<Integer, Integer> cache = data.cache;
Set<Integer> keys = data.data.keySet();
boolean async = data.async;
try {
boolean res = async ?
cache.withReadRepair().containsKeysAsync(keys).get() :
cache.withReadRepair().containsKeys(keys);
fail("Should not happen.");
}
catch (Exception e) {
assertTrue(e.getCause() instanceof IgniteConsistencyViolationException);
}
};
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicyMode() {
return CacheAtomicityMode.ATOMIC;
}
/** {@inheritDoc} */
@Override protected void testGet(Ignite initiator, Integer cnt, boolean all) throws Exception {
prepareAndCheck(
initiator,
cnt,
raw,
async,
(ReadRepairData data) -> {
if (all)
GETALL_CHECK_AND_FAIL.accept(data);
else
GET_CHECK_AND_FAIL.accept(data);
check(data, true, false);
});
}
/** {@inheritDoc} */
@Override protected void testContains(Ignite initiator, Integer cnt, boolean all) throws Exception {
prepareAndCheck(
initiator,
cnt,
raw,
async,
(ReadRepairData data) -> {
if (all)
CONTAINS_ALL_CHECK_AND_FAIL.accept(data);
else
CONTAINS_CHECK_AND_FAIL.accept(data);
check(data, true, false);
});
}
}