blob: 892ce878dfbae11e1d8634dde72576fc13cbd9de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheInterceptorAdapter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11820 https://issues.apache.org/jira/browse/IGNITE-11797
*/
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
/** */
private static final String GROUP1 = "grp1";
/** */
private static final String GROUP2 = "grp2";
/** */
private static final String GROUP3 = "grp3";
/** */
private static final String CACHE1 = "cache1";
/** */
private static final String CACHE2 = "cache2";
/** */
private static final int ASYNC_TIMEOUT = 5000;
/** */
private CacheConfiguration[] ccfgs;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
if (ccfgs != null) {
cfg.setCacheConfiguration(ccfgs);
ccfgs = null;
}
return cfg;
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 10 * 60_000;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
* @throws Exception If failed.
*/
@Test
public void testCloseCache1() throws Exception {
startGrid(0);
Ignite client = startClientGrid(1);
IgniteCache c1 = client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0, false));
checkCacheGroup(0, GROUP1, true);
checkCacheGroup(0, GROUP1, true);
checkCache(0, "c1", 10);
checkCache(1, "c1", 10);
c1.close();
checkCacheGroup(0, GROUP1, true);
checkCacheGroup(1, GROUP1, false);
checkCache(0, "c1", 10);
assertNotNull(client.cache("c1"));
checkCacheGroup(0, GROUP1, true);
checkCacheGroup(1, GROUP1, true);
checkCache(0, "c1", 10);
checkCache(1, "c1", 10);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCaches1() throws Exception {
createDestroyCaches(1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCaches2() throws Exception {
createDestroyCaches(5);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateCacheWithSameNameInAnotherGroup() throws Exception {
startGridsMultiThreaded(2);
final Ignite ignite = ignite(0);
ignite.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));
GridTestUtils.assertThrows(null, new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
ignite(1).createCache(cacheConfiguration(GROUP2, CACHE1, PARTITIONED, ATOMIC, 2, false));
return null;
}
}, CacheExistsException.class, "a cache with the same name is already started");
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCachesAtomicPartitioned() throws Exception {
createDestroyCaches(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCachesTxPartitioned() throws Exception {
createDestroyCaches(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCachesMvccTxPartitioned() throws Exception {
createDestroyCaches(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCachesAtomicReplicated() throws Exception {
createDestroyCaches(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCachesTxReplicated() throws Exception {
createDestroyCaches(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateDestroyCachesMvccTxReplicated() throws Exception {
createDestroyCaches(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryAtomicPartitioned() throws Exception {
scanQuery(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryTxPartitioned() throws Exception {
scanQuery(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMvccTxPartitioned() throws Exception {
scanQuery(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryAtomicReplicated() throws Exception {
scanQuery(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryTxReplicated() throws Exception {
scanQuery(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMvccTxReplicated() throws Exception {
scanQuery(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEntriesTtlAtomicPartitioned() throws Exception {
entriesTtl(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEntriesTtlTxPartitioned() throws Exception {
entriesTtl(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7311")
@Test
public void testEntriesTtlMvccTxPartitioned() throws Exception {
entriesTtl(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEntriesTtlAtomicReplicated() throws Exception {
entriesTtl(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEntriesTtlTxReplicated() throws Exception {
entriesTtl(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7311")
@Test
public void testEntriesTtlMvccTxReplicated() throws Exception {
entriesTtl(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIteratorAtomicPartitioned() throws Exception {
cacheIterator(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIteratorTxPartitioned() throws Exception {
cacheIterator(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIteratorMvccTxPartitioned() throws Exception {
cacheIterator(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIteratorAtomicReplicated() throws Exception {
cacheIterator(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIteratorTxReplicated() throws Exception {
cacheIterator(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIteratorMvccTxReplicated() throws Exception {
cacheIterator(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMultiplePartitionsAtomicPartitioned() throws Exception {
scanQueryMultiplePartitions(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMultiplePartitionsTxPartitioned() throws Exception {
scanQueryMultiplePartitions(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMultiplePartitionsMvccTxPartitioned() throws Exception {
scanQueryMultiplePartitions(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMultiplePartitionsAtomicReplicated() throws Exception {
scanQueryMultiplePartitions(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMultiplePartitionsTxReplicated() throws Exception {
scanQueryMultiplePartitions(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testScanQueryMultiplePartitionsMvccTxReplicated() throws Exception {
scanQueryMultiplePartitions(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueryTxReplicated() throws Exception {
continuousQuery(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueryMvccTxReplicated() throws Exception {
continuousQuery(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueryTxPartitioned() throws Exception {
continuousQuery(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueryMvccTxPartitioned() throws Exception {
continuousQuery(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueryAtomicReplicated() throws Exception {
continuousQuery(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueryAtomicPartitioned() throws Exception {
continuousQuery(PARTITIONED, ATOMIC);
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
int keys = 10_000;
Integer[] data1 = generateData(keys);
Integer[] data2 = generateData(keys);
startGridsMultiThreaded(4);
Ignite srv0 = ignite(0);
srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
IgniteCache<Integer, Integer> cache1;
IgniteCache<Integer, Integer> cache2;
if (atomicityMode == TRANSACTIONAL) {
Ignite ignite = ignite(1);
cache1 = ignite.cache(CACHE1);
cache2 = ignite.cache(CACHE2);
try (Transaction tx = ignite.transactions().txStart()) {
for (int i = 0; i < keys; i++) {
cache1.put(i, data1[i]);
cache2.put(i, data2[i]);
}
tx.commit();
}
}
else {
// Async put ops.
int ldrs = 4;
List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
for (int i = 0; i < ldrs; i++) {
cls.add(putOperation(1, ldrs, i, CACHE1, data1));
cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
GridTestUtils.runMultiThreaded(cls, "loaders");
}
ScanQuery<Integer, Integer> qry = new ScanQuery<>();
Set<Integer> keysSet = sequence(keys);
for (Cache.Entry<Integer, Integer> entry : ignite(3).cache(CACHE1).query(qry)) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data1[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
srv0.destroyCache(CACHE1);
keysSet = sequence(keys);
for (Cache.Entry<Integer, Integer> entry : ignite(3).cache(CACHE2).query(qry)) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data2[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void continuousQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
final int keys = 10_000;
Integer[] data1 = generateData(keys);
Integer[] data2 = generateData(keys);
startGridsMultiThreaded(4);
Ignite srv0 = ignite(0);
srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
final AtomicInteger cntr1 = new AtomicInteger();
final AtomicInteger cntr2 = new AtomicInteger();
CacheEntryUpdatedListener lsnr1 = new CacheEntryUpdatedListener<Integer, Integer>() {
@Override public void onUpdated(
Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts)
cntr1.incrementAndGet();
}
};
CacheEntryUpdatedListener lsnr2 = new CacheEntryUpdatedListener<Integer, Integer>() {
@Override public void onUpdated(
Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts)
cntr2.incrementAndGet();
}
};
QueryCursor qry1 = ignite(2).cache(CACHE1).query(new ContinuousQuery<>().setLocalListener(lsnr1));
QueryCursor qry2 = ignite(3).cache(CACHE2).query(new ContinuousQuery<>().setLocalListener(lsnr2));
if (atomicityMode == TRANSACTIONAL) {
Ignite ignite = ignite(1);
IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1);
IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2);
try (Transaction tx = ignite.transactions().txStart()) {
for (int i = 0; i < keys; i++) {
cache1.put(i, data1[i]);
cache2.put(i, data2[i]);
}
tx.commit();
}
}
else {
int ldrs = 4;
List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
for (int i = 0; i < ldrs; i++) {
cls.add(putOperation(1, ldrs, i, CACHE1, data1));
cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
GridTestUtils.runMultiThreaded(cls, "loaders");
}
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return cntr1.get() == keys && cntr2.get() == keys;
}
}, 2000);
assertEquals(cntr1.get(), keys);
assertEquals(cntr2.get(), keys);
qry1.close();
Map<Integer, Integer> map = generateDataMap(10);
srv0.cache(CACHE1).putAll(map);
srv0.cache(CACHE2).putAll(map);
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return cntr2.get() == keys + 10;
}
}, 2000);
assertEquals(keys + 10, cntr2.get());
assertEquals(keys, cntr1.get());
qry2.close();
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
int keys = 10000;
Integer[] data1 = generateData(keys);
Integer[] data2 = generateData(keys);
startGridsMultiThreaded(4);
Ignite srv0 = ignite(0);
srv0.createCache(
cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)
.setAffinity(new RendezvousAffinityFunction().setPartitions(32)));
srv0.createCache(
cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)
.setAffinity(new RendezvousAffinityFunction().setPartitions(32)));
awaitPartitionMapExchange();
IgniteCache<Integer, Integer> cache1;
IgniteCache<Integer, Integer> cache2;
if (atomicityMode == TRANSACTIONAL) {
Ignite ignite = ignite(1);
cache1 = ignite.cache(CACHE1);
cache2 = ignite.cache(CACHE2);
try (Transaction tx = ignite.transactions().txStart()) {
for (int i = 0; i < keys; i++) {
cache1.put(i, data1[i]);
cache2.put(i, data2[i]);
}
tx.commit();
}
}
else {
// Async put ops.
int ldrs = 4;
List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
for (int i = 0; i < ldrs; i++) {
cls.add(putOperation(1, ldrs, i, CACHE1, data1));
cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
GridTestUtils.runMultiThreaded(cls, "loaders");
}
int p = ThreadLocalRandom.current().nextInt(32);
ScanQuery<Integer, Integer> qry = new ScanQuery().setPartition(p);
Set<Integer> keysSet = new TreeSet<>();
cache1 = ignite(3).cache(CACHE1);
Affinity<Integer> aff = affinity(cache1);
for (int i = 0; i < keys; i++) {
if (aff.partition(i) == p)
keysSet.add(i);
}
for (Cache.Entry<Integer, Integer> entry : cache1.query(qry)) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data1[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
srv0.destroyCache(CACHE1);
keysSet = new TreeSet<>();
cache2 = ignite(3).cache(CACHE2);
aff = affinity(cache2);
for (int i = 0; i < keys; i++) {
if (aff.partition(i) == p)
keysSet.add(i);
}
for (Cache.Entry<Integer, Integer> entry : cache2.query(qry)) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data2[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void cacheIterator(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
int keys = 10000;
Integer[] data1 = generateData(keys);
Integer[] data2 = generateData(keys);
startGridsMultiThreaded(4);
Ignite srv0 = ignite(0);
srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
awaitPartitionMapExchange();
if (atomicityMode == TRANSACTIONAL) {
Ignite ignite = ignite(1);
IgniteCache cache1 = ignite.cache(CACHE1);
IgniteCache cache2 = ignite.cache(CACHE2);
try (Transaction tx = ignite.transactions().txStart()) {
for (int i = 0; i < keys; i++) {
cache1.put(i, data1[i]);
cache2.put(i, data2[i]);
}
tx.commit();
}
}
else {
// Async put ops.
int ldrs = 4;
List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
for (int i = 0; i < ldrs; i++) {
cls.add(putOperation(1, ldrs, i, CACHE1, data1));
cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
GridTestUtils.runMultiThreaded(cls, "loaders");
}
Set<Integer> keysSet = sequence(keys);
for (Cache.Entry<Integer, Integer> entry : ignite(3).<Integer, Integer>cache(CACHE1)) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data1[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
srv0.destroyCache(CACHE1);
keysSet = sequence(keys);
for (Cache.Entry<Integer, Integer> entry : ignite(3).<Integer, Integer>cache(CACHE2)) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data2[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void entriesTtl(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
int keys = 10000;
final int ttl = 10000;
Integer[] data1 = generateData(keys);
Integer[] data2 = generateData(keys);
startGridsMultiThreaded(4);
Ignite srv0 = ignite(0);
srv0.createCache(
cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)
// -1 = ETERNAL just created entries are not expiring
// -2 = NOT_CHANGED not to change ttl on entry update
.setExpiryPolicyFactory(new PlatformExpiryPolicyFactory(-1, -2, ttl)).setEagerTtl(true)
);
srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
awaitPartitionMapExchange();
if (atomicityMode == TRANSACTIONAL) {
Ignite ignite = ignite(1);
IgniteCache cache1 = ignite.cache(CACHE1);
IgniteCache cache2 = ignite.cache(CACHE2);
try (Transaction tx = ignite.transactions().txStart()) {
for (int i = 0; i < keys; i++) {
cache1.put(i, data1[i]);
cache2.put(i, data2[i]);
}
tx.commit();
}
}
else {
// async put ops
int ldrs = 4;
List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
for (int i = 0; i < ldrs; i++) {
cls.add(putOperation(1, ldrs, i, CACHE1, data1));
cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
GridTestUtils.runMultiThreaded(cls, "loaders");
}
checkData(3, CACHE1, data1);
checkData(3, CACHE2, data2);
srv0.destroyCache(CACHE2);
checkData(3, CACHE1, data1);
// Wait for expiration
Thread.sleep((long)(ttl * 1.2));
assertEquals(0, ignite(3).cache(CACHE1).size());
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void createDestroyCaches(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
int keys = 10000;
Integer[] data1 = generateData(keys);
Integer[] data2 = generateData(keys);
startGridsMultiThreaded(4);
Ignite srv0 = ignite(0);
srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
awaitPartitionMapExchange();
if (atomicityMode == TRANSACTIONAL) {
Ignite ignite = ignite(1);
IgniteCache cache1 = ignite.cache(CACHE1);
IgniteCache cache2 = ignite.cache(CACHE2);
try (Transaction tx = ignite.transactions().txStart()) {
for (int i = 0; i < keys; i++) {
cache1.put(i, data1[i]);
cache2.put(i, data2[i]);
}
tx.commit();
}
}
else {
int ldrs = 4;
List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
for (int i = 0; i < ldrs; i++) {
cls.add(putOperation(1, ldrs, i, CACHE1, data1));
cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
GridTestUtils.runMultiThreaded(cls, "loaders");
}
checkLocalData(3, CACHE1, data1);
checkLocalData(0, CACHE2, data2);
checkData(0, CACHE1, data1);
checkData(3, CACHE2, data2);
ignite(1).destroyCache(CACHE2);
startGrid(5);
awaitPartitionMapExchange();
checkData(5, CACHE1, data1);
checkLocalData(5, CACHE1, data1);
ignite(1).destroyCache(CACHE1);
checkCacheGroup(5, GROUP1, false);
}
/**
* @param idx Node index.
* @param ldrs Loaders count.
* @param ldrIdx Loader index.
* @param cacheName Cache name.
* @param data Data.
* @return Callable for put operation.
*/
private Callable<Void> putOperation(
final int idx,
final int ldrs,
final int ldrIdx,
final String cacheName,
final Integer[] data) {
return new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteCache cache = ignite(idx).cache(cacheName);
for (int j = 0, size = data.length; j < size; j++) {
if (j % ldrs == ldrIdx)
cache.put(j, data[j]);
}
return null;
}
};
}
/**
* Creates an array of random integers.
*
* @param cnt Array length.
* @return Array of random integers.
*/
private Integer[] generateData(int cnt) {
Random rnd = ThreadLocalRandom.current();
Integer[] data = new Integer[cnt];
for (int i = 0; i < data.length; i++)
data[i] = rnd.nextInt();
return data;
}
/**
* Creates a map with random integers.
*
* @param cnt Map size length.
* @return Map with random integers.
*/
private Map<Integer, Integer> generateDataMap(int cnt) {
return generateDataMap(0, cnt);
}
/**
* Creates a map with random integers.
*
* @param startKey Start key.
* @param cnt Map size length.
* @return Map with random integers.
*/
private Map<Integer, Integer> generateDataMap(int startKey, int cnt) {
Random rnd = ThreadLocalRandom.current();
Map<Integer, Integer> data = new TreeMap<>();
for (int i = 0; i < cnt; i++)
data.put(startKey++, rnd.nextInt());
return data;
}
/**
* @param cnt Sequence length.
* @return Sequence of integers.
*/
private Set<Integer> sequence(int cnt) {
Set<Integer> res = new TreeSet<>();
for (int i = 0; i < cnt; i++)
res.add(i);
return res;
}
/**
* @param idx Node index.
* @param cacheName Cache name.
* @param data Expected data.
* @throws Exception If failed.
*/
private void checkData(int idx, String cacheName, Integer[] data) throws Exception {
Set<Integer> keys = sequence(data.length);
Set<Map.Entry<Integer, Integer>> entries =
ignite(idx).<Integer, Integer>cache(cacheName).getAll(keys).entrySet();
for (Map.Entry<Integer, Integer> entry : entries) {
assertTrue(keys.remove(entry.getKey()));
assertEquals(data[entry.getKey()], entry.getValue());
}
assertTrue(keys.isEmpty());
}
/**
* @param idx Node index.
* @param cacheName Cache name.
* @param data Expected data.
* @throws Exception If failed.
*/
private void checkLocalData(int idx, String cacheName, Integer[] data) throws Exception {
Ignite ignite = ignite(idx);
ClusterNode node = ignite.cluster().localNode();
IgniteCache cache = ignite.<Integer, Integer>cache(cacheName);
Affinity aff = affinity(cache);
Set<Integer> locKeys = new TreeSet<>();
for (int key = 0; key < data.length; key++) {
if (aff.isPrimaryOrBackup(node, key))
locKeys.add(key);
}
Iterable<Cache.Entry<Integer, Integer>> locEntries = cache.localEntries(CachePeekMode.OFFHEAP);
for (Cache.Entry<Integer, Integer> entry : locEntries) {
assertTrue(locKeys.remove(entry.getKey()));
assertEquals(data[entry.getKey()], entry.getValue());
}
assertTrue(locKeys.isEmpty());
}
/**
* @param srvs Number of server nodes.
* @throws Exception If failed.
*/
private void createDestroyCaches(int srvs) throws Exception {
startGridsMultiThreaded(srvs);
checkCacheDiscoveryDataConsistent();
Ignite srv0 = ignite(0);
for (int i = 0; i < srvs; i++)
checkCacheGroup(i, GROUP1, false);
for (int iter = 0; iter < 3; iter++) {
log.info("Iteration: " + iter);
srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < srvs; i++) {
checkCacheGroup(i, GROUP1, true);
checkCache(i, CACHE1, 10);
}
srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false));
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < srvs; i++) {
checkCacheGroup(i, GROUP1, true);
checkCache(i, CACHE2, 10);
}
srv0.destroyCache(CACHE1);
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < srvs; i++) {
checkCacheGroup(i, GROUP1, true);
checkCache(i, CACHE2, 10);
}
srv0.destroyCache(CACHE2);
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < srvs; i++)
checkCacheGroup(i, GROUP1, false);
}
}
/**
* @param idx Node index.
* @param cacheName Cache name.
* @param ops Number of operations to execute.
*/
private void checkCache(int idx, String cacheName, int ops) {
IgniteCache cache = ignite(idx).cache(cacheName);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < ops; i++) {
Integer key = rnd.nextInt();
cache.put(key, i);
assertEquals(i, cache.get(key));
}
}
/**
* @param cache3 {@code True} if add last cache.
* @return Cache configurations.
*/
private CacheConfiguration[] staticConfigurations1(boolean cache3) {
CacheConfiguration[] ccfgs = new CacheConfiguration[cache3 ? 3 : 2];
ccfgs[0] = cacheConfiguration(null, "cache1", PARTITIONED, ATOMIC, 2, false);
ccfgs[1] = cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2, false);
if (cache3)
ccfgs[2] = cacheConfiguration(GROUP1, "cache3", PARTITIONED, ATOMIC, 2, false);
return ccfgs;
}
/**
* @throws Exception If failed.
*/
@Test
public void testDiscoveryDataConsistency1() throws Exception {
ccfgs = staticConfigurations1(true);
Ignite srv0 = startGrid(0);
ccfgs = staticConfigurations1(true);
startGrid(1);
checkCacheDiscoveryDataConsistent();
ccfgs = null;
startGrid(2);
checkCacheDiscoveryDataConsistent();
srv0.createCache(cacheConfiguration(null, "cache4", PARTITIONED, ATOMIC, 2, false));
checkCacheDiscoveryDataConsistent();
ccfgs = staticConfigurations1(true);
startGrid(3);
checkCacheDiscoveryDataConsistent();
srv0.createCache(cacheConfiguration(GROUP1, "cache5", PARTITIONED, ATOMIC, 2, false));
ccfgs = staticConfigurations1(true);
startGrid(4);
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < 5; i++)
checkCacheGroup(i, GROUP1, true);
srv0.destroyCache("cache1");
srv0.destroyCache("cache2");
srv0.destroyCache("cache3");
checkCacheDiscoveryDataConsistent();
ccfgs = staticConfigurations1(true);
startGrid(5);
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < 6; i++)
checkCacheGroup(i, GROUP1, true);
srv0.destroyCache("cache1");
srv0.destroyCache("cache2");
srv0.destroyCache("cache3");
srv0.destroyCache("cache4");
srv0.destroyCache("cache5");
ccfgs = staticConfigurations1(true);
startGrid(6);
checkCacheDiscoveryDataConsistent();
srv0.createCache(cacheConfiguration(null, "cache4", PARTITIONED, ATOMIC, 2, false));
srv0.createCache(cacheConfiguration(GROUP1, "cache5", PARTITIONED, ATOMIC, 2, false));
checkCacheDiscoveryDataConsistent();
ccfgs = staticConfigurations1(false);
startGrid(7);
checkCacheDiscoveryDataConsistent();
awaitPartitionMapExchange();
}
/**
* @param cnt Caches number.
* @param grp Cache groups.
* @param baseName Caches name prefix.
* @return Cache configurations.
*/
private CacheConfiguration[] cacheConfigurations(int cnt, String grp, String baseName) {
CacheConfiguration[] ccfgs = new CacheConfiguration[cnt];
for (int i = 0; i < cnt; i++) {
ccfgs[i] = cacheConfiguration(grp,
baseName + i, PARTITIONED,
i % 2 == 0 ? TRANSACTIONAL : ATOMIC,
2,
false).setAffinity(new RendezvousAffinityFunction(false, 256));
}
return ccfgs;
}
/**
* @throws Exception If failed.
*/
@Test
public void testStartManyCaches() throws Exception {
final int CACHES = SF.apply(5_000);
final int NODES = 4;
for (int i = 0; i < NODES; i++) {
ccfgs = cacheConfigurations(CACHES, GROUP1, "testCache1-");
boolean client = i == NODES - 1;
if (client)
startClientGrid(i);
else
startGrid(i);
}
Ignite client = ignite(NODES - 1);
client.createCaches(Arrays.asList(cacheConfigurations(CACHES, GROUP2, "testCache2-")));
checkCacheDiscoveryDataConsistent();
for (int i = 0; i < NODES; i++) {
log.info("Check node: " + i);
for (int c = 0; c < 10; c++) {
int cache = ThreadLocalRandom.current().nextInt(CACHES);
checkCache(i, "testCache1-" + cache, 1);
checkCache(i, "testCache2-" + cache, 1);
}
}
log.info("Stop nodes.");
GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
@Override public void apply(Integer idx) {
stopGrid(idx);
}
}, NODES, "stopThread");
}
/**
* @throws Exception If failed.
*/
@Test
public void testRebalance1() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Object, Object> srv0Cache1 =
srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false));
IgniteCache<Object, Object> srv0Cache2 =
srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false));
IgniteCache<Object, Object> srv0Cache3 =
srv0.createCache(cacheConfiguration(GROUP2, "c3", PARTITIONED, TRANSACTIONAL, 2, false));
IgniteCache<Object, Object> srv0Cache4 =
srv0.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 2, false));
IgniteCache<Object, Object> srv0Cache5 =
srv0.createCache(cacheConfiguration(GROUP3, "c5", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 2, false));
IgniteCache<Object, Object> srv0Cache6 =
srv0.createCache(cacheConfiguration(GROUP3, "c6", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 2, false));
final int ITEMS = 1_000;
for (int i = 0; i < ITEMS; i++) {
srv0Cache1.put(new Key1(i), i);
srv0Cache3.put(new Key1(i), i);
srv0Cache4.put(new Key1(i), -i);
srv0Cache5.put(new Key1(i), i);
srv0Cache6.put(new Key1(i), -i);
}
assertEquals(ITEMS, srv0Cache1.size());
assertEquals(ITEMS, srv0Cache1.localSize());
assertEquals(0, srv0Cache2.size());
assertEquals(ITEMS, srv0Cache3.size());
assertEquals(ITEMS, srv0Cache4.localSize());
assertEquals(ITEMS, srv0Cache5.size());
assertEquals(ITEMS, srv0Cache6.localSize());
startGrid(1);
awaitPartitionMapExchange();
for (int i = 0; i < 2; i++) {
Ignite node = ignite(i);
IgniteCache<Object, Object> cache1 = node.cache("c1");
IgniteCache<Object, Object> cache2 = node.cache("c2");
IgniteCache<Object, Object> cache3 = node.cache("c3");
IgniteCache<Object, Object> cache4 = node.cache("c4");
IgniteCache<Object, Object> cache5 = node.cache("c5");
IgniteCache<Object, Object> cache6 = node.cache("c6");
assertEquals(ITEMS * 2, cache1.size(CachePeekMode.ALL));
assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL));
assertEquals(0, cache2.size(CachePeekMode.ALL));
assertEquals(0, cache2.localSize(CachePeekMode.ALL));
assertEquals(ITEMS * 2, cache3.size(CachePeekMode.ALL));
assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL));
assertEquals(ITEMS * 2, cache4.size(CachePeekMode.ALL));
assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL));
assertEquals(ITEMS * 2, cache5.size(CachePeekMode.ALL));
assertEquals(ITEMS, cache5.localSize(CachePeekMode.ALL));
assertEquals(ITEMS * 2, cache6.size(CachePeekMode.ALL));
assertEquals(ITEMS, cache6.localSize(CachePeekMode.ALL));
for (int k = 0; k < ITEMS; k++) {
assertEquals(i, cache1.localPeek(new Key1(i)));
assertNull(cache2.localPeek(new Key1(i)));
assertEquals(i, cache3.localPeek(new Key1(i)));
assertEquals(-i, cache4.localPeek(new Key1(i)));
assertEquals(i, cache5.localPeek(new Key1(i)));
assertEquals(-i, cache6.localPeek(new Key1(i)));
}
}
for (int i = 0; i < ITEMS * 2; i++)
srv0Cache2.put(new Key1(i), i + 1);
Ignite srv2 = startGrid(2);
awaitPartitionMapExchange();
for (int i = 0; i < 3; i++) {
Ignite node = ignite(i);
IgniteCache<Object, Object> cache1 = node.cache("c1");
IgniteCache<Object, Object> cache2 = node.cache("c2");
IgniteCache<Object, Object> cache3 = node.cache("c3");
IgniteCache<Object, Object> cache4 = node.cache("c4");
IgniteCache<Object, Object> cache5 = node.cache("c5");
IgniteCache<Object, Object> cache6 = node.cache("c6");
assertEquals(ITEMS * 3, cache1.size(CachePeekMode.ALL));
assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL));
assertEquals(ITEMS * 6, cache2.size(CachePeekMode.ALL));
assertEquals(ITEMS * 2, cache2.localSize(CachePeekMode.ALL));
assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL));
assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL));
assertEquals(ITEMS, cache5.localSize(CachePeekMode.ALL));
assertEquals(ITEMS, cache6.localSize(CachePeekMode.ALL));
}
IgniteCache<Object, Object> srv2Cache1 = srv2.cache("c1");
IgniteCache<Object, Object> srv2Cache2 = srv2.cache("c2");
for (int i = 0; i < ITEMS; i++)
assertEquals(i, srv2Cache1.localPeek(new Key1(i)));
for (int i = 0; i < ITEMS * 2; i++)
assertEquals(i + 1, srv2Cache2.localPeek(new Key1(i)));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRebalance2() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Object, Object> srv0Cache1 =
srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0, false));
IgniteCache<Object, Object> srv0Cache2 =
srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 0, false));
Affinity aff = srv0.affinity("c1");
final int ITEMS = 2_000;
Map<Integer, Integer> c1Data = new HashMap<>();
Map<Integer, Integer> c2Data = new HashMap<>();
for (int i = 0; i < ITEMS; i++) {
srv0Cache1.put(i, i);
c1Data.put(i, i);
if (i % 2 == 0) {
srv0Cache2.put(i, i);
c2Data.put(i, i);
}
}
assertEquals(ITEMS, srv0Cache1.size());
assertEquals(ITEMS / 2, srv0Cache2.size());
Ignite srv1 = startGrid(1);
awaitPartitionMapExchange();
assertEquals(ITEMS, srv0Cache1.size());
assertEquals(ITEMS / 2, srv0Cache2.size());
checkCacheData(c1Data, "c1");
checkCacheData(c2Data, "c2");
Set<Integer> srv1Parts = new HashSet<>();
for (Integer p : aff.primaryPartitions(srv1.cluster().localNode()))
srv1Parts.add(p);
CacheGroupContext grpSrv0 = cacheGroup(srv0, GROUP1);
CacheGroupContext grpSrv1 = cacheGroup(srv1, GROUP1);
for (int p = 0; p < aff.partitions(); p++) {
if (srv1Parts.contains(p)) {
GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p);
assertFalse(it.hasNext());
it = grpSrv1.offheap().partitionIterator(p);
assertTrue(it.hasNext());
}
else {
GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p);
assertTrue(it.hasNext());
it = grpSrv1.offheap().partitionIterator(p);
assertFalse(it.hasNext());
}
}
c1Data = new HashMap<>();
c2Data = new HashMap<>();
for (int i = 0; i < ITEMS; i++) {
srv0Cache1.put(i, i + 1);
c1Data.put(i, i + 1);
if (i % 2 == 0) {
srv0Cache2.put(i, i + 1);
c2Data.put(i, i + 1);
}
}
checkCacheData(c1Data, "c1");
checkCacheData(c2Data, "c2");
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoKeyIntersectTx() throws Exception {
testNoKeyIntersect(TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoKeyIntersectMvccTx() throws Exception {
testNoKeyIntersect(TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoKeyIntersectAtomic() throws Exception {
testNoKeyIntersect(ATOMIC);
}
/**
* @param atomicityMode Atomicity mode.
* @throws Exception If failed.
*/
private void testNoKeyIntersect(CacheAtomicityMode atomicityMode) throws Exception {
startGrid(0);
testNoKeyIntersect(atomicityMode, false);
testNoKeyIntersect(atomicityMode, true);
startGridsMultiThreaded(1, 4);
testNoKeyIntersect(atomicityMode, false);
testNoKeyIntersect(atomicityMode, true);
}
/**
* @param keys Keys.
* @param rnd Random.
* @return Added key.
*/
private Integer addKey(Set<Integer> keys, ThreadLocalRandom rnd) {
for (;;) {
Integer key = rnd.nextInt(100_000);
if (keys.add(key))
return key;
}
}
/**
* @param atomicityMode Cache atomicity mode.
* @param heapCache On heap cache flag.
* @throws Exception If failed.
*/
private void testNoKeyIntersect(CacheAtomicityMode atomicityMode, boolean heapCache) throws Exception {
Ignite srv0 = ignite(0);
try {
IgniteCache cache1 = srv0.
createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, atomicityMode, 1, heapCache));
Set<Integer> keys = new LinkedHashSet<>(30);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < 10; i++) {
Integer key = addKey(keys, rnd);
cache1.put(key, key);
cache1.put(new Key1(key), new Value1(key));
cache1.put(new Key2(key), new Value2(key));
}
assertEquals(30, cache1.size());
IgniteCache cache2 = srv0.
createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, atomicityMode, 1, heapCache));
assertEquals(30, cache1.size());
assertEquals(0, cache2.size());
for (Integer key : keys) {
assertNull(cache2.get(key));
assertNull(cache2.get(new Key1(key)));
assertNull(cache2.get(new Key2(key)));
cache2.put(key, key + 1);
cache2.put(new Key1(key), new Value1(key + 1));
cache2.put(new Key2(key), new Value2(key + 1));
}
assertEquals(30, cache1.size());
assertEquals(30, cache2.size());
for (int i = 0; i < 10; i++) {
Integer key = addKey(keys, rnd);
cache2.put(key, key + 1);
cache2.put(new Key1(key), new Value1(key + 1));
cache2.put(new Key2(key), new Value2(key + 1));
}
assertEquals(30, cache1.size());
assertEquals(60, cache2.size());
int i = 0;
for (Integer key : keys) {
if (i++ < 10) {
assertEquals(key, cache1.get(key));
assertEquals(new Value1(key), cache1.get(new Key1(key)));
assertEquals(new Value2(key), cache1.get(new Key2(key)));
}
else {
assertNull(cache1.get(key));
assertNull(cache1.get(new Key1(key)));
assertNull(cache1.get(new Key2(key)));
}
assertEquals(key + 1, cache2.get(key));
assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));
}
IgniteCache cache3 = srv0.
createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, atomicityMode, 1, heapCache));
assertEquals(30, cache1.size());
assertEquals(60, cache2.size());
assertEquals(0, cache3.size());
for (Integer key : keys) {
assertNull(cache3.get(key));
assertNull(cache3.get(new Key1(key)));
assertNull(cache3.get(new Key2(key)));
}
for (Integer key : keys) {
cache3.put(key, key);
cache3.put(new Key1(key), new Value1(key));
cache3.put(new Key2(key), new Value2(key));
}
i = 0;
for (Integer key : keys) {
if (i++ < 10) {
assertEquals(key, cache1.get(key));
assertEquals(new Value1(key), cache1.get(new Key1(key)));
assertEquals(new Value2(key), cache1.get(new Key2(key)));
}
else {
assertNull(cache1.get(key));
assertNull(cache1.get(new Key1(key)));
assertNull(cache1.get(new Key2(key)));
}
assertEquals(key + 1, cache2.get(key));
assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));
assertEquals(key, cache3.get(key));
assertEquals(new Value1(key), cache3.get(new Key1(key)));
assertEquals(new Value2(key), cache3.get(new Key2(key)));
}
i = 0;
for (Integer key : keys) {
if (i++ == 3)
break;
cache1.remove(key);
cache1.remove(new Key1(key));
cache1.remove(new Key2(key));
assertNull(cache1.get(key));
assertNull(cache1.get(new Key1(key)));
assertNull(cache1.get(new Key2(key)));
assertEquals(key + 1, cache2.get(key));
assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));
assertEquals(key, cache3.get(key));
assertEquals(new Value1(key), cache3.get(new Key1(key)));
assertEquals(new Value2(key), cache3.get(new Key2(key)));
}
cache1.removeAll();
for (Integer key : keys) {
assertNull(cache1.get(key));
assertNull(cache1.get(new Key1(key)));
assertNull(cache1.get(new Key2(key)));
assertEquals(key + 1, cache2.get(key));
assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));
assertEquals(key, cache3.get(key));
assertEquals(new Value1(key), cache3.get(new Key1(key)));
assertEquals(new Value2(key), cache3.get(new Key2(key)));
}
cache2.removeAll();
for (Integer key : keys) {
assertNull(cache1.get(key));
assertNull(cache1.get(new Key1(key)));
assertNull(cache1.get(new Key2(key)));
assertNull(cache2.get(key));
assertNull(cache2.get(new Key1(key)));
assertNull(cache2.get(new Key2(key)));
assertEquals(key, cache3.get(key));
assertEquals(new Value1(key), cache3.get(new Key1(key)));
assertEquals(new Value2(key), cache3.get(new Key2(key)));
}
if (atomicityMode == TRANSACTIONAL)
testNoKeyIntersectTxLocks(cache1, cache2);
}
finally {
srv0.destroyCaches(Arrays.asList("c1", "c2", "c3"));
}
}
/**
* @param cache1 Cache1.
* @param cache2 Cache2.
* @throws Exception If failed.
*/
private void testNoKeyIntersectTxLocks(final IgniteCache cache1, final IgniteCache cache2) throws Exception {
final Ignite node = (Ignite)cache1.unwrap(Ignite.class);
for (int i = 0; i < 5; i++) {
final Integer key = ThreadLocalRandom.current().nextInt(1000);
Lock lock = cache1.lock(key);
lock.lock();
try {
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
Lock lock1 = cache1.lock(key);
assertFalse(lock1.tryLock());
Lock lock2 = cache2.lock(key);
assertTrue(lock2.tryLock());
lock2.unlock();
return null;
}
}, "lockThread");
fut.get(10_000);
}
finally {
lock.unlock();
}
try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache1.put(key, 1);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache2.put(key, 2);
tx.commit();
}
assertEquals(2, cache2.get(key));
return null;
}
}, "txThread");
fut.get(10_000);
tx.commit();
}
assertEquals(1, cache1.get(key));
assertEquals(2, cache2.get(key));
try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = (Integer)cache1.get(key);
cache1.put(key, val + 10);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache2.put(key, 3);
tx.commit();
}
assertEquals(3, cache2.get(key));
return null;
}
}, "txThread");
fut.get(10_000);
tx.commit();
}
assertEquals(11, cache1.get(key));
assertEquals(3, cache2.get(key));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheApiTxPartitioned() throws Exception {
cacheApiTest(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7952")
@Test
public void testCacheApiMvccTxPartitioned() throws Exception {
cacheApiTest(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheApiTxReplicated() throws Exception {
cacheApiTest(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7952")
@Test
public void testCacheApiMvccTxReplicated() throws Exception {
cacheApiTest(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheApiAtomicPartitioned() throws Exception {
cacheApiTest(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheApiAtomicReplicated() throws Exception {
cacheApiTest(REPLICATED, ATOMIC);
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Atomicity mode.
* @throws Exception If failed.
*/
private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
startGridsMultiThreaded(4);
startClientGrid(4);
int[] backups = cacheMode == REPLICATED ? new int[]{Integer.MAX_VALUE} : new int[]{0, 1, 2, 3};
for (int backups0 : backups)
cacheApiTest(cacheMode, atomicityMode, backups0, false, false, false);
int backups0 = cacheMode == REPLICATED ? Integer.MAX_VALUE :
backups[ThreadLocalRandom.current().nextInt(backups.length)];
cacheApiTest(cacheMode, atomicityMode, backups0, true, false, false);
if (cacheMode == PARTITIONED) {
// Here the f variable is used as a bit set where 2 last bits
// determine whether a near cache is used on server/client side.
// The case without near cache is already tested at this point.
for (int f : new int[]{1, 2, 3}) {
cacheApiTest(cacheMode, atomicityMode, backups0, false, nearSrv(f), nearClient(f));
cacheApiTest(cacheMode, atomicityMode, backups0, true, nearSrv(f), nearClient(f));
}
}
}
/**
* @param flag Flag.
* @return {@code True} if near cache should be used on a client side.
*/
private boolean nearClient(int flag) {
return (flag & 0b01) == 0b01;
}
/**
* @param flag Flag.
* @return {@code True} if near cache should be used on a server side.
*/
private boolean nearSrv(int flag) {
return (flag & 0b10) == 0b10;
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Atomicity mode.
* @param backups Number of backups.
* @param heapCache On heap cache flag.
* @param nearSrv {@code True} if near cache should be used on a server side.
* @param nearClient {@code True} if near cache should be used on a client side.
* @throws Exception If failed.
*/
private void cacheApiTest(CacheMode cacheMode,
CacheAtomicityMode atomicityMode,
int backups,
boolean heapCache,
boolean nearSrv,
boolean nearClient) throws Exception {
Ignite srv0 = ignite(0);
NearCacheConfiguration nearCfg = nearSrv ? new NearCacheConfiguration() : null;
srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache)
.setNearConfiguration(nearCfg));
srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, atomicityMode, backups, heapCache));
srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache)
.setNearConfiguration(nearCfg));
srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, atomicityMode, backups, heapCache));
awaitCacheOnClient(ignite(4), "cache-3");
if (nearClient) {
Ignite clientNode = ignite(4);
clientNode.createNearCache("cache-0", new NearCacheConfiguration());
clientNode.createNearCache("cache-2", new NearCacheConfiguration());
}
try {
for (final Ignite node : Ignition.allGrids()) {
List<Callable<?>> ops = new ArrayList<>();
for (int i = 0; i < 4; i++)
ops.add(testSet(node.cache("cache-" + i), cacheMode, atomicityMode, backups, heapCache, node));
// Async operations.
GridTestUtils.runMultiThreaded(ops, "cacheApiTest");
}
}
finally {
for (int i = 0; i < 4; i++)
srv0.destroyCache("cache-" + i);
}
}
/**
* @param cache Cache.
* @param cacheMode Cache mode.
* @param atomicityMode Atomicity mode.
* @param backups Number of backups.
* @param heapCache On heap cache flag.
* @param node Ignite node.
* @return Callable for the test operations.
*/
private Callable<?> testSet(
final IgniteCache<Object, Object> cache,
final CacheMode cacheMode,
final CacheAtomicityMode atomicityMode,
final int backups,
final boolean heapCache,
final Ignite node) {
return new Callable<Void>() {
@Override public Void call() throws Exception {
log.info("Test cache [node=" + node.name() +
", cache=" + cache.getName() +
", mode=" + cacheMode +
", atomicity=" + atomicityMode +
", backups=" + backups +
", heapCache=" + heapCache +
']');
cacheApiTest(cache);
return null;
}
};
}
/**
* @param cache Cache.
* @throws Exception If failed.
*/
private void cacheApiTest(IgniteCache cache) throws Exception {
cachePutAllGetAllContainsAll(cache);
cachePutAllGetAllContainsAllAsync(cache);
cachePutRemove(cache);
cachePutRemoveAsync(cache);
cachePutGetContains(cache);
cachePutGetContainsAsync(cache);
cachePutGetAndPut(cache);
cachePutGetAndPutAsync(cache);
cachePutGetAndRemove(cache);
cachePutGetAndRemoveAsync(cache);
cachePutGetAndReplace(cache);
cachePutGetAndReplaceAsync(cache);
cachePutIfAbsent(cache);
cachePutIfAbsentAsync(cache);
cachePutGetAndPutIfAbsent(cache);
cachePutGetAndPutIfAbsentAsync(cache);
cacheQuery(cache);
cacheInvokeAll(cache);
cacheInvoke(cache);
cacheInvokeAllAsync(cache);
cacheInvokeAsync(cache);
cacheDataStreamer(cache);
}
/**
* @param cache Cache.
*/
private void tearDown(IgniteCache cache) {
cache.clear();
cache.removeAll();
}
/**
* @param cache Cache.
* @throws Exception If failed.
*/
private void cacheDataStreamer(final IgniteCache cache) throws Exception {
final int keys = 400;
final int loaders = 4;
final Integer[] data = generateData(keys * loaders);
// Stream through a client node.
Ignite clientNode = ignite(4);
List<Callable<?>> cls = new ArrayList<>(loaders);
for (final int i : sequence(loaders)) {
final IgniteDataStreamer ldr = clientNode.dataStreamer(cache.getName());
ldr.allowOverwrite(true); // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11793
ldr.autoFlushFrequency(0);
cls.add(new Callable<Void>() {
@Override public Void call() throws Exception {
List<IgniteFuture> futs = new ArrayList<>(keys);
for (int j = 0, size = keys * loaders; j < size; j++) {
if (j % loaders == i)
futs.add(ldr.addData(j, data[j]));
if (j % (100 * loaders) == 0)
ldr.flush();
}
ldr.flush();
for (IgniteFuture fut : futs)
fut.get();
return null;
}
});
}
GridTestUtils.runMultiThreaded(cls, "loaders");
Set<Integer> keysSet = sequence(data.length);
for (Cache.Entry<Integer, Integer> entry : (IgniteCache<Integer, Integer>)cache) {
assertTrue(keysSet.remove(entry.getKey()));
assertEquals(data[entry.getKey()], entry.getValue());
}
assertTrue(keysSet.isEmpty());
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutAllGetAllContainsAll(IgniteCache cache) {
int keys = 100;
Map<Integer, Integer> data = generateDataMap(keys);
cache.putAll(data);
Map data0 = cache.getAll(data.keySet());
assertEquals(data.size(), data0.size());
for (Map.Entry<Integer, Integer> entry : data.entrySet())
assertEquals(entry.getValue(), data0.get(entry.getKey()));
assertTrue(cache.containsKeys(data.keySet()));
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutAllGetAllContainsAllAsync(IgniteCache cache) {
int keys = 100;
Map<Integer, Integer> data = generateDataMap(keys);
cache.putAllAsync(data).get(ASYNC_TIMEOUT);
Map data0 = (Map)cache.getAllAsync(data.keySet()).get(ASYNC_TIMEOUT);
assertEquals(data.size(), data0.size());
for (Map.Entry<Integer, Integer> entry : data.entrySet())
assertEquals(entry.getValue(), data0.get(entry.getKey()));
assertTrue((Boolean)cache.containsKeysAsync(data.keySet()).get(ASYNC_TIMEOUT));
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutRemove(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.put(key, val);
assertTrue(cache.remove(key));
assertNull(cache.get(key));
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutRemoveAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.putAsync(key, val).get(ASYNC_TIMEOUT);
assertTrue((Boolean)cache.removeAsync(key).get(ASYNC_TIMEOUT));
assertNull(cache.get(key));
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetContains(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.put(key, val);
Object val0 = cache.get(key);
assertEquals(val, val0);
assertTrue(cache.containsKey(key));
Integer key2 = rnd.nextInt();
while (key2.equals(key))
key2 = rnd.nextInt();
assertFalse(cache.containsKey(key2));
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetContainsAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.putAsync(key, val).get(ASYNC_TIMEOUT);
Object val0 = cache.getAsync(key).get(ASYNC_TIMEOUT);
assertEquals(val, val0);
assertTrue((Boolean)cache.containsKeyAsync(key).get(ASYNC_TIMEOUT));
Integer key2 = rnd.nextInt();
while (key2.equals(key))
key2 = rnd.nextInt();
assertFalse((Boolean)cache.containsKeyAsync(key2).get(ASYNC_TIMEOUT));
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndPut(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
cache.put(key, val1);
Object val0 = cache.getAndPut(key, val2);
assertEquals(val1, val0);
val0 = cache.get(key);
assertEquals(val2, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndPutAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
cache.put(key, val1);
Object val0 = cache.getAndPutAsync(key, val2).get(ASYNC_TIMEOUT);
assertEquals(val1, val0);
val0 = cache.get(key);
assertEquals(val2, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndReplace(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
Object val0 = cache.getAndReplace(key, val1);
assertEquals(null, val0);
val0 = cache.get(key);
assertEquals(null, val0);
cache.put(key, val1);
val0 = cache.getAndReplace(key, val2);
assertEquals(val1, val0);
val0 = cache.get(key);
assertEquals(val2, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndReplaceAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
Object val0 = cache.getAndReplaceAsync(key, val1).get(ASYNC_TIMEOUT);
assertEquals(null, val0);
val0 = cache.get(key);
assertEquals(null, val0);
cache.put(key, val1);
val0 = cache.getAndReplaceAsync(key, val2).get(ASYNC_TIMEOUT);
assertEquals(val1, val0);
val0 = cache.get(key);
assertEquals(val2, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndRemove(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.put(key, val);
Object val0 = cache.getAndRemove(key);
assertEquals(val, val0);
val0 = cache.get(key);
assertNull(val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndRemoveAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.put(key, val);
Object val0 = cache.getAndRemoveAsync(key).get(ASYNC_TIMEOUT);
assertEquals(val, val0);
val0 = cache.get(key);
assertNull(val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutIfAbsent(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
assertTrue(cache.putIfAbsent(key, val1));
Object val0 = cache.get(key);
assertEquals(val1, val0);
assertFalse(cache.putIfAbsent(key, val2));
val0 = cache.get(key);
assertEquals(val1, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutIfAbsentAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
assertTrue((Boolean)cache.putIfAbsentAsync(key, val1).get(ASYNC_TIMEOUT));
Object val0 = cache.get(key);
assertEquals(val1, val0);
assertFalse((Boolean)cache.putIfAbsentAsync(key, val2).get(ASYNC_TIMEOUT));
val0 = cache.get(key);
assertEquals(val1, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndPutIfAbsent(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
cache.put(key, val1);
Object val0 = cache.getAndPutIfAbsent(key, val2);
assertEquals(val1, val0);
val0 = cache.get(key);
assertEquals(val1, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cachePutGetAndPutIfAbsentAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val1 = rnd.nextInt();
Integer val2 = rnd.nextInt();
cache.put(key, val1);
Object val0 = cache.getAndPutIfAbsentAsync(key, val2).get(ASYNC_TIMEOUT);
assertEquals(val1, val0);
val0 = cache.get(key);
assertEquals(val1, val0);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cacheQuery(IgniteCache cache) {
int keys = 100;
Map<Integer, Integer> data = generateDataMap(keys);
cache.putAll(data);
ScanQuery<Integer, Integer> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Integer>() {
@Override public boolean apply(Integer key, Integer val) {
return key % 2 == 0;
}
});
List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();
assertEquals(all.size(), data.size() / 2);
for (Cache.Entry<Integer, Integer> entry : all) {
assertEquals(0, entry.getKey() % 2);
assertEquals(entry.getValue(), data.get(entry.getKey()));
}
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cacheInvokeAll(IgniteCache cache) {
int keys = 100;
Map<Integer, Integer> data = generateDataMap(keys);
cache.putAll(data);
Random rnd = ThreadLocalRandom.current();
int one = rnd.nextInt();
int two = rnd.nextInt();
Map<Integer, CacheInvokeResult<Integer>> res = cache.invokeAll(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() {
@Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
Object expected = ((Map)arguments[0]).get(entry.getKey());
assertEquals(expected, entry.getValue());
// Some calculation.
return (Integer)arguments[1] + (Integer)arguments[2];
}
}, data, one, two);
assertEquals(keys, res.size());
assertEquals(one + two, (Object)res.get(0).get());
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cacheInvoke(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.put(key, val);
int one = rnd.nextInt();
int two = rnd.nextInt();
Object res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
@Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
assertEquals(arguments[0], entry.getValue());
// Some calculation.
return (Integer)arguments[1] + (Integer)arguments[2];
}
}, val, one, two);
assertEquals(one + two, res);
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cacheInvokeAllAsync(IgniteCache cache) {
int keys = 100;
Map<Integer, Integer> data = generateDataMap(keys);
cache.putAll(data);
Random rnd = ThreadLocalRandom.current();
int one = rnd.nextInt();
int two = rnd.nextInt();
Object res0 = cache.invokeAllAsync(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() {
@Override public Integer process(MutableEntry<Integer, Integer> entry,
Object... arguments) throws EntryProcessorException {
Object expected = ((Map)arguments[0]).get(entry.getKey());
assertEquals(expected, entry.getValue());
// Some calculation.
return (Integer)arguments[1] + (Integer)arguments[2];
}
}, data, one, two).get(ASYNC_TIMEOUT);
Map<Integer, CacheInvokeResult<Integer>> res = (Map<Integer, CacheInvokeResult<Integer>>)res0;
assertEquals(keys, res.size());
assertEquals(one + two, (Object)res.get(0).get());
tearDown(cache);
}
/**
* @param cache Cache.
*/
private void cacheInvokeAsync(IgniteCache cache) {
Random rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt();
Integer val = rnd.nextInt();
cache.put(key, val);
int one = rnd.nextInt();
int two = rnd.nextInt();
Object res = cache.invokeAsync(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
@Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
assertEquals(arguments[0], entry.getValue());
// Some calculation.
return (Integer)arguments[1] + (Integer)arguments[2];
}
}, val, one, two).get(ASYNC_TIMEOUT);
assertEquals(one + two, res);
tearDown(cache);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLoadCacheAtomicPartitioned() throws Exception {
loadCache(PARTITIONED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLoadCacheAtomicReplicated() throws Exception {
loadCache(REPLICATED, ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLoadCacheTxPartitioned() throws Exception {
loadCache(PARTITIONED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7954")
@Test
public void testLoadCacheMvccTxPartitioned() throws Exception {
loadCache(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLoadCacheTxReplicated() throws Exception {
loadCache(REPLICATED, TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7954")
@Test
public void testLoadCacheMvccTxReplicated() throws Exception {
loadCache(REPLICATED, TRANSACTIONAL_SNAPSHOT);
}
/**
* @param cacheMode Cache mode.
* @param atomicityMode Atomicity mode.
* @throws Exception If failed.
*/
private void loadCache(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
int keys = 100;
Map<Integer, Integer> data1 = generateDataMap(keys);
Map<Integer, Integer> data2 = generateDataMap(keys);
Factory<? extends CacheStore<Integer, Integer>> fctr1 =
FactoryBuilder.factoryOf(new MapBasedStore<>(data1));
Factory<? extends CacheStore<Integer, Integer>> fctr2 =
FactoryBuilder.factoryOf(new MapBasedStore<>(data2));
CacheConfiguration ccfg1 = cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 1, false)
.setCacheStoreFactory(fctr1);
CacheConfiguration ccfg2 = cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 1, false)
.setCacheStoreFactory(fctr2);
Ignite node = startGrids(4);
node.createCaches(F.asList(ccfg1, ccfg2));
IgniteCache<Integer, Integer> cache1 = node.cache(CACHE1);
IgniteCache<Integer, Integer> cache2 = node.cache(CACHE2);
cache1.loadCache(null);
checkCacheData(data1, CACHE1);
assertEquals(0, cache2.size());
cache2.loadCache(null);
checkCacheData(data2, CACHE2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentOperationsSameKeys() throws Exception {
final int SRVS = 4;
final int CLIENTS = 4;
final int NODES = SRVS + CLIENTS;
startGrid(0);
Ignite srv0 = startGridsMultiThreaded(1, SRVS - 1);
startClientGridsMultiThreaded(SRVS, CLIENTS);
srv0.createCache(cacheConfiguration(GROUP1, "a0", PARTITIONED, ATOMIC, 1, false));
srv0.createCache(cacheConfiguration(GROUP1, "a1", PARTITIONED, ATOMIC, 1, false));
srv0.createCache(cacheConfiguration(GROUP1, "t0", PARTITIONED, TRANSACTIONAL, 1, false));
srv0.createCache(cacheConfiguration(GROUP1, "t1", PARTITIONED, TRANSACTIONAL, 1, false));
F.view(G.allGrids(), ignite -> ignite.cluster().localNode().isClient())
.forEach(ignite -> awaitCacheOnClient(ignite, "t1"));
final List<Integer> keys = new ArrayList<>();
for (int i = 0; i < 50; i++)
keys.add(i);
final AtomicBoolean err = new AtomicBoolean();
final AtomicBoolean stop = new AtomicBoolean();
IgniteInternalFuture fut1 = updateFuture(NODES, "a0", keys, false, stop, err);
IgniteInternalFuture fut2 = updateFuture(NODES, "a1", keys, true, stop, err);
IgniteInternalFuture fut3 = updateFuture(NODES, "t0", keys, false, stop, err);
IgniteInternalFuture fut4 = updateFuture(NODES, "t1", keys, true, stop, err);
try {
for (int i = 0; i < 15 && !stop.get(); i++)
U.sleep(1_000);
}
finally {
stop.set(true);
}
fut1.get();
fut2.get();
fut3.get();
fut4.get();
assertFalse("Unexpected error, see log for details", err.get());
}
/**
* @param nodes Total number of nodes.
* @param cacheName Cache name.
* @param keys Keys to update.
* @param reverse {@code True} if update in reverse order.
* @param stop Stop flag.
* @param err Error flag.
* @return Update future.
*/
private IgniteInternalFuture updateFuture(final int nodes,
final String cacheName,
final List<Integer> keys,
final boolean reverse,
final AtomicBoolean stop,
final AtomicBoolean err) {
final AtomicInteger idx = new AtomicInteger();
return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
Ignite node = ignite(idx.getAndIncrement() % nodes);
log.info("Start thread [node=" + node.name() + ']');
IgniteCache cache = node.cache(cacheName);
Map<Integer, Integer> map = new LinkedHashMap<>();
if (reverse) {
for (int i = keys.size() - 1; i >= 0; i--)
map.put(keys.get(i), 2);
}
else {
for (Integer key : keys)
map.put(key, 1);
}
while (!stop.get())
cache.putAll(map);
}
catch (Exception e) {
err.set(true);
log.error("Unexpected error: " + e, e);
stop.set(true);
}
return null;
}
}, nodes * 2, "update-" + cacheName + "-" + reverse);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentOperationsAndCacheDestroy() throws Exception {
final int SRVS = 4;
final int CLIENTS = 4;
final int NODES = SRVS + CLIENTS;
startGrid(0);
Ignite srv0 = startGridsMultiThreaded(1, SRVS - 1);
startClientGridsMultiThreaded(SRVS, CLIENTS);
final int CACHES = 8;
final int grp1Backups = ThreadLocalRandom.current().nextInt(3);
final int grp2Backups = ThreadLocalRandom.current().nextInt(3);
log.info("Start test [grp1Backups=" + grp1Backups + ", grp2Backups=" + grp2Backups + ']');
for (int i = 0; i < CACHES; i++) {
srv0.createCache(
cacheConfiguration(GROUP1, GROUP1 + "-" + i, PARTITIONED, ATOMIC, grp1Backups, i % 2 == 0));
srv0.createCache(
cacheConfiguration(GROUP2, GROUP2 + "-" + i, PARTITIONED, TRANSACTIONAL, grp2Backups, i % 2 == 0));
// TODO IGNITE-7164: add Mvcc cache to test.
}
F.view(G.allGrids(), ignite -> ignite.cluster().localNode().isClient())
.forEach(ignite -> awaitCacheOnClient(ignite, GROUP2 + "-" + (CACHES - 1)));
final AtomicInteger idx = new AtomicInteger();
final AtomicBoolean err = new AtomicBoolean();
final AtomicBoolean stop = new AtomicBoolean();
IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
try {
Ignite node = ignite(idx.getAndIncrement() % NODES);
log.info("Start thread [node=" + node.name() + ']');
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
try {
String grp = rnd.nextBoolean() ? GROUP1 : GROUP2;
int cacheIdx = rnd.nextInt(CACHES);
IgniteCache cache = node.cache(grp + "-" + cacheIdx);
for (int i = 0; i < 10; i++)
cacheOperation(rnd, cache);
}
catch (Exception e) {
if (X.hasCause(e, CacheStoppedException.class)) {
// Cache operation can be blocked on
// awaiting new topology version and cancelled with CacheStoppedException cause.
continue;
}
throw e;
}
}
}
catch (Exception e) {
err.set(true);
log.error("Unexpected error(1): " + e, e);
stop.set(true);
}
}
}, (SRVS + CLIENTS) * 2, "op-thread");
IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
int cntr = 0;
while (!stop.get()) {
try {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
String grp;
int backups;
if (rnd.nextBoolean()) {
grp = GROUP1;
backups = grp1Backups;
}
else {
grp = GROUP2;
backups = grp2Backups;
}
Ignite node = ignite(rnd.nextInt(NODES));
log.info("Create cache [node=" + node.name() + ", grp=" + grp + ']');
IgniteCache cache = node.createCache(cacheConfiguration(grp, "tmpCache-" + cntr++,
PARTITIONED,
rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL,
backups,
rnd.nextBoolean()));
for (int i = 0; i < 10; i++)
cacheOperation(rnd, cache);
log.info("Destroy cache [node=" + node.name() + ", grp=" + grp + ']');
node.destroyCache(cache.getName());
}
catch (Exception e) {
if (X.hasCause(e, CacheStoppedException.class)) {
// Cache operation can be blocked on
// awaiting new topology version and cancelled with CacheStoppedException cause.
continue;
}
err.set(true);
log.error("Unexpected error(2): " + e, e);
stop.set(true);
}
}
}
}, "cache-destroy-thread");
try {
for (int i = 0; i < 30 && !stop.get(); i++)
U.sleep(1_000);
}
finally {
stop.set(true);
}
opFut.get();
cacheFut.get();
assertFalse("Unexpected error, see log for details", err.get());
}
/**
* @throws Exception If failed.
*/
@Test
public void testStaticConfigurationsValidation() throws Exception {
ccfgs = new CacheConfiguration[2];
ccfgs[0] = new CacheConfiguration(CACHE1);
ccfgs[0].setGroupName(GROUP1);
ccfgs[0].setAffinity(new RendezvousAffinityFunction(false, 1024));
ccfgs[1] = new CacheConfiguration(CACHE2);
ccfgs[1].setGroupName(GROUP1);
ccfgs[1].setAffinity(new RendezvousAffinityFunction(false, 512));
try {
startGrid(0);
fail();
}
catch (IgniteCheckedException ignore) {
// Expected exception.
}
ccfgs = new CacheConfiguration[3];
ccfgs[0] = new CacheConfiguration(CACHE1);
ccfgs[0].setGroupName(GROUP1);
ccfgs[0].setAffinity(new RendezvousAffinityFunction(false, 16));
ccfgs[1] = new CacheConfiguration(CACHE2);
ccfgs[1].setGroupName(GROUP2);
ccfgs[1].setAffinity(new RendezvousAffinityFunction(false, 512));
ccfgs[2] = new CacheConfiguration("cache3");
ccfgs[2].setGroupName(GROUP1);
ccfgs[2].setAffinity(new RendezvousAffinityFunction(false, 16));
startGrid(0);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIdConflict() throws Exception {
ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa"), new CacheConfiguration("AaAaBB")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa")};
startGrid(0);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaBB")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, IgniteCheckedException.class, null);
assertFalse(ignite(0).cacheNames().contains("AaAaBB"));
final Ignite ignite1 = startGrid(1);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
ignite1.createCache(new CacheConfiguration("AaAaBB"));
return null;
}
}, CacheException.class, null);
assertFalse(ignite(0).cacheNames().contains("AaAaBB"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheGroupIdConflict1() throws Exception {
ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE1).setGroupName("AaAaAa"),
new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE1).setGroupName("AaAaAa")};
startGrid(0);
ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, IgniteCheckedException.class, null);
assertFalse(ignite(0).cacheNames().contains(CACHE2));
final Ignite ignite1 = startGrid(1);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
ignite1.createCache(new CacheConfiguration(CACHE2).setGroupName("AaAaBB"));
return null;
}
}, CacheException.class, null);
assertFalse(ignite(0).cacheNames().contains(CACHE2));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheGroupIdConflict2() throws Exception {
ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa"),
new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa")};
startGrid(0);
ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, IgniteCheckedException.class, null);
assertFalse(ignite(0).cacheNames().contains(CACHE2));
final Ignite ignite1 = startGrid(1);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
ignite1.createCache(new CacheConfiguration(CACHE2).setGroupName("AaAaBB"));
return null;
}
}, CacheException.class, null);
assertFalse(ignite(0).cacheNames().contains(CACHE2));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheGroupIdConflict3() throws Exception {
ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB"),
new CacheConfiguration("AaAaAa")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};
startGrid(0);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, IgniteCheckedException.class, null);
assertFalse(ignite(0).cacheNames().contains("AaAaAa"));
final Ignite ignite1 = startGrid(1);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
ignite1.createCache(new CacheConfiguration("AaAaAa"));
return null;
}
}, CacheException.class, null);
assertFalse(ignite(0).cacheNames().contains("AaAaAa"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheGroupNameConflict1() throws Exception {
ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache1"), new CacheConfiguration("cache2").setGroupName("cache1")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache1")};
startGrid(0);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache2").setGroupName("cache1")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, IgniteCheckedException.class, null);
assertFalse(ignite(0).cacheNames().contains("cache2"));
final Ignite ignite1 = startGrid(1);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
ignite1.createCache(new CacheConfiguration("cache2").setGroupName("cache1"));
return null;
}
}, CacheException.class, null);
assertFalse(ignite(0).cacheNames().contains("cache2"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheGroupNameConflict2() throws Exception {
ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache2").setGroupName("cache1"), new CacheConfiguration("cache1")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache2").setGroupName("cache1")};
startGrid(0);
ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache1")};
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, IgniteCheckedException.class, null);
assertFalse(ignite(0).cacheNames().contains("cache1"));
final Ignite ignite1 = startGrid(1);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
ignite1.createCache(new CacheConfiguration("cache1"));
return null;
}
}, CacheException.class, null);
assertFalse(ignite(0).cacheNames().contains("cache1"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testConfigurationConsistencyValidation() throws Exception {
startGrids(2);
startClientGrid(2);
ignite(0).createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));
for (int i = 0; i < 3; i++) {
try {
ignite(i).createCache(cacheConfiguration(GROUP1, "c2", REPLICATED, ATOMIC, Integer.MAX_VALUE, false));
fail();
}
catch (CacheException e) {
assertTrue("Unexpected message: " + e.getMessage(),
e.getMessage().contains("Cache mode mismatch for caches related to the same group [groupName=grp1"));
}
try {
ignite(i).createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false));
fail();
}
catch (CacheException e) {
assertTrue("Unexpected message: " + e.getMessage(),
e.getMessage().contains("Backups mismatch for caches related to the same group [groupName=grp1"));
}
try {
ignite(i).createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 1, false));
fail();
}
catch (CacheException e) {
assertTrue("Unexpected message: " + e.getMessage(),
e.getMessage().contains("Atomicity mode mismatch for caches related to the same group [groupName=grp1"));
}
}
}
/**
* @return Cache configurations.
*/
private CacheConfiguration[] interceptorConfigurations() {
CacheConfiguration[] ccfgs = new CacheConfiguration[6];
ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor1());
ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor2());
ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor1());
ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor2());
ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);
//TODO IGNITE-9323: Check Mvcc mode.
return ccfgs;
}
/**
* Tests caches in the same group with different {@link CacheInterceptor}s.
*
* @throws Exception If failed.
*/
@Test
public void testInterceptors() throws Exception {
for (int i = 0; i < 4; i++) {
ccfgs = interceptorConfigurations();
startGrid(i);
}
Ignite node = ignite(0);
checkInterceptorPut(node.cache("c1"), "v1");
checkInterceptorPut(node.cache("c2"), "v2");
checkInterceptorPut(node.cache("c3"), "v1");
checkInterceptorPut(node.cache("c4"), "v2");
checkCache(0, "c5", 10);
checkCache(0, "c6", 10);
}
/**
* @param cache Cache.
* @param expVal Expected value.
*/
private void checkInterceptorPut(IgniteCache cache, String expVal) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < 10; i++) {
Integer key = rnd.nextInt();
cache.put(key, i);
assertEquals(expVal, cache.get(key));
}
}
/**
* @return Cache configurations.
*/
private CacheConfiguration[] cacheStoreConfigurations() {
CacheConfiguration[] ccfgs = new CacheConfiguration[6];
ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).
setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true);
ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).
setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true);
ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).
setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true);
ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).
setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true);
ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);
//TODO IGNITE-8582: Check Mvcc mode.
return ccfgs;
}
/**
* Tests caches in the same group with different {@link CacheStore}s.
*
* @throws Exception If failed.
*/
@Test
public void testCacheStores() throws Exception {
for (int i = 0; i < 4; i++) {
ccfgs = cacheStoreConfigurations();
startGrid(i);
}
Ignite node = ignite(0);
checkStorePut(node.cache("c1"), Store1.map);
assertTrue(Store2.map.isEmpty());
checkStorePut(node.cache("c3"), Store1.map);
assertTrue(Store2.map.isEmpty());
Store1.map.clear();
checkStorePut(node.cache("c2"), Store2.map);
assertTrue(Store1.map.isEmpty());
checkStorePut(node.cache("c4"), Store2.map);
assertTrue(Store1.map.isEmpty());
Store2.map.clear();
checkCache(0, "c5", 10);
checkCache(0, "c6", 10);
assertTrue(Store1.map.isEmpty());
assertTrue(Store2.map.isEmpty());
}
/**
* @param cache Cache.
* @param storeMap Cache store data.
*/
private void checkStorePut(IgniteCache cache, ConcurrentHashMap<Object, Object> storeMap) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < 10; i++) {
Integer key = rnd.nextInt();
storeMap.put(key, i);
assertEquals(i, cache.get(key));
cache.put(key, 10_000);
assertEquals(10_000, cache.get(key));
assertEquals(10_000, storeMap.get(key));
}
}
/**
* @return Cache configurations.
*/
private CacheConfiguration[] mapperConfigurations() {
CacheConfiguration[] ccfgs = new CacheConfiguration[6];
ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper1());
ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper2());
ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper1());
ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper2());
ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);
return ccfgs;
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityMappers() throws Exception {
for (int i = 0; i < 4; i++) {
ccfgs = mapperConfigurations();
startGrid(i);
}
for (int i = 0; i < 4; i++)
checkAffinityMappers(ignite(i));
startClientGrid(4);
checkAffinityMappers(ignite(4));
for (int i = 0; i < 5; i++) {
checkCache(i, "c1", 10);
checkCache(i, "c2", 10);
checkCache(i, "c3", 10);
checkCache(i, "c4", 10);
checkCache(i, "c5", 10);
checkCache(i, "c6", 10);
}
}
/**
* @param node Node.
*/
private void checkAffinityMappers(Ignite node) {
Affinity aff1 = node.affinity("c1");
Affinity aff2 = node.affinity("c2");
Affinity aff3 = node.affinity("c3");
Affinity aff4 = node.affinity("c4");
Affinity aff5 = node.affinity("c5");
Affinity aff6 = node.affinity("c6");
RendezvousAffinityFunction func = new RendezvousAffinityFunction();
for (int i = 0; i < 100; i++) {
MapperTestKey1 k = new MapperTestKey1(i, i + 10);
assertEquals(i, aff1.partition(k));
assertEquals(i, aff3.partition(k));
assertEquals(i + 10, aff2.partition(k));
assertEquals(i + 10, aff4.partition(k));
int part;
if (node.configuration().getMarshaller() instanceof BinaryMarshaller)
part = func.partition(node.binary().toBinary(k));
else
part = func.partition(k);
assertEquals(part, aff5.partition(k));
assertEquals(part, aff6.partition(k));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueriesMultipleGroups1() throws Exception {
continuousQueriesMultipleGroups(1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testContinuousQueriesMultipleGroups2() throws Exception {
continuousQueriesMultipleGroups(4);
}
/**
* @param srvs Number of server nodes.
* @throws Exception If failed.
*/
private void continuousQueriesMultipleGroups(int srvs) throws Exception {
Ignite srv0 = startGrids(srvs);
Ignite client = startClientGrid(srvs);
client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));
client.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1, false));
client.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false));
client.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1, false));
client.createCache(cacheConfiguration(GROUP2, "c5", PARTITIONED, ATOMIC, 1, false));
client.createCache(cacheConfiguration(GROUP2, "c6", PARTITIONED, TRANSACTIONAL, 1, false));
client.createCache(cacheConfiguration(null, "c7", PARTITIONED, ATOMIC, 1, false));
client.createCache(cacheConfiguration(null, "c8", PARTITIONED, TRANSACTIONAL, 1, false));
client.createCache(cacheConfiguration(GROUP3, "c9", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 1, false));
String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"};
AtomicInteger c1 = registerListener(client, "c1");
for (String cache : cacheNames)
srv0.cache(cache).put(1, 1);
waitForEvents(c1, 1);
for (String cache : cacheNames)
srv0.cache(cache).put(1, 1);
waitForEvents(c1, 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCacheIdSort() throws Exception {
Ignite node = startGrid(0);
final List<IgniteCache> caches = new ArrayList<>(3);
caches.add(node.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false)
.setAffinity(new RendezvousAffinityFunction(false, 8))));
caches.add(node.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 1, false)
.setAffinity(new RendezvousAffinityFunction(false, 8))));
caches.add(node.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false)
.setAffinity(new RendezvousAffinityFunction(false, 8))));
Affinity aff = node.affinity("c1");
final List<Integer> keys = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) {
if (aff.partition(i) == 0) {
keys.add(i);
if (keys.size() >= 10_000)
break;
}
}
assertEquals(10_000, keys.size());
final long stopTime = System.currentTimeMillis() + 10_000;
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (System.currentTimeMillis() < stopTime) {
for (int i = 0; i < 100; i++) {
IgniteCache cache = caches.get(rnd.nextInt(3));
Integer key = keys.get(rnd.nextInt(10_000));
if (rnd.nextFloat() > 0.8f)
cache.remove(key);
else
cache.put(key, key);
}
}
return null;
}
}, 5, "update-thread");
CacheGroupContext grp = cacheGroup(node, GROUP1);
Integer cacheId = null;
GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0);
int c = 0;
while (it.hasNext()) {
CacheDataRow row = it.next();
if (cacheId == null || cacheId != row.cacheId()) {
cacheId = row.cacheId();
c++;
}
}
assertEquals(3, c);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDataCleanup() throws Exception {
Ignite node = startGrid(0);
IgniteCache cache0 = node.createCache(cacheConfiguration(GROUP1, "c0", PARTITIONED, ATOMIC, 1, false));
for (int i = 0; i < 100; i++)
assertNull(cache0.get(i));
for (int i = 0; i < 100; i++)
cache0.put(i, i);
List<CacheConfiguration> ccfgs = new ArrayList<>();
ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));
ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, true));
ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, TRANSACTIONAL, 1, false));
ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, TRANSACTIONAL, 1, true));
for (CacheConfiguration ccfg : ccfgs) {
IgniteCache cache = node.createCache(ccfg);
for (int i = 0; i < 100; i++)
assertNull(cache.get(i));
for (int i = 0; i < 100; i++)
cache.put(i, i);
for (int i = 0; i < 100; i++)
assertEquals(i, cache.get(i));
node.destroyCache(ccfg.getName());
cache = node.createCache(ccfg);
for (int i = 0; i < 100; i++)
assertNull(cache.get(i));
node.destroyCache(ccfg.getName());
}
for (int i = 0; i < 100; i++)
assertEquals(i, cache0.get(i));
node.destroyCache(cache0.getName());
cache0 = node.createCache(cacheConfiguration(GROUP1, "c0", PARTITIONED, ATOMIC, 1, false));
for (int i = 0; i < 100; i++)
assertNull(cache0.get(i));
for (int i = 0; i < 100; i++)
cache0.put(i, i);
for (int i = 0; i < 100; i++)
assertEquals(i, cache0.get(i));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRestartsAndCacheCreateDestroy() throws Exception {
final int SRVS = 5;
startGrids(SRVS);
final Ignite clientNode = startClientGrid(SRVS);
final int CACHES = SF.applyLB(10, 2);
final AtomicReferenceArray<IgniteCache> caches = new AtomicReferenceArray<>(CACHES);
for (int i = 0; i < CACHES; i++) {
CacheAtomicityMode atomicityMode = i % 2 == 0 ? ATOMIC : TRANSACTIONAL;
caches.set(i,
clientNode.createCache(cacheConfiguration(GROUP1, "c" + i, PARTITIONED, atomicityMode, 0, false)));
}
final AtomicBoolean stop = new AtomicBoolean();
final AtomicInteger cacheCntr = new AtomicInteger();
try {
final int ITERATIONS_COUNT = SF.applyLB(10, 1);
for (int i = 0; i < ITERATIONS_COUNT; i++) {
stop.set(false);
final AtomicReference<Exception> err = new AtomicReference<>();
log.info("Iteration: " + i);
IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
try {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
int node = rnd.nextInt(SRVS);
log.info("Stop node: " + node);
stopGrid(node);
U.sleep(500);
log.info("Start node: " + node);
startGrid(node);
try {
if (rnd.nextBoolean())
awaitPartitionMapExchange();
}
catch (Exception ignore) {
// No-op.
}
}
}
catch (Exception e) {
log.error("Unexpected error: " + e, e);
err.set(e);
stop.set(true);
}
}
});
IgniteInternalFuture<?> cacheFut = GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
try {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
int idx = rnd.nextInt(CACHES);
IgniteCache cache = caches.get(idx);
if (cache != null && caches.compareAndSet(idx, cache, null)) {
log.info("Destroy cache: " + cache.getName());
clientNode.destroyCache(cache.getName());
CacheAtomicityMode atomicityMode = rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL;
String name = "newName-" + cacheCntr.incrementAndGet();
cache = clientNode.createCache(
cacheConfiguration(GROUP1, name, PARTITIONED, atomicityMode, 0, false));
caches.set(idx, cache);
}
}
}
catch (Exception e) {
log.error("Unexpected error: " + e, e);
err.set(e);
stop.set(true);
}
}
});
IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
try {
int idx = rnd.nextInt(CACHES);
IgniteCache cache = caches.get(idx);
if (cache != null && caches.compareAndSet(idx, cache, null)) {
try {
for (int i = 0; i < 10; i++)
cacheOperation(rnd, cache);
}
catch (Exception e) {
if (X.hasCause(e, CacheStoppedException.class) ||
(X.hasCause(e, CacheInvalidStateException.class) &&
X.hasCause(e, TransactionRollbackException.class))
) {
// Cache operation can be blocked on
// awaiting new topology version and cancelled with CacheStoppedException cause.
// Cache operation can failed
// if a node was stopped during transaction.
continue;
}
throw e;
}
finally {
caches.set(idx, cache);
}
}
}
catch (Exception e) {
err.set(e);
log.error("Unexpected error: " + e, e);
stop.set(true);
}
}
}
}, 8, "op-thread");
Thread.sleep(SF.applyLB(10_000, 1_000));
stop.set(true);
restartFut.get();
cacheFut.get();
opFut.get();
assertNull("Unexpected error during test, see log for details", err.get());
awaitPartitionMapExchange();
Set<Integer> cacheIds = new HashSet<>();
for (int c = 0; c < CACHES; c++) {
IgniteCache cache = caches.get(c);
assertNotNull(cache);
assertTrue(cacheIds.add(CU.cacheId(cache.getName())));
}
for (int n = 0; n < SRVS; n++) {
CacheGroupContext grp = cacheGroup(ignite(n), GROUP1);
assertNotNull(grp);
for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
IntMap<Object> cachesMap = GridTestUtils.getFieldValue(part, "cacheMaps");
assertTrue(cachesMap.size() <= cacheIds.size());
cachesMap.forEach((cacheId, v) -> assertTrue(cachesMap.containsKey(cacheId)));
}
}
}
}
finally {
stop.set(true);
}
}
/**
* @param cntr Counter.
* @param expEvts Expected events number.
* @throws Exception If failed.
*/
private void waitForEvents(final AtomicInteger cntr, final int expEvts) throws Exception {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
if (cntr.get() < expEvts)
log.info("Wait for events [rcvd=" + cntr.get() + ", exp=" + expEvts + ']');
return false;
}
}, 5000);
assertEquals(expEvts, cntr.get());
assertTrue(cntr.compareAndSet(expEvts, 0));
}
/**
* @param node Node.
* @param cacheName Cache name.
* @return Received events counter.
*/
private AtomicInteger registerListener(Ignite node, String cacheName) {
ContinuousQuery qry = new ContinuousQuery();
final AtomicInteger cntr = new AtomicInteger();
qry.setLocalListener(new CacheEntryUpdatedListener() {
@Override public void onUpdated(Iterable iterable) {
for (Object evt : iterable)
cntr.incrementAndGet();
}
});
node.cache(cacheName).query(qry);
return cntr;
}
/**
*
*/
static class Mapper1 implements AffinityKeyMapper {
/** {@inheritDoc} */
@Override public Object affinityKey(Object key) {
if (key instanceof MapperTestKey1)
return ((MapperTestKey1)key).p1;
else if (key instanceof BinaryObject)
((BinaryObject)key).field("p1");
return key;
}
/** {@inheritDoc} */
@Override public void reset() {
// No-op.
}
}
/**
*
*/
static class Mapper2 implements AffinityKeyMapper {
/** {@inheritDoc} */
@Override public Object affinityKey(Object key) {
if (key instanceof MapperTestKey1)
return ((MapperTestKey1)key).p2;
else if (key instanceof BinaryObject)
((BinaryObject)key).field("p2");
return key;
}
/** {@inheritDoc} */
@Override public void reset() {
// No-op.
}
}
/**
*
*/
static class MapperTestKey1 {
/** */
final int p1;
/** */
final int p2;
/**
* @param p1 Field1.
* @param p2 Field2.
*/
public MapperTestKey1(int p1, int p2) {
this.p1 = p1;
this.p2 = p2;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MapperTestKey1 testKey1 = (MapperTestKey1)o;
return p1 == testKey1.p1 && p2 == testKey1.p2;
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = p1;
res = 31 * res + p2;
return res;
}
}
/**
*
*/
static class Interceptor1 extends CacheInterceptorAdapter<Object, Object> {
/** {@inheritDoc} */
@Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
return "v1";
}
}
/**
*
*/
static class Interceptor2 extends CacheInterceptorAdapter<Object, Object> {
/** {@inheritDoc} */
@Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
return "v2";
}
}
/**
*
*/
static class StoreFactory1 implements Factory<CacheStore> {
/** {@inheritDoc} */
@Override public CacheStore create() {
return new Store1();
}
}
/**
*
*/
static class Store1 extends CacheStoreAdapter {
/** */
static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
return map.get(key);
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry entry) throws CacheWriterException {
map.put(entry.getKey(), entry.getValue());
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
map.remove(key);
}
}
/**
*
*/
static class StoreFactory2 implements Factory<CacheStore> {
/** {@inheritDoc} */
@Override public CacheStore create() {
return new Store2();
}
}
/**
*
*/
static class Store2 extends CacheStoreAdapter {
/** */
static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
return map.get(key);
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry entry) throws CacheWriterException {
map.put(entry.getKey(), entry.getValue());
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
map.remove(key);
}
}
/**
* @param rnd Random.
* @param cache Cache.
*/
private void cacheOperation(ThreadLocalRandom rnd, IgniteCache cache) {
final int KEYS = 10_000;
Integer key = rnd.nextInt(KEYS);
switch (rnd.nextInt(6)) {
case 0:
cache.put(key, 1);
break;
case 1:
cache.get(key);
break;
case 2:
cache.remove(key);
break;
case 3:
cache.localPeek(key);
break;
case 4:
Set<Integer> keys = new HashSet<>();
for (int i = 0; i < 5; i++)
keys.add(rnd.nextInt(KEYS));
cache.getAll(keys);
break;
case 5:
Map<Integer, Integer> map = new TreeMap<>();
for (int i = 0; i < 5; i++)
map.put(rnd.nextInt(KEYS), i);
cache.putAll(map);
break;
}
}
/**
*
*/
static class Key1 implements Serializable {
/** */
private int id;
/**
* @param id ID.
*/
Key1(int id) {
this.id = id;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Key1 key = (Key1)o;
return id == key.id;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return id;
}
}
/**
*
*/
static class Key2 implements Serializable {
/** */
private int id;
/**
* @param id ID.
*/
Key2(int id) {
this.id = id;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Key2 key = (Key2)o;
return id == key.id;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return id;
}
}
/**
*
*/
static class Value1 implements Serializable {
/** */
private int val;
/**
* @param val Value.
*/
public Value1(int val) {
this.val = val;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Value1 val1 = (Value1)o;
return val == val1.val;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return val;
}
}
/**
*
*/
static class Value2 implements Serializable {
/** */
private int val;
/**
* @param val Value.
*/
public Value2(int val) {
this.val = val;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Value2 val2 = (Value2)o;
return val == val2.val;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return val;
}
}
/**
* @param grpName Cache group name.
* @param name Cache name.
* @param cacheMode Cache mode.
* @param atomicityMode Atomicity mode.
* @param backups Backups number.
* @param heapCache On heap cache flag.
* @return Cache configuration.
*/
private CacheConfiguration cacheConfiguration(
String grpName,
String name,
CacheMode cacheMode,
CacheAtomicityMode atomicityMode,
int backups,
boolean heapCache
) {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setName(name);
ccfg.setGroupName(grpName);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setBackups(backups);
ccfg.setCacheMode(cacheMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setOnheapCacheEnabled(heapCache);
return ccfg;
}
/**
* @param idx Node index.
* @param grpName Cache group name.
* @param expGrp {@code True} if cache group should be created.
* @throws IgniteCheckedException If failed.
*/
private void checkCacheGroup(int idx, final String grpName, final boolean expGrp) throws IgniteCheckedException {
final IgniteKernal node = (IgniteKernal)ignite(idx);
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return expGrp == (cacheGroup(node, grpName) != null);
}
}, 1000));
assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
}
/**
* @param node Node.
* @param grpName Cache group name.
* @return Cache group.
*/
private CacheGroupContext cacheGroup(Ignite node, String grpName) {
for (CacheGroupContext grp : ((IgniteKernal)node).context().cache().cacheGroups()) {
if (grpName.equals(grp.name()))
return grp;
}
return null;
}
/**
*
*/
static class MapBasedStore<K, V> implements CacheStore<K, V>, Serializable {
/** */
private final Map<K, V> src;
/**
* @param src Source map.
*/
MapBasedStore(Map<K, V> src) {
this.src = src;
}
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException {
for (Map.Entry<K, V> e : src.entrySet())
clo.apply(e.getKey(), e.getValue());
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
// No-op
}
/** {@inheritDoc} */
@Override public V load(K key) throws CacheLoaderException {
return src.get(key);
}
/** {@inheritDoc} */
@Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
return F.view(src, new ContainsPredicate<>(Sets.newHashSet(keys)));
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
throw new UnsupportedOperationException();
}
}
}