blob: cafe88b7503170658cca7afa6e9e346da6929064 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import com.google.common.collect.Lists;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.store.CacheLocalStore;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbstractTest {
/** */
public static final TestLocalStore<Integer, Integer> LOCAL_STORE_1 = new TestLocalStore<>();
/** */
public static final TestLocalStore<Integer, Integer> LOCAL_STORE_2 = new TestLocalStore<>();
/** */
public static final TestLocalStore<Integer, Integer> LOCAL_STORE_3 = new TestLocalStore<>();
/** */
public static final TestLocalStore<Integer, Integer> LOCAL_STORE_4 = new TestLocalStore<>();
/** */
public static final TestLocalStore<Integer, Integer> LOCAL_STORE_5 = new TestLocalStore<>();
/** */
public static final TestLocalStore<Integer, Integer> LOCAL_STORE_6 = new TestLocalStore<>();
/** */
public static final int KEYS = 1025;
/** */
public static final String BACKUP_CACHE_1 = "backup_1";
/** */
public static final String BACKUP_CACHE_2 = "backup_2";
/** */
public static volatile boolean near = false;
/**
*
*/
public GridCacheAbstractLocalStoreSelfTest() {
super(false /* doesn't start grid */);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration cacheCfg = cache(igniteInstanceName, DEFAULT_CACHE_NAME, 0);
cacheCfg.setAffinity(new RendezvousAffinityFunction());
CacheConfiguration cacheBackup1Cfg = cache(igniteInstanceName, BACKUP_CACHE_1, 1);
cacheBackup1Cfg.setAffinity(new RendezvousAffinityFunction());
CacheConfiguration cacheBackup2Cfg = cache(igniteInstanceName, BACKUP_CACHE_2, 2);
cacheBackup2Cfg.setAffinity(new RendezvousAffinityFunction());
cfg.setCacheConfiguration(cacheCfg, cacheBackup1Cfg, cacheBackup2Cfg);
cfg.setIncludeEventTypes(EventType.EVTS_ALL);
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
LOCAL_STORE_1.clear();
LOCAL_STORE_2.clear();
LOCAL_STORE_3.clear();
LOCAL_STORE_4.clear();
LOCAL_STORE_5.clear();
LOCAL_STORE_6.clear();
}
/**
* @param igniteInstanceName Ignite instance name.
* @param cacheName Cache name.
* @param backups Number of backups.
* @return Configuration.
*/
@SuppressWarnings("unchecked")
private CacheConfiguration cache(String igniteInstanceName, String cacheName, int backups) {
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName(cacheName);
cacheCfg.setCacheMode(getCacheMode());
cacheCfg.setAtomicityMode(getAtomicMode());
cacheCfg.setNearConfiguration(nearConfiguration());
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
cacheCfg.setRebalanceMode(SYNC);
cacheCfg.setCacheStoreFactory(new StoreFactory());
cacheCfg.setWriteThrough(true);
cacheCfg.setReadThrough(true);
cacheCfg.setBackups(backups);
return cacheCfg;
}
/**
* @return Distribution mode.
*/
protected NearCacheConfiguration nearConfiguration() {
return near ? new NearCacheConfiguration() : null;
}
/**
* @return Cache atomicity mode.
*/
protected abstract CacheAtomicityMode getAtomicMode();
/**
* @return Cache mode.
*/
protected abstract CacheMode getCacheMode();
/**
* @throws Exception If failed.
*/
@Test
public void testEvict() throws Exception {
Ignite ignite1 = startGrid(1);
IgniteCache<Object, Object> cache = ignite1.cache(DEFAULT_CACHE_NAME).withExpiryPolicy(new CreatedExpiryPolicy(
new Duration(TimeUnit.MILLISECONDS, 100L)));
// Putting entry.
for (int i = 0; i < KEYS; i++)
cache.put(i, i);
// Wait when entry
U.sleep(200);
// Check that entry is evicted from cache, but local store does contain it.
for (int i = 0; i < KEYS; i++) {
cache.localEvict(Arrays.asList(i));
assertNull(cache.localPeek(i));
assertEquals(i, (int)LOCAL_STORE_1.load(i).get1());
assertEquals(i, cache.get(i));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPrimaryNode() throws Exception {
Ignite ignite1 = startGrid(1);
IgniteCache<Object, Object> cache = ignite1.cache(DEFAULT_CACHE_NAME);
// Populate cache and check that local store has all value.
for (int i = 0; i < KEYS; i++)
cache.put(i, i);
checkLocalStore(ignite1, LOCAL_STORE_1, DEFAULT_CACHE_NAME);
final AtomicInteger evtCnt = new AtomicInteger(0);
if (getCacheMode() != REPLICATED) {
ignite1.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
evtCnt.incrementAndGet();
return true;
}
}, EventType.EVT_CACHE_REBALANCE_PART_UNLOADED);
}
final Ignite ignite2 = startGrid(2);
if (getCacheMode() != REPLICATED) {
boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
// Partition count which must be transferred to 2'nd node.
int parts = ignite2.affinity(DEFAULT_CACHE_NAME).allPartitions(ignite2.cluster().localNode()).length;
return evtCnt.get() >= parts;
}
}, 5000);
assertTrue(wait);
}
assertEquals(Ignition.allGrids().size(), 2);
checkLocalStore(ignite1, LOCAL_STORE_1, DEFAULT_CACHE_NAME);
checkLocalStore(ignite2, LOCAL_STORE_2, DEFAULT_CACHE_NAME);
}
/**
* @throws Exception If failed.
*/
@Test
public void testBackupRestorePrimary() throws Exception {
testBackupRestore();
}
/**
* @throws Exception If failed.
*/
@Test
public void testBackupRestore() throws Exception {
final IgniteEx ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
awaitPartitionMapExchange();
// We need a backup key on grid 1, so we must wait for late affinity assignment to change.
AffinityTopologyVersion waitTopVer = new AffinityTopologyVersion(2, 1);
grid(1).context().cache().context().exchange().affinityReadyFuture(waitTopVer).get();
grid(2).context().cache().context().exchange().affinityReadyFuture(waitTopVer).get();
final String name = BACKUP_CACHE_2;
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
AffinityTopologyVersion topVer = ignite1.context().cache().context().cacheContext(CU.cacheId(name))
.affinity().affinityTopologyVersion();
return topVer.topologyVersion() == 2 && topVer.minorTopologyVersion() == 1;
// return ignite1.affinity(name).primaryPartitions(ignite1.cluster().localNode()).length <
// ignite1.affinity(name).partitions();
}
}, 10_000));
int key1 = -1;
int key2 = -1;
for (int i = 0; i < KEYS; i++) {
if (ignite1.affinity(name).isPrimary(ignite1.cluster().localNode(), i)) {
key1 = i;
break;
}
}
for (int i = 0; i < KEYS; i++) {
if (!ignite1.affinity(name).isPrimary(ignite1.cluster().localNode(), i)) {
key2 = i;
break;
}
}
assertTrue(key1 >= 0);
assertTrue(key2 >= 0);
assertNotSame(key1, key2);
assertEquals(0, ignite1.cache(name).size());
assertEquals(0, LOCAL_STORE_1.map.size());
assertEquals(0, LOCAL_STORE_2.map.size());
IgniteCache<Integer, Integer> cache = ignite1.cache(name).withAllowAtomicOpsInTx();
try (Transaction tx = ignite1.transactions().txStart()) {
cache.put(key1, key1);
cache.put(key2, key2);
Map<Integer, Integer> m = new TreeMap<>();
for (int i = KEYS; i < KEYS + 100; i++)
m.put(i, i);
cache.putAll(m);
tx.commit();
}
assertEquals(102, LOCAL_STORE_1.map.size());
assertEquals(102, LOCAL_STORE_2.map.size());
stopGrid(1);
assertEquals(1, G.allGrids().size());
assertEquals(key1, ignite2.cache(name).get(key1));
assertEquals(key2, ignite2.cache(name).get(key2));
for (int i = KEYS; i < KEYS + 100; i++)
assertEquals(i, ignite2.cache(name).get(i));
assertEquals(102, LOCAL_STORE_1.map.size());
assertEquals(102, LOCAL_STORE_2.map.size());
assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return ignite(2).cache(name).size() == 102;
}
}, 5000);
stopGrid(2);
assertEquals(0, G.allGrids().size());
assertEquals(102, LOCAL_STORE_1.map.size());
assertEquals(102, LOCAL_STORE_2.map.size());
// Node restart. One node should have everything at local store.
ignite2 = startGrid(2);
assertEquals(1, G.allGrids().size());
assertEquals(key1, ignite2.cache(name).get(key1));
assertEquals(key2, ignite2.cache(name).get(key2));
for (int i = KEYS; i < KEYS + 100; i++)
assertEquals(i, ignite2.cache(name).get(i));
assertEquals(102, ignite2.cache(name).size());
assertEquals(102, LOCAL_STORE_1.map.size());
assertEquals(102, LOCAL_STORE_2.map.size());
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalStoreCorrespondsAffinityWithBackups() throws Exception {
testLocalStoreCorrespondsAffinity(BACKUP_CACHE_2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalStoreCorrespondsAffinityWithBackup() throws Exception {
testLocalStoreCorrespondsAffinity(BACKUP_CACHE_1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalStoreCorrespondsAffinityNoBackups() throws Exception {
testLocalStoreCorrespondsAffinity(DEFAULT_CACHE_NAME);
}
/**
* @throws Exception If failed.
*/
private void testLocalStoreCorrespondsAffinity(String name) throws Exception {
near = true;
try {
for (int i = 1; i <= 6; i++)
startGrid(i);
assertTrue(((IgniteCacheProxy)grid(1).cache(name)).context().isNear() ||
getCacheMode() == REPLICATED);
awaitPartitionMapExchange();
Random rn = new Random();
for (int i = 1; i <= 6; i++)
assertEquals(0, grid(i).cache(name).localSize(CachePeekMode.NEAR));
for (int i = 0; i < KEYS; i++) {
Ignite ignite = grid(rn.nextInt(6) + 1);
IgniteCache<Integer, Integer> cache = ignite.cache(name).withAllowAtomicOpsInTx();
try (Transaction tx = ignite.transactions().txStart()) {
cache.put(i, i);
for (int j = 0; j < 5; j++)
cache.get(rn.nextInt(KEYS));
Map<Integer, Integer> m = new TreeMap<>();
for (int j = 0; j < 5; j++) {
Integer key = rn.nextInt(KEYS);
m.put(key, key);
}
cache.putAll(m);
tx.commit();
}
}
for (int i = 1; i <= 6; i++) {
assertTrue(grid(i).cache(name).localSize(CachePeekMode.NEAR) > 0 ||
getCacheMode() == REPLICATED);
}
checkLocalStore(grid(1), LOCAL_STORE_1, name);
checkLocalStore(grid(2), LOCAL_STORE_2, name);
checkLocalStore(grid(3), LOCAL_STORE_3, name);
checkLocalStore(grid(4), LOCAL_STORE_4, name);
checkLocalStore(grid(5), LOCAL_STORE_5, name);
checkLocalStore(grid(6), LOCAL_STORE_6, name);
int fullStoreSize = LOCAL_STORE_1.map.size() +
LOCAL_STORE_2.map.size() +
LOCAL_STORE_3.map.size() +
LOCAL_STORE_4.map.size() +
LOCAL_STORE_5.map.size() +
LOCAL_STORE_6.map.size();
CacheConfiguration ccfg = grid(1).cache(name).getConfiguration(CacheConfiguration.class);
assertEquals(
getCacheMode() == REPLICATED ?
KEYS * 6 :
ccfg.getBackups() * KEYS + KEYS,
fullStoreSize);
}
finally {
near = false;
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalStoreWithNearKeysPrimary() throws Exception {
try {
System.setProperty(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY, "true");
testLocalStoreWithNearKeys();
}
finally {
System.setProperty(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY, "false");
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalStoreWithNearKeysPrimaryAndBackups() throws Exception {
testLocalStoreWithNearKeys();
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalStoreWithNearKeys() throws Exception {
if (getCacheMode() == REPLICATED)
return;
near = true;
try {
for (int i = 1; i <= 3; i++)
startGrid(i);
awaitPartitionMapExchange();
Ignite ignite = grid(1);
int k = 0;
for (int i = 1; i <= 3; i++) {
int kP = -1;
int kB = -1;
int kN = -1;
while (kP == -1 || kB == -1 || kN == -1) {
if (ignite.affinity(BACKUP_CACHE_1).isPrimary(grid(1).cluster().localNode(), k))
kP = k;
if (ignite.affinity(BACKUP_CACHE_1).isBackup(grid(1).cluster().localNode(), k) &&
ignite.affinity(BACKUP_CACHE_1).isPrimary(grid(2).cluster().localNode(), k))
kB = k;
if (!ignite.affinity(BACKUP_CACHE_1).isPrimaryOrBackup(grid(1).cluster().localNode(), k) &&
ignite.affinity(BACKUP_CACHE_1).isPrimary(grid(3).cluster().localNode(), k))
kN = k;
k++;
}
assertTrue(kP != kB && kB != kN && kN != kP);
Map<Integer, Integer> m = new TreeMap<>();
m.put(kP, kP);
m.put(kB, kB);
m.put(kN, kN);
IgniteCache<Integer, Integer> cache = grid(i).cache(BACKUP_CACHE_1).withAllowAtomicOpsInTx();
try (Transaction tx = grid(i).transactions().txStart()) {
cache.putAll(m);
tx.commit();
}
boolean locStoreBackups = !IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY);
checkLocalStore(grid(1), LOCAL_STORE_1, BACKUP_CACHE_1, m.keySet(), locStoreBackups);
checkLocalStore(grid(2), LOCAL_STORE_2, BACKUP_CACHE_1, m.keySet(), locStoreBackups);
checkLocalStore(grid(3), LOCAL_STORE_3, BACKUP_CACHE_1, m.keySet(), locStoreBackups);
}
grid(1).cache(BACKUP_CACHE_1).removeAll();
Random rn = new Random();
for (int i = 1; i <= 3; i++) {
IgniteCache<Integer, Integer> cache = grid(i).cache(BACKUP_CACHE_1)
.withSkipStore().withAllowAtomicOpsInTx();
try (Transaction tx = grid(i).transactions().txStart()) {
Map<Integer, Integer> m = new TreeMap<>();
for (int j = 0; j < 50; j++)
m.put(rn.nextInt(1000), 1000);
cache.putAll(m);
tx.commit();
}
assertTrue(LOCAL_STORE_1.map.isEmpty());
assertTrue(LOCAL_STORE_2.map.isEmpty());
assertTrue(LOCAL_STORE_3.map.isEmpty());
}
}
finally {
near = false;
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testBackupNode() throws Exception {
Ignite ignite1 = startGrid(1);
IgniteCache<Object, Object> cache = ignite1.cache(BACKUP_CACHE_2);
for (int i = 0; i < KEYS; i++)
cache.put(i, i);
for (int i = 0; i < KEYS; i++) {
assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i);
assertNull(LOCAL_STORE_2.load(i));
assertNull(LOCAL_STORE_3.load(i));
}
startGrid(2);
assertEquals(2, Ignition.allGrids().size());
for (int i = 0; i < KEYS; i++) {
assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i);
assertEquals(LOCAL_STORE_2.load(i).get1().intValue(), i);
assertNull(LOCAL_STORE_3.load(i));
}
startGrid(3);
assertEquals(Ignition.allGrids().size(), 3);
for (int i = 0; i < KEYS; i++)
cache.put(i, i * 3);
for (int i = 0; i < KEYS; i++) {
assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i * 3);
assertEquals(LOCAL_STORE_2.load(i).get1().intValue(), i * 3);
assertEquals(LOCAL_STORE_3.load(i).get1().intValue(), i * 3);
}
// Stop 3'nd node.
stopGrid(3, true);
for (int i = 0; i < KEYS; i++)
cache.put(i, i * 7);
assertEquals(Ignition.allGrids().size(), 2);
for (int i = 0; i < KEYS; i++) {
assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i * 7);
assertEquals(LOCAL_STORE_2.load(i).get1().intValue(), i * 7);
assertEquals(LOCAL_STORE_3.load(i).get1().intValue(), i * 3);
}
startGrid(3);
assertEquals(Ignition.allGrids().size(), 3);
for (int i = 0; i < KEYS; i++) {
assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i * 7);
assertEquals(LOCAL_STORE_2.load(i).get1().intValue(), i * 7);
assertEquals(LOCAL_STORE_3.load(i).get1().intValue(), i * 7);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testSwap() throws Exception {
Ignite ignite1 = startGrid(1);
IgniteCache<Object, Object> cache = ignite1.cache(DEFAULT_CACHE_NAME);
// Populate cache and check that local store has all value.
for (int i = 0; i < KEYS; i++)
cache.put(i, i);
checkLocalStore(ignite1, LOCAL_STORE_1, DEFAULT_CACHE_NAME);
// Push entry to swap.
for (int i = 0; i < KEYS; i++)
cache.localEvict(Lists.newArrayList(i));
for (int i = 0; i < KEYS; i++)
assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
final AtomicInteger evtCnt = new AtomicInteger(0);
if (getCacheMode() != REPLICATED) {
ignite1.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
evtCnt.incrementAndGet();
return true;
}
}, EventType.EVT_CACHE_REBALANCE_PART_UNLOADED);
}
final Ignite ignite2 = startGrid(2);
if (getCacheMode() != REPLICATED) {
boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
// Partition count which must be transferred to 2'nd node.
int parts = ignite2.affinity(DEFAULT_CACHE_NAME).allPartitions(ignite2.cluster().localNode()).length;
return evtCnt.get() >= parts;
}
}, 5000);
assertTrue(wait);
}
assertEquals(Ignition.allGrids().size(), 2);
checkLocalStore(ignite1, LOCAL_STORE_1, DEFAULT_CACHE_NAME);
checkLocalStore(ignite2, LOCAL_STORE_2, DEFAULT_CACHE_NAME);
}
/**
* Checks that local stores contains only primary entry.
* @param ignite Ignite.
* @param store Store.
* @param name Cache name.
*/
private void checkLocalStore(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store, String name) {
for (int i = 0; i < KEYS; i++) {
if (ignite.affinity(name).isPrimaryOrBackup(ignite.cluster().localNode(), i))
assertEquals(store.load(i).get1().intValue(), i);
else
assertNull(store.load(i));
}
}
/**
* Checks that local stores contains primary and backup entries.
* @param ignite Ignite.
* @param store Store.
* @param name Cache name.
* @param keys keys.
*/
private void checkLocalStore(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store, String name,
Set<Integer> keys) {
checkLocalStore(ignite, store, name, keys, true);
}
/**
* Checks that local stores contains primary and backup or only primary entries.
*
* @param ignite Ignite.
* @param store Store.
* @param name Cache name.
* @param keys keys.
*/
private void checkLocalStore(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store, String name,
Set<Integer> keys, boolean withBackups) {
for (int key : keys) {
if (withBackups) {
if (ignite.affinity(name).isPrimaryOrBackup(ignite.cluster().localNode(), key))
assertEquals(store.load(key).get1().intValue(), key);
else
assertNull(store.load(key));
}
else {
if (ignite.affinity(name).isPrimary(ignite.cluster().localNode(), key))
assertEquals(store.load(key).get1().intValue(), key);
else
assertNull(store.load(key));
}
}
}
/**
*
*/
@CacheLocalStore
public static class TestLocalStore<K, V> implements CacheStore<K, IgniteBiTuple<V, ?>> {
/** */
private final Map<K, IgniteBiTuple<V, ?>> map = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<K, IgniteBiTuple<V, ?>> clo, @Nullable Object... args)
throws CacheLoaderException {
// No-op.
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
// No-op.
}
/** {@inheritDoc} */
@Override public IgniteBiTuple<V, ?> load(K key) throws CacheLoaderException {
return map.get(key);
}
/** {@inheritDoc} */
@Override public Map<K, IgniteBiTuple<V, ?>> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
Map<K, IgniteBiTuple<V, ?>> res = new TreeMap<>();
for (K key : keys) {
IgniteBiTuple<V, ?> val = map.get(key);
if (val != null)
res.put(key, val);
}
return res;
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>> entry)
throws CacheWriterException {
map.put(entry.getKey(), entry.getValue());
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>>> entries)
throws CacheWriterException {
for (Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>> e : entries)
map.put(e.getKey(), e.getValue());
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
map.remove(key);
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
for (Object key : keys)
map.remove(key);
}
/**
* Clear store.
*/
public void clear() {
map.clear();
}
}
/**
*
*/
static class StoreFactory implements Factory<CacheStore> {
/** */
@IgniteInstanceResource
private Ignite node;
/** {@inheritDoc} */
@Override public CacheStore create() {
String igniteInstanceName = node.configuration().getIgniteInstanceName();
if (igniteInstanceName.endsWith("1"))
return LOCAL_STORE_1;
else if (igniteInstanceName.endsWith("2"))
return LOCAL_STORE_2;
else if (igniteInstanceName.endsWith("3"))
return LOCAL_STORE_3;
else if (igniteInstanceName.endsWith("4"))
return LOCAL_STORE_4;
else if (igniteInstanceName.endsWith("5"))
return LOCAL_STORE_5;
else
return LOCAL_STORE_6;
}
}
}