blob: b55dc77fd268fcf30d69be4767822d8475a151b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.consistency;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.internal.util.typedef.G;
import org.junit.Test;
/**
 * Base class for read-repair tests. It provides a set of reusable checks, each of which reads through
 * {@code cache.withReadRepair()} using one particular API variation, and a single test method that runs
 * the get, get-null and contains scenarios for every node returned by {@code G.allGrids()}.
 * <p>
 * The scenarios themselves ({@link #testGet}, {@link #testGetNull}, {@link #testContains}) are supplied
 * by subclasses.
 */
public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTest {
    /**
     * Single-key read check. Expects exactly one prepared key, reads it through
     * {@code cache.withReadRepair()} with the variation selected by the {@code raw} (entry API) and
     * {@code async} flags, compares the value with the mapping's {@code latest} value and then calls
     * {@code checkEvent(data)}.
     */
    protected static final Consumer<ReadRepairData> GET_CHECK_AND_FIX = (data) -> {
        IgniteCache<Integer, Integer> cache = data.cache;
        Map<Integer, InconsistentMapping> mappings = data.data;
        boolean raw = data.raw;
        boolean async = data.async;

        assert mappings.keySet().size() == 1;

        for (Map.Entry<Integer, InconsistentMapping> mapping : mappings.entrySet()) { // Single iteration expected.
            Integer key = mapping.getKey();
            Integer expected = mapping.getValue().latest;

            Integer actual;

            if (raw && async)
                actual = cache.withReadRepair().getEntryAsync(key).get().getValue();
            else if (raw)
                actual = cache.withReadRepair().getEntry(key).getValue();
            else if (async)
                actual = cache.withReadRepair().getAsync(key).get();
            else
                actual = cache.withReadRepair().get(key);

            assertEquals(expected, actual);

            checkEvent(data);
        }
    };

    /**
     * Multi-key read check. Expects a non-empty key set, reads all keys at once through
     * {@code cache.withReadRepair()} (entries API when {@code raw}, future-based API when {@code async}),
     * compares every returned value with the {@code latest} value of the corresponding mapping and then
     * calls {@code checkEvent(data)}.
     * <p>
     * Only the entries present in the result are compared.
     */
    protected static final Consumer<ReadRepairData> GETALL_CHECK_AND_FIX = (data) -> {
        IgniteCache<Integer, Integer> cache = data.cache;
        Map<Integer, InconsistentMapping> mappings = data.data;
        Set<Integer> keys = mappings.keySet();
        boolean async = data.async;

        assert !keys.isEmpty();

        if (!data.raw) {
            Map<Integer, Integer> vals;

            if (async)
                vals = cache.withReadRepair().getAllAsync(keys).get();
            else
                vals = cache.withReadRepair().getAll(keys);

            for (Map.Entry<Integer, Integer> val : vals.entrySet())
                assertEquals(mappings.get(val.getKey()).latest, val.getValue());
        }
        else {
            Collection<CacheEntry<Integer, Integer>> entries;

            if (async)
                entries = cache.withReadRepair().getEntriesAsync(keys).get();
            else
                entries = cache.withReadRepair().getEntries(keys);

            for (CacheEntry<Integer, Integer> e : entries)
                assertEquals(mappings.get(e.getKey()).latest, e.getValue());
        }

        checkEvent(data);
    };

    /**
     * Missing-key read check. Expects exactly one prepared key, reads the negated key through
     * {@code cache.withReadRepair()}, asserts that the result is {@code null} and then calls
     * {@code checkEvent} with freshly built data describing the negated key with an empty mapping and
     * {@code null} expected values.
     */
    protected static final Consumer<ReadRepairData> GET_NULL = (data) -> {
        IgniteCache<Integer, Integer> cache = data.cache;
        Set<Integer> keys = data.data.keySet();
        boolean raw = data.raw;
        boolean async = data.async;

        assert keys.size() == 1;

        for (Integer key : keys) { // Single iteration expected.
            // Presumably prepared keys are positive, so the negated key is never stored.
            Integer missed = -key;

            Object actual;

            if (raw && async)
                actual = cache.withReadRepair().getEntryAsync(missed).get();
            else if (raw)
                actual = cache.withReadRepair().getEntry(missed);
            else if (async)
                actual = cache.withReadRepair().getAsync(missed).get();
            else
                actual = cache.withReadRepair().get(missed);

            assertEquals(null, actual);

            // Expectation for the missed key: nothing mapped, nothing to be read.
            InconsistentMapping nullMapping = new InconsistentMapping(new HashMap<>(), null, null);

            Map<Integer, InconsistentMapping> nullExpectation = Collections.singletonMap(missed, nullMapping);

            checkEvent(new ReadRepairData(cache, nullExpectation, raw, async));
        }
    };

    /**
     * Single-key contains check. Expects exactly one prepared key, asserts that
     * {@code containsKey} (or {@code containsKeyAsync} when {@code async}) on
     * {@code cache.withReadRepair()} returns {@code true} and then calls {@code checkEvent(data)}.
     */
    protected static final Consumer<ReadRepairData> CONTAINS_CHECK_AND_FIX = (data) -> {
        IgniteCache<Integer, Integer> cache = data.cache;
        Set<Integer> keys = data.data.keySet();
        boolean async = data.async;

        assert keys.size() == 1;

        for (Integer key : keys) { // Single iteration expected.
            boolean found;

            if (async)
                found = cache.withReadRepair().containsKeyAsync(key).get();
            else
                found = cache.withReadRepair().containsKey(key);

            assertEquals(true, found);

            checkEvent(data);
        }
    };

    /**
     * Multi-key contains check. Asserts that {@code containsKeys} (or {@code containsKeysAsync} when
     * {@code async}) on {@code cache.withReadRepair()} returns {@code true} for all prepared keys and
     * then calls {@code checkEvent(data)}.
     */
    protected static final Consumer<ReadRepairData> CONTAINS_ALL_CHECK_AND_FIX = (data) -> {
        IgniteCache<Integer, Integer> cache = data.cache;
        Set<Integer> keys = data.data.keySet();

        boolean found;

        if (data.async)
            found = cache.withReadRepair().containsKeysAsync(keys).get();
        else
            found = cache.withReadRepair().containsKeys(keys);

        assertEquals(true, found);

        checkEvent(data);
    };

    /**
     * Verification performed without read repair: every prepared key is read directly from the cache
     * (through the entry API when {@code raw}) and compared with the {@code latest} value of its mapping.
     */
    protected static final Consumer<ReadRepairData> ENSURE_FIXED = (data) -> {
        IgniteCache<Integer, Integer> cache = data.cache;
        boolean raw = data.raw;

        for (Map.Entry<Integer, InconsistentMapping> mapping : data.data.entrySet()) {
            Integer key = mapping.getKey();
            Integer expected = mapping.getValue().latest;

            Integer actual;

            if (raw)
                actual = cache.getEntry(key).getValue();
            else
                actual = cache.get(key);

            assertEquals(expected, actual);
        }
    };

    /**
     * Runs the get, get-null and contains scenarios, in that order, using each node returned by
     * {@code G.allGrids()} as the initiator.
     *
     * @throws Exception If any scenario fails.
     */
    @Test
    public void test() throws Exception {
        for (Ignite initiator : G.allGrids()) {
            testGetVariations(initiator);
            testGetNull(initiator);
            testContainsVariations(initiator);
        }
    }

    /**
     * Runs {@link #testGet} once with {@code all == false} and then with {@code all == true} for a range
     * of counts.
     *
     * @param initiator Node passed to {@link #testGet} as the initiator.
     * @throws Exception If any scenario fails.
     */
    private void testGetVariations(Ignite initiator) throws Exception {
        testGet(initiator, 1, false); // Just get.

        int[] cnts = {
            1, // All keys available at primary.
            2, // Less than backups.
            3, // Equals to backups.
            4, // Equals to backups + primary.
            10 // More than backups.
        };

        for (int cnt : cnts)
            testGet(initiator, cnt, true);
    }

    /**
     * Runs {@link #testContains} once with {@code all == false} and then with {@code all == true} for a
     * range of counts.
     *
     * @param initiator Node passed to {@link #testContains} as the initiator.
     * @throws Exception If any scenario fails.
     */
    private void testContainsVariations(Ignite initiator) throws Exception {
        testContains(initiator, 1, false); // Just contains.

        int[] cnts = {
            1, // All keys available at primary.
            2, // Less than backups.
            3, // Equals to backups.
            4, // Equals to backups + primary.
            10 // More than backups.
        };

        for (int cnt : cnts)
            testContains(initiator, cnt, true);
    }

    /**
     * Get scenario to be implemented by a subclass.
     *
     * @param initiator Node the scenario is run from.
     * @param cnt Count passed by the variations above; presumably the number of keys to check.
     * @param all Presumably selects the multi-key (get-all) variation when {@code true}.
     * @throws Exception If failed.
     */
    protected abstract void testGet(Ignite initiator, Integer cnt, boolean all) throws Exception;

    /**
     * Get-null scenario to be implemented by a subclass.
     *
     * @param initiator Node the scenario is run from.
     * @throws Exception If failed.
     */
    protected abstract void testGetNull(Ignite initiator) throws Exception;

    /**
     * Contains scenario to be implemented by a subclass.
     *
     * @param initiator Node the scenario is run from.
     * @param cnt Count passed by the variations above; presumably the number of keys to check.
     * @param all Presumably selects the multi-key (contains-all) variation when {@code true}.
     * @throws Exception If failed.
     */
    protected abstract void testContains(Ignite initiator, Integer cnt, boolean all) throws Exception;
}