blob: f7fef0fdd368015e1078b4cdff2d04d2d098f83b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.TouchedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.ServiceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CachePeekMode.ALL;
import static org.apache.ignite.cache.CachePeekMode.ONHEAP;
import static org.apache.ignite.cache.CachePeekMode.PRIMARY;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
/**
* Full API cache test.
*/
@SuppressWarnings("TransientFieldInNonSerializableClass")
public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstractSelfTest {
/** Test timeout */
private static final long TEST_TIMEOUT = 60 * 1000;
/** Service name. */
private static final String SERVICE_NAME1 = "testService1";
/** */
public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
/** */
private static final long serialVersionUID = 0L;
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
throw new RuntimeException("Failed!");
}
};
/** Increment processor for invoke operations. */
public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new IncrementEntryProcessor();
/** Increment processor for invoke operations with IgniteEntryProcessor. */
public static final CacheEntryProcessor<String, Integer, String> INCR_IGNITE_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
/** */
private static final long serialVersionUID = 0L;
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
return INCR_PROCESSOR.process(e, args);
}
};
/** Increment processor for invoke operations. */
public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new RemoveEntryProcessor();
/** Increment processor for invoke operations with IgniteEntryProcessor. */
public static final CacheEntryProcessor<String, Integer, String> RMV_IGNITE_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
/** */
private static final long serialVersionUID = 0L;
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
return RMV_PROCESSOR.process(e, args);
}
};
/** Dflt grid. */
protected static transient Ignite dfltIgnite;
/** */
private static Map<String, CacheConfiguration[]> cacheCfgMap;
/** */
@Before
public void beforeGridCacheAbstractFullApiSelfTest() {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9543", MvccFeatureChecker.forcedMvcc());
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIMEOUT;
}
/** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
/** {@inheritDoc} */
@Override protected boolean swapEnabled() {
return true;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9543", MvccFeatureChecker.forcedMvcc());
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
cfg.setIncludeEventTypes(
EVT_CACHE_OBJECT_READ,
EVT_CACHE_OBJECT_LOCKED,
EVT_CACHE_OBJECT_UNLOCKED);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
initStoreStrategy();
if (cacheStartType() == CacheStartMode.STATIC)
super.beforeTestsStarted();
else {
cacheCfgMap = Collections.synchronizedMap(new HashMap<String, CacheConfiguration[]>());
if (cacheStartType() == CacheStartMode.NODES_THEN_CACHES) {
super.beforeTestsStarted();
for (Map.Entry<String, CacheConfiguration[]> entry : cacheCfgMap.entrySet()) {
Ignite ignite = grid(entry.getKey());
for (CacheConfiguration cfg : entry.getValue())
ignite.getOrCreateCache(cfg);
}
awaitPartitionMapExchange();
}
else {
int cnt = gridCount();
assert cnt >= 1 : "At least one grid must be started";
for (int i = 0; i < cnt; i++) {
Ignite ignite = startGrid(i);
CacheConfiguration[] cacheCfgs = cacheCfgMap.get(ignite.name());
for (CacheConfiguration cfg : cacheCfgs)
ignite.createCache(cfg);
}
if (cnt > 1)
checkTopology(cnt);
awaitPartitionMapExchange();
}
cacheCfgMap = null;
}
for (int i = 0; i < gridCount(); i++)
info("Grid " + i + ": " + grid(i).localNode().id());
}
/**
* Checks that any invoke returns result.
*
* @throws Exception if something goes bad.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-4380")
@Test
public void testInvokeAllMultithreaded() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final int threadCnt = 4;
final int cnt = 5000;
final Set<String> keys = Collections.singleton("myKey");
GridTestUtils.runMultiThreaded(new Runnable() {
@Override public void run() {
for (int i = 0; i < cnt; i++) {
final Map<String, EntryProcessorResult<String>> res = cache.invokeAll(keys, INCR_PROCESSOR);
assertEquals(1, res.size());
}
}
}, threadCnt, "testInvokeAllMultithreaded");
assertEquals(cnt * threadCnt, (int)cache.get("myKey"));
}
/**
* Checks that skipStore flag gets overridden inside a transaction.
*/
@Test
public void testWriteThroughTx() {
String key = "writeThroughKey";
storeStgy.removeFromStore(key);
IgniteCache<String, Integer> cache = jcache(0);
try (final Transaction transaction = grid(0).transactions().txStart()) {
// retrieve market type from the grid
Integer old = cache.withSkipStore().get(key);
assertNull(old);
// update the grid
cache.put(key, 2);
// finally commit the transaction
transaction.commit();
}
assertEquals(2, storeStgy.getFromStore(key));
}
/**
* Checks that skipStore flag gets overridden inside a transaction.
*/
@Test
public void testNoReadThroughTx() {
String key = "writeThroughKey";
IgniteCache<String, Integer> cache = jcache(0);
storeStgy.resetStore();
cache.put(key, 1);
storeStgy.putToStore(key, 2);
try (final Transaction transaction = grid(0).transactions().txStart()) {
Integer old = cache.get(key);
assertEquals((Integer)1, old);
// update the grid
cache.put(key, 2);
// finally commit the transaction
transaction.commit();
}
assertEquals(0, storeStgy.getReads());
}
/** {@inheritDoc} */
@Override protected Ignite startGrid(String igniteInstanceName, GridSpringResourceContext ctx) throws Exception {
if (cacheCfgMap == null)
return super.startGrid(igniteInstanceName, ctx);
IgniteConfiguration cfg = getConfiguration(igniteInstanceName);
cacheCfgMap.put(igniteInstanceName, cfg.getCacheConfiguration());
cfg.setCacheConfiguration();
if (!isRemoteJvm(igniteInstanceName))
return IgnitionEx.start(optimize(cfg), ctx);
else
return startRemoteGrid(igniteInstanceName, optimize(cfg), ctx);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
IgniteCache<String, Integer> cache = jcache();
assertEquals(0, cache.localSize());
assertEquals(0, cache.size());
super.beforeTest();
assertEquals(0, cache.localSize());
assertEquals(0, cache.size());
dfltIgnite = grid(0);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
IgniteCache<String, Integer> cache = jcache();
assertEquals(0, cache.localSize());
assertEquals(0, cache.size());
assertEquals(0, cache.size(ONHEAP));
dfltIgnite = null;
}
/**
* @return A not near-only cache.
*/
protected IgniteCache<String, Integer> fullCache() {
return jcache();
}
/**
* @throws Exception In case of error.
*/
@Test
public void testSize() throws Exception {
assert jcache().localSize() == 0;
int size = 10;
final Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < size; i++)
map.put("key" + i, i);
// Put in primary nodes to avoid near readers which will prevent entry from being cleared.
Map<ClusterNode, Collection<String>> mapped = grid(0).<String>affinity(DEFAULT_CACHE_NAME).mapKeysToNodes(map.keySet());
for (int i = 0; i < gridCount(); i++) {
Collection<String> keys = mapped.get(grid(i).localNode());
if (!F.isEmpty(keys)) {
for (String key : keys)
jcache(i).put(key, map.get(key));
}
}
map.remove("key0");
mapped = grid(0).<String>affinity(DEFAULT_CACHE_NAME).mapKeysToNodes(map.keySet());
for (int i = 0; i < gridCount(); i++) {
// Will actually delete entry from map.
CU.invalidate(jcache(i), "key0");
assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP));
Collection<String> keysCol = mapped.get(grid(i).localNode());
assert jcache(i).localSize() != 0 || F.isEmpty(keysCol);
}
for (int i = 0; i < gridCount(); i++)
executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map));
for (int i = 0; i < gridCount(); i++) {
Collection<String> keysCol = mapped.get(grid(i).localNode());
assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0,
jcache(i).localSize(PRIMARY));
}
int globalPrimarySize = map.size();
for (int i = 0; i < gridCount(); i++)
assertEquals(globalPrimarySize, jcache(i).size(PRIMARY));
// Check how many instances of any given key there is in the cluster.
int globalSize = 0;
for (String key : map.keySet())
globalSize += affinity(jcache()).mapKeyToPrimaryAndBackups(key).size();
for (int i = 0; i < gridCount(); i++)
assertEquals(globalSize, jcache(i).size(ALL));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testContainsKey() throws Exception {
jcache().put("testContainsKey", 1);
checkContainsKey(true, "testContainsKey");
checkContainsKey(false, "testContainsKeyWrongKey");
}
/**
* @throws Exception If failed.
*/
@Test
public void testContainsKeyTx() throws Exception {
if (!txEnabled())
return;
IgniteCache<String, Integer> cache = jcache();
IgniteTransactions txs = ignite(0).transactions();
for (int i = 0; i < 10; i++) {
String key = String.valueOf(i);
try (Transaction tx = txs.txStart()) {
assertNull(key, cache.get(key));
assertFalse(cache.containsKey(key));
tx.commit();
}
try (Transaction tx = txs.txStart()) {
assertNull(key, cache.get(key));
cache.put(key, i);
assertTrue(cache.containsKey(key));
tx.commit();
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testContainsKeysTx() throws Exception {
if (!txEnabled())
return;
IgniteCache<String, Integer> cache = jcache();
IgniteTransactions txs = ignite(0).transactions();
Set<String> keys = new HashSet<>();
for (int i = 0; i < 10; i++) {
String key = String.valueOf(i);
keys.add(key);
}
try (Transaction tx = txs.txStart()) {
for (String key : keys)
assertNull(key, cache.get(key));
assertFalse(cache.containsKeys(keys));
tx.commit();
}
try (Transaction tx = txs.txStart()) {
for (String key : keys)
assertNull(key, cache.get(key));
for (String key : keys)
cache.put(key, 0);
assertTrue(cache.containsKeys(keys));
tx.commit();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveInExplicitLocks() throws Exception {
if (lockingEnabled()) {
IgniteCache<String, Integer> cache = jcache();
cache.put("a", 1);
Lock lock = cache.lockAll(ImmutableSet.of("a", "b", "c", "d"));
lock.lock();
try {
cache.remove("a");
// Make sure single-key operation did not remove lock.
cache.putAll(F.asMap("b", 2, "c", 3, "d", 4));
}
finally {
lock.unlock();
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveAllSkipStore() throws Exception {
IgniteCache<String, Integer> jcache = jcache();
jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3));
jcache.withSkipStore().removeAll();
assertEquals((Integer)1, jcache.get("1"));
assertEquals((Integer)2, jcache.get("2"));
assertEquals((Integer)3, jcache.get("3"));
}
/**
* @throws IgniteCheckedException If failed.
*/
@Test
public void testAtomicOps() throws IgniteCheckedException {
IgniteCache<String, Integer> c = jcache();
final int cnt = 10;
for (int i = 0; i < cnt; i++)
assertNull(c.getAndPutIfAbsent("k" + i, i));
for (int i = 0; i < cnt; i++) {
boolean wrong = i % 2 == 0;
String key = "k" + i;
boolean res = c.replace(key, wrong ? i + 1 : i, -1);
assertEquals(wrong, !res);
}
for (int i = 0; i < cnt; i++) {
boolean success = i % 2 != 0;
String key = "k" + i;
boolean res = c.remove(key, -1);
assertTrue(success == res);
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGet() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
assert cache.get("wrongKey") == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetEntry() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
CacheEntry<String, Integer> key1e = cache.getEntry("key1");
CacheEntry<String, Integer> key2e = cache.getEntry("key2");
CacheEntry<String, Integer> wrongKeye = cache.getEntry("wrongKey");
assert key1e.getValue() == 1;
assert key1e.getKey().equals("key1");
assert key1e.version() != null;
assert key2e.getValue() == 2;
assert key2e.getKey().equals("key2");
assert key2e.version() != null;
assert wrongKeye == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
IgniteFuture<Integer> fut1 = cache.getAsync("key1");
IgniteFuture<Integer> fut2 = cache.getAsync("key2");
IgniteFuture<Integer> fut3 = cache.getAsync("wrongKey");
assert fut1.get() == 1;
assert fut2.get() == 2;
assert fut3.get() == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cacheAsync.get("key1");
IgniteFuture<Integer> fut1 = cacheAsync.future();
cacheAsync.get("key2");
IgniteFuture<Integer> fut2 = cacheAsync.future();
cacheAsync.get("wrongKey");
IgniteFuture<Integer> fut3 = cacheAsync.future();
assert fut1.get() == 1;
assert fut2.get() == 2;
assert fut3.get() == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAll() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
final IgniteCache<String, Integer> cache = jcache();
try {
cache.put("key1", 1);
cache.put("key2", 2);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getAll(null).isEmpty();
return null;
}
}, NullPointerException.class, null);
assert cache.getAll(Collections.<String>emptySet()).isEmpty();
Map<String, Integer> map1 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved map1: " + map1);
assert 2 == map1.size() : "Invalid map: " + map1;
assertEquals(1, (int)map1.get("key1"));
assertEquals(2, (int)map1.get("key2"));
assertNull(map1.get("key9999"));
Map<String, Integer> map2 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved map2: " + map2);
assert 2 == map2.size() : "Invalid map: " + map2;
assertEquals(1, (int)map2.get("key1"));
assertEquals(2, (int)map2.get("key2"));
assertNull(map2.get("key9999"));
// Now do the same checks but within transaction.
if (txShouldBeUsed()) {
try (Transaction tx0 = transactions().txStart()) {
assert cache.getAll(Collections.<String>emptySet()).isEmpty();
map1 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved map1: " + map1);
assert 2 == map1.size() : "Invalid map: " + map1;
assertEquals(1, (int)map1.get("key1"));
assertEquals(2, (int)map1.get("key2"));
assertNull(map1.get("key9999"));
map2 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved map2: " + map2);
assert 2 == map2.size() : "Invalid map: " + map2;
assertEquals(1, (int)map2.get("key1"));
assertEquals(2, (int)map2.get("key2"));
assertNull(map2.get("key9999"));
tx0.commit();
}
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetEntries() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
final IgniteCache<String, Integer> cache = jcache();
try {
cache.put("key1", 1);
cache.put("key2", 2);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getEntries(null).isEmpty();
return null;
}
}, NullPointerException.class, null);
assert cache.getEntries(Collections.<String>emptySet()).isEmpty();
Collection<CacheEntry<String, Integer>> c1 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved c1: " + c1);
assert 2 == c1.size() : "Invalid collection: " + c1;
boolean b1 = false;
boolean b2 = false;
for (CacheEntry<String, Integer> e : c1) {
if (e.getKey().equals("key1") && e.getValue().equals(1))
b1 = true;
if (e.getKey().equals("key2") && e.getValue().equals(2))
b2 = true;
}
assertTrue(b1 && b2);
Collection<CacheEntry<String, Integer>> c2 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved c2: " + c2);
assert 2 == c2.size() : "Invalid collection: " + c2;
b1 = false;
b2 = false;
for (CacheEntry<String, Integer> e : c2) {
if (e.getKey().equals("key1") && e.getValue().equals(1))
b1 = true;
if (e.getKey().equals("key2") && e.getValue().equals(2))
b2 = true;
}
assertTrue(b1 && b2);
// Now do the same checks but within transaction.
if (txShouldBeUsed()) {
try (Transaction tx0 = transactions().txStart()) {
assert cache.getEntries(Collections.<String>emptySet()).isEmpty();
c1 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved c1: " + c1);
assert 2 == c1.size() : "Invalid collection: " + c1;
b1 = false;
b2 = false;
for (CacheEntry<String, Integer> e : c1) {
if (e.getKey().equals("key1") && e.getValue().equals(1))
b1 = true;
if (e.getKey().equals("key2") && e.getValue().equals(2))
b2 = true;
}
assertTrue(b1 && b2);
c2 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));
info("Retrieved c2: " + c2);
assert 2 == c2.size() : "Invalid collection: " + c2;
b1 = false;
b2 = false;
for (CacheEntry<String, Integer> e : c2) {
if (e.getKey().equals("key1") && e.getValue().equals(1))
b1 = true;
if (e.getKey().equals("key2") && e.getValue().equals(2))
b2 = true;
}
assertTrue(b1 && b2);
tx0.commit();
}
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAllWithLastNull() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final Set<String> c = new LinkedHashSet<>();
c.add("key1");
c.add(null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getAll(c);
return null;
}
}, NullPointerException.class, null);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAllWithFirstNull() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final Set<String> c = new LinkedHashSet<>();
c.add(null);
c.add("key1");
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getAll(c);
return null;
}
}, NullPointerException.class, null);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAllWithInTheMiddle() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final Set<String> c = new LinkedHashSet<>();
c.add("key1");
c.add(null);
c.add("key2");
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getAll(c);
return null;
}
}, NullPointerException.class, null);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetTxNonExistingKey() throws Exception {
if (txShouldBeUsed()) {
try (Transaction ignored = transactions().txStart()) {
assert jcache().get("key999123") == null;
}
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAllAsync() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getAllAsync(null);
return null;
}
}, NullPointerException.class, null);
IgniteFuture<Map<String, Integer>> fut2 = cache.getAllAsync(Collections.<String>emptySet());
IgniteFuture<Map<String, Integer>> fut3 = cache.getAllAsync(ImmutableSet.of("key1", "key2"));
assert fut2.get().isEmpty();
assert fut3.get().size() == 2 : "Invalid map: " + fut3.get();
assert fut3.get().get("key1") == 1;
assert fut3.get().get("key2") == 2;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAllAsyncOld() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key1", 1);
cache.put("key2", 2);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cacheAsync.getAll(null);
return null;
}
}, NullPointerException.class, null);
cacheAsync.getAll(Collections.<String>emptySet());
IgniteFuture<Map<String, Integer>> fut2 = cacheAsync.future();
cacheAsync.getAll(ImmutableSet.of("key1", "key2"));
IgniteFuture<Map<String, Integer>> fut3 = cacheAsync.future();
assert fut2.get().isEmpty();
assert fut3.get().size() == 2 : "Invalid map: " + fut3.get();
assert fut3.get().get("key1") == 1;
assert fut3.get().get("key2") == 2;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPut() throws Exception {
IgniteCache<String, Integer> cache = jcache();
assert cache.getAndPut("key1", 1) == null;
assert cache.getAndPut("key2", 2) == null;
// Check inside transaction.
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
// Put again to check returned values.
assert cache.getAndPut("key1", 1) == 1;
assert cache.getAndPut("key2", 2) == 2;
checkContainsKey(true, "key1");
checkContainsKey(true, "key2");
assert cache.get("key1") != null;
assert cache.get("key2") != null;
assert cache.get("wrong") == null;
// Check outside transaction.
checkContainsKey(true, "key1");
checkContainsKey(true, "key2");
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
assert cache.get("wrong") == null;
assertEquals((Integer)1, cache.getAndPut("key1", 10));
assertEquals((Integer)2, cache.getAndPut("key2", 11));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutTx() throws Exception {
if (txShouldBeUsed()) {
IgniteCache<String, Integer> cache = jcache();
try (Transaction tx = transactions().txStart()) {
assert cache.getAndPut("key1", 1) == null;
assert cache.getAndPut("key2", 2) == null;
// Check inside transaction.
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
// Put again to check returned values.
assert cache.getAndPut("key1", 1) == 1;
assert cache.getAndPut("key2", 2) == 2;
assert cache.get("key1") != null;
assert cache.get("key2") != null;
assert cache.get("wrong") == null;
tx.commit();
}
// Check outside transaction.
checkContainsKey(true, "key1");
checkContainsKey(true, "key2");
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
assert cache.get("wrong") == null;
assertEquals((Integer)1, cache.getAndPut("key1", 10));
assertEquals((Integer)2, cache.getAndPut("key2", 11));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformOptimisticReadCommitted() throws Exception {
checkTransform(OPTIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformOptimisticRepeatableRead() throws Exception {
checkTransform(OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformPessimisticReadCommitted() throws Exception {
checkTransform(PESSIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformPessimisticRepeatableRead() throws Exception {
checkTransform(PESSIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIgniteTransformOptimisticReadCommitted() throws Exception {
checkIgniteTransform(OPTIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIgniteTransformOptimisticRepeatableRead() throws Exception {
checkIgniteTransform(OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIgniteTransformPessimisticReadCommitted() throws Exception {
checkIgniteTransform(PESSIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIgniteTransformPessimisticRepeatableRead() throws Exception {
checkIgniteTransform(PESSIMISTIC, REPEATABLE_READ);
}
/**
* @param concurrency Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
private void checkIgniteTransform(TransactionConcurrency concurrency, TransactionIsolation isolation)
throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key2", 1);
cache.put("key3", 3);
Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
try {
assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR));
assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR));
assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR));
if (tx != null)
tx.commit();
}
catch (Exception e) {
e.printStackTrace();
throw e;
}
finally {
if (tx != null)
tx.close();
}
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));
cache.remove("key1");
cache.put("key2", 1);
cache.put("key3", 3);
assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR));
assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR));
assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR));
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
* @param concurrency Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
private void checkTransform(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key2", 1);
cache.put("key3", 3);
Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
try {
assertEquals("null", cache.invoke("key1", INCR_PROCESSOR));
assertEquals("1", cache.invoke("key2", INCR_PROCESSOR));
assertEquals("3", cache.invoke("key3", RMV_PROCESSOR));
if (tx != null)
tx.commit();
}
catch (Exception e) {
e.printStackTrace();
throw e;
}
finally {
if (tx != null)
tx.close();
}
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));
cache.remove("key1");
cache.put("key2", 1);
cache.put("key3", 3);
assertEquals("null", cache.invoke("key1", INCR_PROCESSOR));
assertEquals("1", cache.invoke("key2", INCR_PROCESSOR));
assertEquals("3", cache.invoke("key3", RMV_PROCESSOR));
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAllOptimisticReadCommitted() throws Exception {
checkTransformAll(OPTIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAllOptimisticRepeatableRead() throws Exception {
checkTransformAll(OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAllPessimisticReadCommitted() throws Exception {
checkTransformAll(PESSIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAllPessimisticRepeatableRead() throws Exception {
checkTransformAll(PESSIMISTIC, REPEATABLE_READ);
}
/**
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @throws Exception If failed.
*/
private void checkTransformAll(TransactionConcurrency concurrency, TransactionIsolation isolation)
throws Exception {
final IgniteCache<String, Integer> cache = jcache();
cache.put("key2", 1);
cache.put("key3", 3);
if (txShouldBeUsed()) {
Map<String, EntryProcessorResult<String>> res;
try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) {
res = cache.invokeAll(F.asSet("key1", "key2", "key3"), INCR_PROCESSOR);
tx.commit();
}
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertEquals((Integer)4, cache.get("key3"));
assertEquals("null", res.get("key1").get());
assertEquals("1", res.get("key2").get());
assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
cache.remove("key1");
cache.put("key2", 1);
cache.put("key3", 3);
}
Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);
for (int i = 0; i < gridCount(); i++) {
assertNull(jcache(i).localPeek("key1", ONHEAP));
assertNull(jcache(i).localPeek("key2", ONHEAP));
assertNull(jcache(i).localPeek("key3", ONHEAP));
}
assertEquals("null", res.get("key1").get());
assertEquals("1", res.get("key2").get());
assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
cache.remove("key1");
cache.put("key2", 1);
cache.put("key3", 3);
res = cache.invokeAll(F.asSet("key1", "key2", "key3"), INCR_PROCESSOR);
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertEquals((Integer)4, cache.get("key3"));
assertEquals("null", res.get("key1").get());
assertEquals("1", res.get("key2").get());
assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
cache.remove("key1");
cache.put("key2", 1);
cache.put("key3", 3);
res = cache.invokeAll(F.asMap("key1", INCR_PROCESSOR, "key2", INCR_PROCESSOR, "key3", INCR_PROCESSOR));
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertEquals((Integer)4, cache.get("key3"));
assertEquals("null", res.get("key1").get());
assertEquals("1", res.get("key2").get());
assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAllWithNulls() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invokeAll((Set<String>)null, INCR_PROCESSOR);
return null;
}
}, NullPointerException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invokeAll(F.asSet("key1"), null);
return null;
}
}, NullPointerException.class, null);
{
final Set<String> keys = new LinkedHashSet<>(2);
keys.add("key1");
keys.add(null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invokeAll(keys, INCR_PROCESSOR);
return null;
}
}, NullPointerException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invokeAll(F.asSet("key1"), null);
return null;
}
}, NullPointerException.class, null);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformSequentialOptimisticNoStart() throws Exception {
checkTransformSequential0(false, OPTIMISTIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformSequentialPessimisticNoStart() throws Exception {
checkTransformSequential0(false, PESSIMISTIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformSequentialOptimisticWithStart() throws Exception {
checkTransformSequential0(true, OPTIMISTIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformSequentialPessimisticWithStart() throws Exception {
checkTransformSequential0(true, PESSIMISTIC);
}
/**
* @param startVal Whether to put value.
* @param concurrency Concurrency.
* @throws Exception If failed.
*/
private void checkTransformSequential0(boolean startVal, TransactionConcurrency concurrency)
throws Exception {
IgniteCache<String, Integer> cache = jcache();
final String key = primaryKeysForCache(cache, 1).get(0);
Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
try {
if (startVal)
cache.put(key, 2);
else
assertEquals(null, cache.get(key));
Integer expRes = startVal ? 2 : null;
assertEquals(String.valueOf(expRes), cache.invoke(key, INCR_PROCESSOR));
expRes = startVal ? 3 : 1;
assertEquals(String.valueOf(expRes), cache.invoke(key, INCR_PROCESSOR));
expRes++;
assertEquals(String.valueOf(expRes), cache.invoke(key, INCR_PROCESSOR));
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
Integer exp = (startVal ? 2 : 0) + 3;
assertEquals(exp, cache.get(key));
for (int i = 0; i < gridCount(); i++) {
if (ignite(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key))
assertEquals(exp, peek(jcache(i), key));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAfterRemoveOptimistic() throws Exception {
checkTransformAfterRemove(OPTIMISTIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformAfterRemovePessimistic() throws Exception {
checkTransformAfterRemove(PESSIMISTIC);
}
/**
* @param concurrency Concurrency.
* @throws Exception If failed.
*/
private void checkTransformAfterRemove(TransactionConcurrency concurrency) throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key", 4);
Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
try {
cache.remove("key");
cache.invoke("key", INCR_PROCESSOR);
cache.invoke("key", INCR_PROCESSOR);
cache.invoke("key", INCR_PROCESSOR);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assertEquals((Integer)3, cache.get("key"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformReturnValueGetOptimisticReadCommitted() throws Exception {
checkTransformReturnValue(false, OPTIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformReturnValueGetOptimisticRepeatableRead() throws Exception {
checkTransformReturnValue(false, OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformReturnValueGetPessimisticReadCommitted() throws Exception {
checkTransformReturnValue(false, PESSIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformReturnValueGetPessimisticRepeatableRead() throws Exception {
checkTransformReturnValue(false, PESSIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformReturnValuePutInTx() throws Exception {
checkTransformReturnValue(true, OPTIMISTIC, READ_COMMITTED);
}
/**
* @param put Whether to put value.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
private void checkTransformReturnValue(boolean put,
TransactionConcurrency concurrency,
TransactionIsolation isolation)
throws Exception {
IgniteCache<String, Integer> cache = jcache();
if (!put)
cache.put("key", 1);
Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
try {
if (put)
cache.put("key", 1);
cache.invoke("key", INCR_PROCESSOR);
assertEquals((Integer)2, cache.get("key"));
if (tx != null) {
// Second get inside tx. Make sure read value is not transformed twice.
assertEquals((Integer)2, cache.get("key"));
tx.commit();
}
}
finally {
if (tx != null)
tx.close();
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAndPutAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key1", 1);
cache.put("key2", 2);
cacheAsync.getAndPut("key1", 10);
IgniteFuture<Integer> fut1 = cacheAsync.future();
cacheAsync.getAndPut("key2", 11);
IgniteFuture<Integer> fut2 = cacheAsync.future();
assertEquals((Integer)1, fut1.get(5000));
assertEquals((Integer)2, fut2.get(5000));
assertEquals((Integer)10, cache.get("key1"));
assertEquals((Integer)11, cache.get("key2"));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAndPutAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
IgniteFuture<Integer> fut1 = cache.getAndPutAsync("key1", 10);
IgniteFuture<Integer> fut2 = cache.getAndPutAsync("key2", 11);
assertEquals((Integer)1, fut1.get(5000));
assertEquals((Integer)2, fut2.get(5000));
assertEquals((Integer)10, cache.get("key1"));
assertEquals((Integer)11, cache.get("key2"));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutAsyncOld0() throws Exception {
IgniteCache<String, Integer> cacheAsync = jcache().withAsync();
cacheAsync.getAndPut("key1", 0);
IgniteFuture<Integer> fut1 = cacheAsync.future();
cacheAsync.getAndPut("key2", 1);
IgniteFuture<Integer> fut2 = cacheAsync.future();
assert fut1.get(5000) == null;
assert fut2.get(5000) == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutAsync0() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteFuture<Integer> fut1 = cache.getAndPutAsync("key1", 0);
IgniteFuture<Integer> fut2 = cache.getAndPutAsync("key2", 1);
assert fut1.get(5000) == null;
assert fut2.get(5000) == null;
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key2", 1);
cache.put("key3", 3);
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
assertNull(cacheAsync.invoke("key1", INCR_PROCESSOR));
IgniteFuture<?> fut0 = cacheAsync.future();
assertNull(cacheAsync.invoke("key2", INCR_PROCESSOR));
IgniteFuture<?> fut1 = cacheAsync.future();
assertNull(cacheAsync.invoke("key3", RMV_PROCESSOR));
IgniteFuture<?> fut2 = cacheAsync.future();
fut0.get();
fut1.get();
fut2.get();
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key2", 1);
cache.put("key3", 3);
IgniteFuture<?> fut0 = cache.invokeAsync("key1", INCR_PROCESSOR);
IgniteFuture<?> fut1 = cache.invokeAsync("key2", INCR_PROCESSOR);
IgniteFuture<?> fut2 = cache.invokeAsync("key3", RMV_PROCESSOR);
fut0.get();
fut1.get();
fut2.get();
assertEquals((Integer)1, cache.get("key1"));
assertEquals((Integer)2, cache.get("key2"));
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvoke() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
assertEquals("null", cache.invoke("k0", INCR_PROCESSOR));
assertEquals((Integer)1, cache.get("k0"));
assertEquals("1", cache.invoke("k0", INCR_PROCESSOR));
assertEquals((Integer)2, cache.get("k0"));
cache.put("k1", 1);
assertEquals("1", cache.invoke("k1", INCR_PROCESSOR));
assertEquals((Integer)2, cache.get("k1"));
assertEquals("2", cache.invoke("k1", INCR_PROCESSOR));
assertEquals((Integer)3, cache.get("k1"));
EntryProcessor<String, Integer, Integer> c = new RemoveAndReturnNullEntryProcessor();
assertNull(cache.invoke("k1", c));
assertNull(cache.get("k1"));
for (int i = 0; i < gridCount(); i++)
assertNull(jcache(i).localPeek("k1", ONHEAP));
final EntryProcessor<String, Integer, Integer> errProcessor = new FailedEntryProcessor();
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invoke("k1", errProcessor);
return null;
}
}, EntryProcessorException.class, "Test entry processor exception.");
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutx() throws Exception {
if (txShouldBeUsed())
checkPut(true);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutxNoTx() throws Exception {
checkPut(false);
}
/**
* @param inTx Whether to start transaction.
* @throws Exception If failed.
*/
private void checkPut(boolean inTx) throws Exception {
Transaction tx = inTx ? transactions().txStart() : null;
IgniteCache<String, Integer> cache = jcache();
try {
cache.put("key1", 1);
cache.put("key2", 2);
// Check inside transaction.
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
checkSize(F.asSet("key1", "key2"));
// Check outside transaction.
checkContainsKey(true, "key1");
checkContainsKey(true, "key2");
checkContainsKey(false, "wrong");
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
assert cache.get("wrong") == null;
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutAsyncOld() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
IgniteCache<String, Integer> cacheAsync = jcache().withAsync();
try {
jcache().put("key2", 1);
cacheAsync.put("key1", 10);
IgniteFuture<?> fut1 = cacheAsync.future();
cacheAsync.put("key2", 11);
IgniteFuture<?> fut2 = cacheAsync.future();
IgniteFuture<Transaction> f = null;
if (tx != null) {
tx = (Transaction)tx.withAsync();
tx.commit();
f = tx.future();
}
assertNull(fut1.get());
assertNull(fut2.get());
assert f == null || f.get().state() == COMMITTED;
}
finally {
if (tx != null)
tx.close();
}
checkSize(F.asSet("key1", "key2"));
assert jcache().get("key1") == 10;
assert jcache().get("key2") == 11;
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutAsync() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
jcache().put("key2", 1);
IgniteFuture<?> fut1 = jcache().putAsync("key1", 10);
IgniteFuture<?> fut2 = jcache().putAsync("key2", 11);
IgniteFuture<Void> f = null;
if (tx != null)
f = tx.commitAsync();
assertNull(fut1.get());
assertNull(fut2.get());
try {
if (f != null)
f.get();
}
catch (Throwable t) {
assert false : "Unexpected exception " + t;
}
}
finally {
if (tx != null)
tx.close();
}
checkSize(F.asSet("key1", "key2"));
assert jcache().get("key1") == 10;
assert jcache().get("key2") == 11;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutAll() throws Exception {
Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);
IgniteCache<String, Integer> cache = jcache();
cache.putAll(map);
checkSize(F.asSet("key1", "key2"));
assert cache.get("key1") == 1;
assert cache.get("key2") == 2;
map.put("key1", 10);
map.put("key2", 20);
cache.putAll(map);
checkSize(F.asSet("key1", "key2"));
assert cache.get("key1") == 10;
assert cache.get("key2") == 20;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testNullInTx() throws Exception {
if (!txShouldBeUsed())
return;
final IgniteCache<String, Integer> cache = jcache();
for (int i = 0; i < 100; i++) {
final String key = "key-" + i;
assertNull(cache.get(key));
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = transactions();
try (Transaction tx = txs.txStart()) {
cache.put(key, 1);
cache.put(null, 2);
tx.commit();
}
return null;
}
}, NullPointerException.class, null);
assertNull(cache.get(key));
cache.put(key, 1);
assertEquals(1, (int)cache.get(key));
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = transactions();
try (Transaction tx = txs.txStart()) {
cache.put(key, 2);
cache.remove(null);
tx.commit();
}
return null;
}
}, NullPointerException.class, null);
assertEquals(1, (int)cache.get(key));
cache.put(key, 2);
assertEquals(2, (int)cache.get(key));
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = transactions();
Map<String, Integer> map = new LinkedHashMap<>();
map.put("k1", 1);
map.put("k2", 2);
map.put(null, 3);
try (Transaction tx = txs.txStart()) {
cache.put(key, 1);
cache.putAll(map);
tx.commit();
}
return null;
}
}, NullPointerException.class, null);
assertNull(cache.get("k1"));
assertNull(cache.get("k2"));
assertEquals(2, (int)cache.get(key));
cache.put(key, 3);
assertEquals(3, (int)cache.get(key));
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutAllWithNulls() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
{
final Map<String, Integer> m = new LinkedHashMap<>(2);
m.put("key1", 1);
m.put(null, 2);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.putAll(m);
return null;
}
}, NullPointerException.class, null);
cache.put("key1", 1);
assertEquals(1, (int)cache.get("key1"));
}
{
final Map<String, Integer> m = new LinkedHashMap<>(2);
m.put("key3", 3);
m.put("key4", null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.putAll(m);
return null;
}
}, NullPointerException.class, null);
m.put("key4", 4);
cache.putAll(m);
assertEquals(3, (int)cache.get("key3"));
assertEquals(4, (int)cache.get("key4"));
}
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.put("key1", null);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.getAndPut("key1", null);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.put(null, 1);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.replace(null, 1);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.getAndReplace(null, 1);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.replace("key", null);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.getAndReplace("key", null);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.replace(null, 1, 2);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.replace("key", null, 2);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
assertThrows(log, new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
cache.replace("key", 1, null);
return null;
}
}, NullPointerException.class, A.NULL_MSG_PREFIX);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutAllAsyncOld() throws Exception {
Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cacheAsync.putAll(map);
IgniteFuture<?> f1 = cacheAsync.future();
map.put("key1", 10);
map.put("key2", 20);
cacheAsync.putAll(map);
IgniteFuture<?> f2 = cacheAsync.future();
assertNull(f2.get());
assertNull(f1.get());
checkSize(F.asSet("key1", "key2"));
assert cache.get("key1") == 10;
assert cache.get("key2") == 20;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutAllAsync() throws Exception {
Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);
IgniteCache<String, Integer> cache = jcache();
IgniteFuture<?> f1 = cache.putAllAsync(map);
map.put("key1", 10);
map.put("key2", 20);
IgniteFuture<?> f2 = cache.putAllAsync(map);
assertNull(f2.get());
assertNull(f1.get());
checkSize(F.asSet("key1", "key2"));
assert cache.get("key1") == 10;
assert cache.get("key2") == 20;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAndPutIfAbsent() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
IgniteCache<String, Integer> cache = jcache();
try {
assert cache.getAndPutIfAbsent("key", 1) == null;
assert cache.get("key") != null;
assert cache.get("key") == 1;
assert cache.getAndPutIfAbsent("key", 2) != null;
assert cache.getAndPutIfAbsent("key", 2) == 1;
assert cache.get("key") != null;
assert cache.get("key") == 1;
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assert cache.getAndPutIfAbsent("key", 2) != null;
for (int i = 0; i < gridCount(); i++) {
info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" +
grid(i).cache(DEFAULT_CACHE_NAME).localPeek("key", ONHEAP) + ']');
}
assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2));
assert cache.get("key") != null;
assert cache.get("key") == 1;
// Check swap.
cache.put("key2", 1);
cache.localEvict(Collections.singleton("key2"));
assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3));
// Check db.
if (!isMultiJvm()) {
storeStgy.putToStore("key3", 3);
assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4));
assertEquals((Integer)3, cache.get("key3"));
}
assertEquals((Integer)1, cache.get("key2"));
cache.localEvict(Collections.singleton("key2"));
// Same checks inside tx.
tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3));
if (tx != null)
tx.commit();
assertEquals((Integer)1, cache.get("key2"));
}
finally {
if (tx != null)
tx.close();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndPutIfAbsentAsyncOld() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
try {
cacheAsync.getAndPutIfAbsent("key", 1);
IgniteFuture<Integer> fut1 = cacheAsync.future();
assertNull(fut1.get());
assertEquals((Integer)1, cache.get("key"));
cacheAsync.getAndPutIfAbsent("key", 2);
IgniteFuture<Integer> fut2 = cacheAsync.future();
assertEquals((Integer)1, fut2.get());
assertEquals((Integer)1, cache.get("key"));
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
// Check swap.
cache.put("key2", 1);
cache.localEvict(Collections.singleton("key2"));
cacheAsync.getAndPutIfAbsent("key2", 3);
assertEquals((Integer)1, cacheAsync.<Integer>future().get());
// Check db.
if (!isMultiJvm()) {
storeStgy.putToStore("key3", 3);
cacheAsync.getAndPutIfAbsent("key3", 4);
assertEquals((Integer)3, cacheAsync.<Integer>future().get());
}
cache.localEvict(Collections.singleton("key2"));
// Same checks inside tx.
tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
cacheAsync.getAndPutIfAbsent("key2", 3);
assertEquals(1, cacheAsync.future().get());
if (tx != null)
tx.commit();
assertEquals((Integer)1, cache.get("key2"));
}
finally {
if (tx != null)
tx.close();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndPutIfAbsentAsync() throws Exception {
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
IgniteCache<String, Integer> cache = jcache();
try {
IgniteFuture<Integer> fut1 = cache.getAndPutIfAbsentAsync("key", 1);
assertNull(fut1.get());
assertEquals((Integer)1, cache.get("key"));
IgniteFuture<Integer> fut2 = cache.getAndPutIfAbsentAsync("key", 2);
assertEquals((Integer)1, fut2.get());
assertEquals((Integer)1, cache.get("key"));
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
// Check swap.
cache.put("key2", 1);
cache.localEvict(Collections.singleton("key2"));
assertEquals((Integer)1, cache.getAndPutIfAbsentAsync("key2", 3).get());
// Check db.
if (!isMultiJvm()) {
storeStgy.putToStore("key3", 3);
assertEquals((Integer)3, cache.getAndPutIfAbsentAsync("key3", 4).get());
}
cache.localEvict(Collections.singleton("key2"));
// Same checks inside tx.
tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assertEquals(1, (int)cache.getAndPutIfAbsentAsync("key2", 3).get());
if (tx != null)
tx.commit();
assertEquals((Integer)1, cache.get("key2"));
}
finally {
if (tx != null)
tx.close();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutIfAbsent() throws Exception {
IgniteCache<String, Integer> cache = jcache();
assertNull(cache.get("key"));
assert cache.putIfAbsent("key", 1);
assert cache.get("key") != null && cache.get("key") == 1;
assert !cache.putIfAbsent("key", 2);
assert cache.get("key") != null && cache.get("key") == 1;
// Check swap.
cache.put("key2", 1);
cache.localEvict(Collections.singleton("key2"));
assertFalse(cache.putIfAbsent("key2", 3));
// Check db.
if (!isMultiJvm()) {
storeStgy.putToStore("key3", 3);
assertFalse(cache.putIfAbsent("key3", 4));
}
cache.localEvict(Collections.singleton("key2"));
// Same checks inside tx.
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assertFalse(cache.putIfAbsent("key2", 3));
if (tx != null)
tx.commit();
assertEquals((Integer)1, cache.get("key2"));
}
finally {
if (tx != null)
tx.close();
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutxIfAbsentAsync() throws Exception {
if (txShouldBeUsed())
checkPutxIfAbsentAsync(true);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutxIfAbsentAsyncNoTx() throws Exception {
checkPutxIfAbsentAsync(false);
}
/**
* @param inTx In tx flag.
* @throws Exception If failed.
*/
private void checkPutxIfAbsentAsyncOld(boolean inTx) throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cacheAsync.putIfAbsent("key", 1);
IgniteFuture<Boolean> fut1 = cacheAsync.future();
assert fut1.get();
assert cache.get("key") != null && cache.get("key") == 1;
cacheAsync.putIfAbsent("key", 2);
IgniteFuture<Boolean> fut2 = cacheAsync.future();
assert !fut2.get();
assert cache.get("key") != null && cache.get("key") == 1;
// Check swap.
cache.put("key2", 1);
cache.localEvict(Collections.singleton("key2"));
cacheAsync.putIfAbsent("key2", 3);
assertFalse(cacheAsync.<Boolean>future().get());
// Check db.
if (!isMultiJvm()) {
storeStgy.putToStore("key3", 3);
cacheAsync.putIfAbsent("key3", 4);
assertFalse(cacheAsync.<Boolean>future().get());
}
cache.localEvict(Collections.singletonList("key2"));
// Same checks inside tx.
Transaction tx = inTx ? transactions().txStart() : null;
try {
cacheAsync.putIfAbsent("key2", 3);
assertFalse(cacheAsync.<Boolean>future().get());
if (!isMultiJvm()) {
cacheAsync.putIfAbsent("key3", 4);
assertFalse(cacheAsync.<Boolean>future().get());
}
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assertEquals((Integer)1, cache.get("key2"));
if (!isMultiJvm())
assertEquals((Integer)3, cache.get("key3"));
}
/**
* @param inTx In tx flag.
* @throws Exception If failed.
*/
private void checkPutxIfAbsentAsync(boolean inTx) throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteFuture<Boolean> fut1 = cache.putIfAbsentAsync("key", 1);
assert fut1.get();
assert cache.get("key") != null && cache.get("key") == 1;
IgniteFuture<Boolean> fut2 = cache.putIfAbsentAsync("key", 2);
assert !fut2.get();
assert cache.get("key") != null && cache.get("key") == 1;
// Check swap.
cache.put("key2", 1);
cache.localEvict(Collections.singleton("key2"));
assertFalse(cache.putIfAbsentAsync("key2", 3).get());
// Check db.
if (!isMultiJvm()) {
storeStgy.putToStore("key3", 3);
assertFalse(cache.putIfAbsentAsync("key3", 4).get());
}
cache.localEvict(Collections.singletonList("key2"));
// Same checks inside tx.
Transaction tx = inTx ? transactions().txStart() : null;
try {
assertFalse(cache.putIfAbsentAsync("key2", 3).get());
if (!isMultiJvm())
assertFalse(cache.putIfAbsentAsync("key3", 4).get());
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assertEquals((Integer)1, cache.get("key2"));
if (!isMultiJvm())
assertEquals((Integer)3, cache.get("key3"));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutIfAbsentAsyncConcurrentOld() throws Exception {
IgniteCache<String, Integer> cacheAsync = jcache().withAsync();
cacheAsync.putIfAbsent("key1", 1);
IgniteFuture<Boolean> fut1 = cacheAsync.future();
cacheAsync.putIfAbsent("key2", 2);
IgniteFuture<Boolean> fut2 = cacheAsync.future();
assert fut1.get();
assert fut2.get();
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPutIfAbsentAsyncConcurrent() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteFuture<Boolean> fut1 = cache.putIfAbsentAsync("key1", 1);
IgniteFuture<Boolean> fut2 = cache.putIfAbsentAsync("key2", 2);
assert fut1.get();
assert fut2.get();
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndReplace() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key", 1);
assert cache.get("key") == 1;
info("key 1 -> 2");
assert cache.getAndReplace("key", 2) == 1;
assert cache.get("key") == 2;
assert cache.getAndReplace("wrong", 0) == null;
assert cache.get("wrong") == null;
info("key 0 -> 3");
assert !cache.replace("key", 0, 3);
assert cache.get("key") == 2;
info("key 0 -> 3");
assert !cache.replace("key", 0, 3);
assert cache.get("key") == 2;
info("key 2 -> 3");
assert cache.replace("key", 2, 3);
assert cache.get("key") == 3;
info("evict key");
cache.localEvict(Collections.singleton("key"));
info("key 3 -> 4");
assert cache.replace("key", 3, 4);
assert cache.get("key") == 4;
if (!isMultiJvm()) {
storeStgy.putToStore("key2", 5);
info("key2 5 -> 6");
assert cache.replace("key2", 5, 6);
}
for (int i = 0; i < gridCount(); i++) {
info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
", peekVal=" + grid(i).cache(DEFAULT_CACHE_NAME).localPeek("key", ONHEAP) + ']');
info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
", peekVal=" + grid(i).cache(DEFAULT_CACHE_NAME).localPeek("key2", ONHEAP) + ']');
}
if (!isMultiJvm())
assertEquals((Integer)6, cache.get("key2"));
cache.localEvict(Collections.singleton("key"));
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assert cache.replace("key", 4, 5);
if (tx != null)
tx.commit();
assert cache.get("key") == 5;
}
finally {
if (tx != null)
tx.close();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testReplace() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key", 1);
assert cache.get("key") == 1;
assert cache.replace("key", 2);
assert cache.get("key") == 2;
assert !cache.replace("wrong", 2);
cache.localEvict(Collections.singleton("key"));
assert cache.replace("key", 4);
assert cache.get("key") == 4;
if (!isMultiJvm()) {
storeStgy.putToStore("key2", 5);
assert cache.replace("key2", 6);
assertEquals((Integer)6, cache.get("key2"));
}
cache.localEvict(Collections.singleton("key"));
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assert cache.replace("key", 5);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assert cache.get("key") == 5;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndReplaceAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key", 1);
assert cache.get("key") == 1;
cacheAsync.getAndReplace("key", 2);
assert cacheAsync.<Integer>future().get() == 1;
assert cache.get("key") == 2;
cacheAsync.getAndReplace("wrong", 0);
assert cacheAsync.future().get() == null;
assert cache.get("wrong") == null;
cacheAsync.replace("key", 0, 3);
assert !cacheAsync.<Boolean>future().get();
assert cache.get("key") == 2;
cacheAsync.replace("key", 0, 3);
assert !cacheAsync.<Boolean>future().get();
assert cache.get("key") == 2;
cacheAsync.replace("key", 2, 3);
assert cacheAsync.<Boolean>future().get();
assert cache.get("key") == 3;
cache.localEvict(Collections.singleton("key"));
cacheAsync.replace("key", 3, 4);
assert cacheAsync.<Boolean>future().get();
assert cache.get("key") == 4;
if (!isMultiJvm()) {
storeStgy.putToStore("key2", 5);
cacheAsync.replace("key2", 5, 6);
assert cacheAsync.<Boolean>future().get();
assertEquals((Integer)6, cache.get("key2"));
}
cache.localEvict(Collections.singleton("key"));
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
cacheAsync.replace("key", 4, 5);
assert cacheAsync.<Boolean>future().get();
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assert cache.get("key") == 5;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndReplaceAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key", 1);
assert cache.get("key") == 1;
assert cache.getAndReplaceAsync("key", 2).get() == 1;
assert cache.get("key") == 2;
assert cache.getAndReplaceAsync("wrong", 0).get() == null;
assert cache.get("wrong") == null;
assert !cache.replaceAsync("key", 0, 3).get();
assert cache.get("key") == 2;
assert !cache.replaceAsync("key", 0, 3).get();
assert cache.get("key") == 2;
assert cache.replaceAsync("key", 2, 3).get();
assert cache.get("key") == 3;
cache.localEvict(Collections.singleton("key"));
assert cache.replaceAsync("key", 3, 4).get();
assert cache.get("key") == 4;
if (!isMultiJvm()) {
storeStgy.putToStore("key2", 5);
assert cache.replaceAsync("key2", 5, 6).get();
assertEquals((Integer)6, cache.get("key2"));
}
cache.localEvict(Collections.singleton("key"));
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assert cache.replaceAsync("key", 4, 5).get();
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assert cache.get("key") == 5;
}
/**
* @throws Exception If failed.
*/
@Test
public void testReplacexAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key", 1);
assert cache.get("key") == 1;
cacheAsync.replace("key", 2);
assert cacheAsync.<Boolean>future().get();
info("Finished replace.");
assertEquals((Integer)2, cache.get("key"));
cacheAsync.replace("wrond", 2);
assert !cacheAsync.<Boolean>future().get();
cache.localEvict(Collections.singleton("key"));
cacheAsync.replace("key", 4);
assert cacheAsync.<Boolean>future().get();
assert cache.get("key") == 4;
if (!isMultiJvm()) {
storeStgy.putToStore("key2", 5);
cacheAsync.replace("key2", 6);
assert cacheAsync.<Boolean>future().get();
assert cache.get("key2") == 6;
}
cache.localEvict(Collections.singleton("key"));
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
cacheAsync.replace("key", 5);
assert cacheAsync.<Boolean>future().get();
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assert cache.get("key") == 5;
}
/**
* @throws Exception If failed.
*/
@Test
public void testReplacexAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key", 1);
assert cache.get("key") == 1;
assert cache.replaceAsync("key", 2).get();
info("Finished replace.");
assertEquals((Integer)2, cache.get("key"));
assert !cache.replaceAsync("wrond", 2).get();
cache.localEvict(Collections.singleton("key"));
assert cache.replaceAsync("key", 4).get();
assert cache.get("key") == 4;
if (!isMultiJvm()) {
storeStgy.putToStore("key2", 5);
assert cache.replaceAsync("key2", 6).get();
assert cache.get("key2") == 6;
}
cache.localEvict(Collections.singleton("key"));
Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
try {
assert cache.replaceAsync("key", 5).get();
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
assert cache.get("key") == 5;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGetAndRemove() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
assert !cache.remove("key1", 0);
assert cache.get("key1") != null && cache.get("key1") == 1;
assert cache.remove("key1", 1);
assert cache.get("key1") == null;
assert cache.getAndRemove("key2") == 2;
assert cache.get("key2") == null;
assert cache.getAndRemove("key2") == null;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndRemoveObject() throws Exception {
IgniteCache<String, TestValue> cache = ignite(0).cache(DEFAULT_CACHE_NAME);
TestValue val1 = new TestValue(1);
TestValue val2 = new TestValue(2);
cache.put("key1", val1);
cache.put("key2", val2);
assert !cache.remove("key1", new TestValue(0));
TestValue oldVal = cache.get("key1");
assert oldVal != null && F.eq(val1, oldVal);
assert cache.remove("key1");
assert cache.get("key1") == null;
TestValue oldVal2 = cache.getAndRemove("key2");
assert F.eq(val2, oldVal2);
assert cache.get("key2") == null;
assert cache.getAndRemove("key2") == null;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAndPutObject() throws Exception {
IgniteCache<String, TestValue> cache = ignite(0).cache(DEFAULT_CACHE_NAME);
TestValue val1 = new TestValue(1);
TestValue val2 = new TestValue(2);
cache.put("key1", val1);
TestValue oldVal = cache.get("key1");
assertEquals(val1, oldVal);
oldVal = cache.getAndPut("key1", val2);
assertEquals(val1, oldVal);
TestValue updVal = cache.get("key1");
assertEquals(val2, updVal);
}
/**
* TODO: GG-11241.
*
* @throws Exception If failed.
*/
@Test
public void testDeletedEntriesFlag() throws Exception {
if (cacheMode() != LOCAL && cacheMode() != REPLICATED) {
final int cnt = 3;
IgniteCache<String, Integer> cache = jcache();
for (int i = 0; i < cnt; i++)
cache.put(String.valueOf(i), i);
for (int i = 0; i < cnt; i++)
cache.remove(String.valueOf(i));
for (int g = 0; g < gridCount(); g++)
executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveLoad() throws Exception {
int cnt = 10;
Set<String> keys = new HashSet<>();
for (int i = 0; i < cnt; i++)
keys.add(String.valueOf(i));
jcache().removeAll(keys);
for (String key : keys)
storeStgy.putToStore(key, Integer.parseInt(key));
for (int g = 0; g < gridCount(); g++)
grid(g).cache(DEFAULT_CACHE_NAME).localLoadCache(null);
for (int g = 0; g < gridCount(); g++) {
for (int i = 0; i < cnt; i++) {
String key = String.valueOf(i);
if (grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode()))
assertEquals((Integer)i, peek(jcache(g), key));
else
assertNull(peek(jcache(g), key));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveLoadAsync() throws Exception {
if (isMultiJvm())
return;
int cnt = 10;
Set<String> keys = new HashSet<>();
for (int i = 0; i < cnt; i++)
keys.add(String.valueOf(i));
jcache().removeAllAsync(keys).get();
for (String key : keys)
storeStgy.putToStore(key, Integer.parseInt(key));
for (int g = 0; g < gridCount(); g++)
grid(g).cache(DEFAULT_CACHE_NAME).localLoadCacheAsync(null).get();
for (int g = 0; g < gridCount(); g++) {
for (int i = 0; i < cnt; i++) {
String key = String.valueOf(i);
if (grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode()))
assertEquals((Integer)i, peek(jcache(g), key));
else
assertNull(peek(jcache(g), key));
}
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key1", 1);
cache.put("key2", 2);
cacheAsync.remove("key1", 0);
assert !cacheAsync.<Boolean>future().get();
assert cache.get("key1") != null && cache.get("key1") == 1;
cacheAsync.remove("key1", 1);
assert cacheAsync.<Boolean>future().get();
assert cache.get("key1") == null;
cacheAsync.getAndRemove("key2");
assert cacheAsync.<Integer>future().get() == 2;
assert cache.get("key2") == null;
cacheAsync.getAndRemove("key2");
assert cacheAsync.future().get() == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
assert !cache.removeAsync("key1", 0).get();
assert cache.get("key1") != null && cache.get("key1") == 1;
assert cache.removeAsync("key1", 1).get();
assert cache.get("key1") == null;
assert cache.getAndRemoveAsync("key2").get() == 2;
assert cache.get("key2") == null;
assert cache.getAndRemoveAsync("key2").get() == null;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemove() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
assert cache.remove("key1");
assert cache.get("key1") == null;
assert !cache.remove("key1");
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemovexAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key1", 1);
cacheAsync.remove("key1");
assert cacheAsync.<Boolean>future().get();
assert cache.get("key1") == null;
cacheAsync.remove("key1");
assert !cacheAsync.<Boolean>future().get();
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemovexAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
assert cache.removeAsync("key1").get();
assert cache.get("key1") == null;
assert !cache.removeAsync("key1").get();
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGlobalRemoveAll() throws Exception {
globalRemoveAll(false);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testGlobalRemoveAllAsync() throws Exception {
globalRemoveAll(true);
}
/**
* @param async If {@code true} uses asynchronous operation.
* @throws Exception In case of error.
*/
private void globalRemoveAllOld(boolean async) throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
cache.put("key3", 3);
checkSize(F.asSet("key1", "key2", "key3"));
IgniteCache<String, Integer> asyncCache = cache.withAsync();
if (async) {
asyncCache.removeAll(F.asSet("key1", "key2"));
asyncCache.future().get();
}
else
cache.removeAll(F.asSet("key1", "key2"));
checkSize(F.asSet("key3"));
checkContainsKey(false, "key1");
checkContainsKey(false, "key2");
checkContainsKey(true, "key3");
// Put values again.
cache.put("key1", 1);
cache.put("key2", 2);
cache.put("key3", 3);
if (async) {
IgniteCache<String, Integer> asyncCache0 = jcache(gridCount() > 1 ? 1 : 0).withAsync();
asyncCache0.removeAll();
asyncCache0.future().get();
}
else
jcache(gridCount() > 1 ? 1 : 0).removeAll();
assertEquals(0, cache.localSize());
long entryCnt = hugeRemoveAllEntryCount();
for (int i = 0; i < entryCnt; i++)
cache.put(String.valueOf(i), i);
for (int i = 0; i < entryCnt; i++)
assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i)));
if (async) {
asyncCache.removeAll();
asyncCache.future().get();
}
else
cache.removeAll();
for (int i = 0; i < entryCnt; i++)
assertNull(cache.get(String.valueOf(i)));
}
/**
* @param async If {@code true} uses asynchronous operation.
* @throws Exception In case of error.
*/
private void globalRemoveAll(boolean async) throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
cache.put("key3", 3);
checkSize(F.asSet("key1", "key2", "key3"));
if (async)
cache.removeAllAsync(F.asSet("key1", "key2")).get();
else
cache.removeAll(F.asSet("key1", "key2"));
checkSize(F.asSet("key3"));
checkContainsKey(false, "key1");
checkContainsKey(false, "key2");
checkContainsKey(true, "key3");
// Put values again.
cache.put("key1", 1);
cache.put("key2", 2);
cache.put("key3", 3);
if (async)
jcache(gridCount() > 1 ? 1 : 0).removeAllAsync().get();
else
jcache(gridCount() > 1 ? 1 : 0).removeAll();
assertEquals(0, cache.localSize());
long entryCnt = hugeRemoveAllEntryCount();
for (int i = 0; i < entryCnt; i++)
cache.put(String.valueOf(i), i);
for (int i = 0; i < entryCnt; i++)
assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i)));
if (async)
cache.removeAllAsync().get();
else
cache.removeAll();
for (int i = 0; i < entryCnt; i++)
assertNull(cache.get(String.valueOf(i)));
}
/**
* @return Count of entries to be removed in removeAll() test.
*/
protected long hugeRemoveAllEntryCount() {
return 1000L;
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAllWithNulls() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final Set<String> c = new LinkedHashSet<>();
c.add("key1");
c.add(null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.removeAll(c);
return null;
}
}, NullPointerException.class, null);
assertEquals(0, grid(0).cache(DEFAULT_CACHE_NAME).localSize());
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.removeAll(null);
return null;
}
}, NullPointerException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.remove(null);
return null;
}
}, NullPointerException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.getAndRemove(null);
return null;
}
}, NullPointerException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.remove("key1", null);
return null;
}
}, NullPointerException.class, null);
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAllDuplicates() throws Exception {
jcache().removeAll(ImmutableSet.of("key1", "key1", "key1"));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAllDuplicatesTx() throws Exception {
if (txShouldBeUsed()) {
try (Transaction tx = transactions().txStart()) {
jcache().removeAll(ImmutableSet.of("key1", "key1", "key1"));
tx.commit();
}
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAllEmpty() throws Exception {
jcache().removeAll();
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAllAsyncOld() throws Exception {
IgniteCache<String, Integer> cache = jcache();
IgniteCache<String, Integer> cacheAsync = cache.withAsync();
cache.put("key1", 1);
cache.put("key2", 2);
cache.put("key3", 3);
checkSize(F.asSet("key1", "key2", "key3"));
cacheAsync.removeAll(F.asSet("key1", "key2"));
assertNull(cacheAsync.future().get());
checkSize(F.asSet("key3"));
checkContainsKey(false, "key1");
checkContainsKey(false, "key2");
checkContainsKey(true, "key3");
}
/**
* @throws Exception In case of error.
*/
@Test
public void testRemoveAllAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
cache.put("key3", 3);
checkSize(F.asSet("key1", "key2", "key3"));
assertNull(cache.removeAllAsync(F.asSet("key1", "key2")).get());
checkSize(F.asSet("key3"));
checkContainsKey(false, "key1");
checkContainsKey(false, "key2");
checkContainsKey(true, "key3");
}
/**
* @throws Exception In case of error.
*/
@Test
public void testLoadAll() throws Exception {
IgniteCache<String, Integer> cache = jcache();
Set<String> keys = new HashSet<>(primaryKeysForCache(cache, 2));
for (String key : keys)
assertNull(cache.localPeek(key, ONHEAP));
Map<String, Integer> vals = new HashMap<>();
int i = 0;
for (String key : keys) {
cache.put(key, i);
vals.put(key, i);
i++;
}
for (String key : keys)
assertEquals(vals.get(key), peek(cache, key));
cache.clear();
for (String key : keys)
assertNull(peek(cache, key));
loadAll(cache, keys, true);
for (String key : keys)
assertEquals(vals.get(key), peek(cache, key));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveAfterClear() throws Exception {
IgniteEx ignite = grid(0);
boolean affNode = ignite.context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinityNode();
if (!affNode) {
if (gridCount() < 2)
return;
ignite = grid(1);
}
IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
int key = 0;
Collection<Integer> keys = new ArrayList<>();
for (int k = 0; k < 2; k++) {
while (!ignite.affinity(DEFAULT_CACHE_NAME).isPrimary(ignite.localNode(), key))
key++;
keys.add(key);
key++;
}
info("Keys: " + keys);
for (Integer k : keys)
cache.put(k, k);
cache.clear();
for (int g = 0; g < gridCount(); g++) {
Ignite grid0 = grid(g);
grid0.cache(DEFAULT_CACHE_NAME).removeAll();
assertTrue(grid0.cache(DEFAULT_CACHE_NAME).localSize() == 0);
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testClear() throws Exception {
IgniteCache<String, Integer> cache = jcache();
Set<String> keys = new HashSet<>(primaryKeysForCache(cache, 3));
for (String key : keys)
assertNull(cache.get(key));
Map<String, Integer> vals = new HashMap<>(keys.size());
int i = 0;
for (String key : keys) {
cache.put(key, i);
vals.put(key, i);
i++;
}
for (String key : keys)
assertEquals(vals.get(key), peek(cache, key));
cache.clear();
for (String key : keys)
assertNull(peek(cache, key));
for (i = 0; i < gridCount(); i++)
jcache(i).clear();
for (i = 0; i < gridCount(); i++)
assert jcache(i).localSize() == 0;
for (Map.Entry<String, Integer> entry : vals.entrySet())
cache.put(entry.getKey(), entry.getValue());
for (String key : keys)
assertEquals(vals.get(key), peek(cache, key));
String first = F.first(keys);
if (lockingEnabled()) {
Lock lock = cache.lock(first);
lock.lock();
try {
cache.clear();
GridCacheContext<String, Integer> cctx = context(0);
GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(first) :
cctx.cache().peekEx(first);
assertNotNull(entry);
}
finally {
lock.unlock();
}
}
else {
cache.clear();
cache.put(first, vals.get(first));
}
cache.clear();
assert cache.localSize() == 0 : "Values after clear.";
i = 0;
for (String key : keys) {
cache.put(key, i);
vals.put(key, i);
i++;
}
cache.put("key1", 1);
cache.put("key2", 2);
cache.localEvict(Sets.union(ImmutableSet.of("key1", "key2"), keys));
assert cache.localSize(ONHEAP) == 0;
cache.clear();
assert cache.localPeek("key1", ONHEAP) == null;
assert cache.localPeek("key2", ONHEAP) == null;
}
/**
* @param keys0 Keys to check.
* @throws IgniteCheckedException If failed.
*/
protected void checkUnlocked(final Collection<String> keys0) throws IgniteCheckedException {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
for (int i = 0; i < gridCount(); i++) {
GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite(i)).internalCache(DEFAULT_CACHE_NAME);
for (String key : keys0) {
GridCacheEntryEx entry = cache.peekEx(key);
if (entry != null) {
if (entry.lockedByAny()) {
info("Entry is still locked [i=" + i + ", entry=" + entry + ']');
return false;
}
}
if (cache.isNear()) {
entry = cache.context().near().dht().peekEx(key);
if (entry != null) {
if (entry.lockedByAny()) {
info("Entry is still locked [i=" + i + ", entry=" + entry + ']');
return false;
}
}
}
}
}
return true;
}
catch (GridCacheEntryRemovedException ignore) {
info("Entry was removed, will retry");
return false;
}
}
}, 10_000);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearAll() throws Exception {
globalClearAll(false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearAllAsyncOld() throws Exception {
globalClearAll(true, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearAllAsync() throws Exception {
globalClearAll(true, false);
}
/**
* @param async If {@code true} uses async method.
* @param oldAsync Use old async API.
* @throws Exception If failed.
*/
protected void globalClearAll(boolean async, boolean oldAsync) throws Exception {
// Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
// because some of them were blocked due to having readers.
for (int i = 0; i < gridCount(); i++) {
for (String key : primaryKeysForCache(jcache(i), 3, 100_000))
jcache(i).put(key, 1);
}
if (async) {
if (oldAsync) {
IgniteCache<String, Integer> asyncCache = jcache().withAsync();
asyncCache.clear();
asyncCache.future().get();
}
else
jcache().clearAsync().get();
}
else
jcache().clear();
for (int i = 0; i < gridCount(); i++)
assert jcache(i).localSize() == 0;
}
/**
* @throws Exception In case of error.
*/
@SuppressWarnings("BusyWait")
@Test
public void testLockUnlock() throws Exception {
if (lockingEnabled()) {
final CountDownLatch lockCnt = new CountDownLatch(1);
final CountDownLatch unlockCnt = new CountDownLatch(1);
grid(0).events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
switch (evt.type()) {
case EVT_CACHE_OBJECT_LOCKED:
lockCnt.countDown();
break;
case EVT_CACHE_OBJECT_UNLOCKED:
unlockCnt.countDown();
break;
}
return true;
}
}, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);
IgniteCache<String, Integer> cache = jcache();
String key = primaryKeysForCache(cache, 1).get(0);
cache.put(key, 1);
assert !cache.isLocalLocked(key, false);
Lock lock = cache.lock(key);
lock.lock();
try {
lockCnt.await();
assert cache.isLocalLocked(key, false);
}
finally {
lock.unlock();
}
unlockCnt.await();
for (int i = 0; i < 100; i++)
if (cache.isLocalLocked(key, false))
Thread.sleep(10);
else
break;
assert !cache.isLocalLocked(key, false);
}
}
/**
* @throws Exception In case of error.
*/
@SuppressWarnings("BusyWait")
@Test
public void testLockUnlockAll() throws Exception {
if (lockingEnabled()) {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
assert !cache.isLocalLocked("key1", false);
assert !cache.isLocalLocked("key2", false);
Lock lock1_2 = cache.lockAll(ImmutableSet.of("key1", "key2"));
lock1_2.lock();
try {
assert cache.isLocalLocked("key1", false);
assert cache.isLocalLocked("key2", false);
}
finally {
lock1_2.unlock();
}
for (int i = 0; i < 100; i++)
if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false))
Thread.sleep(10);
else
break;
assert !cache.isLocalLocked("key1", false);
assert !cache.isLocalLocked("key2", false);
lock1_2.lock();
try {
assert cache.isLocalLocked("key1", false);
assert cache.isLocalLocked("key2", false);
}
finally {
lock1_2.unlock();
}
for (int i = 0; i < 100; i++)
if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false))
Thread.sleep(10);
else
break;
assert !cache.isLocalLocked("key1", false);
assert !cache.isLocalLocked("key2", false);
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testPeek() throws Exception {
Ignite ignite = primaryIgnite("key");
IgniteCache<String, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
assert peek(cache, "key") == null;
cache.put("key", 1);
cache.replace("key", 2);
assertEquals(2, peek(cache, "key").intValue());
}
/**
* @throws Exception If failed.
*/
@Test
public void testPeekTxRemoveOptimistic() throws Exception {
checkPeekTxRemove(OPTIMISTIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPeekTxRemovePessimistic() throws Exception {
checkPeekTxRemove(PESSIMISTIC);
}
/**
* @param concurrency Concurrency.
* @throws Exception If failed.
*/
private void checkPeekTxRemove(TransactionConcurrency concurrency) throws Exception {
if (txShouldBeUsed()) {
Ignite ignite = primaryIgnite("key");
IgniteCache<String, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx();
cache.put("key", 1);
try (Transaction tx = ignite.transactions().txStart(concurrency, READ_COMMITTED)) {
cache.remove("key");
assertNull(cache.get("key")); // localPeek ignores transactions.
assertNotNull(peek(cache, "key")); // localPeek ignores transactions.
tx.commit();
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPeekRemove() throws Exception {
IgniteCache<String, Integer> cache = primaryCache("key");
cache.put("key", 1);
cache.remove("key");
assertNull(peek(cache, "key"));
}
/**
* @throws Exception In case of error.
*/
@Test
public void testEvictExpired() throws Exception {
final IgniteCache<String, Integer> cache = jcache(0);
final String key = primaryKeysForCache(cache, 1).get(0);
cache.put(key, 1);
assertEquals((Integer)1, cache.get(key));
long ttl = 500;
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
grid(0).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry).put(key, 1);
final Affinity<String> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
boolean wait = waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (int i = 0; i < gridCount(); i++) {
if (peek(jcache(i), key) != null)
return false;
}
return true;
}
}, ttl + 1000);
assertTrue("Failed to wait for entry expiration.", wait);
// Expired entry should not be swapped.
cache.localEvict(Collections.singleton(key));
assertNull(peek(cache, "key"));
assertNull(cache.localPeek(key, ONHEAP));
assertTrue(cache.localSize() == 0);
load(cache, key, true);
for (int i = 0; i < gridCount(); i++) {
if (aff.isPrimary(grid(i).cluster().localNode(), key))
assertEquals((Integer)1, peek(jcache(i), key));
if (aff.isBackup(grid(i).cluster().localNode(), key))
assertEquals((Integer)1, peek(jcache(i), key));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPeekExpired() throws Exception {
final IgniteCache<String, Integer> c = jcache();
final String key = primaryKeysForCache(c, 1).get(0);
info("Using key: " + key);
c.put(key, 1);
assertEquals(Integer.valueOf(1), peek(c, key));
int ttl = 500;
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
c.withExpiryPolicy(expiry).put(key, 1);
Thread.sleep(ttl + 100);
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return peek(c, key) == null;
}
}, 2000);
assert peek(c, key) == null;
assert c.localSize() == 0 : "Cache is not empty.";
}
/**
* @throws Exception If failed.
*/
@Test
public void testPeekExpiredTx() throws Exception {
if (txShouldBeUsed()) {
final IgniteCache<String, Integer> c = jcache();
final String key = "1";
int ttl = 500;
try (Transaction tx = grid(0).transactions().txStart()) {
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
grid(0).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry).put(key, 1);
tx.commit();
}
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return peek(c, key) == null;
}
}, 2000);
assertNull(peek(c, key));
assert c.localSize() == 0;
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTtlTx() throws Exception {
if (txShouldBeUsed())
checkTtl(true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTtlNoTx() throws Exception {
checkTtl(false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTtlNoTxOldEntry() throws Exception {
checkTtl(false, true);
}
/**
* @param inTx In tx flag.
* @param oldEntry {@code True} to check TTL on old entry, {@code false} on new.
* @throws Exception If failed.
*/
private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {
int ttl = 4000;
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
final IgniteCache<String, Integer> c = jcache();
final String key = primaryKeysForCache(jcache(), 1).get(0);
IgnitePair<Long> entryTtl;
if (oldEntry) {
c.put(key, 1);
entryTtl = entryTtl(fullCache(), key);
assertNotNull(entryTtl.get1());
assertNotNull(entryTtl.get2());
assertEquals((Long)0L, entryTtl.get1());
assertEquals((Long)0L, entryTtl.get2());
}
long startTime = System.currentTimeMillis();
if (inTx) {
// Rollback transaction for the first time.
Transaction tx = transactions().txStart();
try {
jcache().withExpiryPolicy(expiry).put(key, 1);
}
finally {
tx.rollback();
}
if (oldEntry) {
entryTtl = entryTtl(fullCache(), key);
assertEquals((Long)0L, entryTtl.get1());
assertEquals((Long)0L, entryTtl.get2());
}
}
// Now commit transaction and check that ttl and expire time have been saved.
Transaction tx = inTx ? transactions().txStart() : null;
try {
jcache().withExpiryPolicy(expiry).put(key, 1);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
long[] expireTimes = new long[gridCount()];
for (int i = 0; i < gridCount(); i++) {
if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
assertTrue(curEntryTtl.get2() > startTime);
expireTimes[i] = curEntryTtl.get2();
}
}
// One more update from the same cache entry to ensure that expire time is shifted forward.
U.sleep(100);
tx = inTx ? transactions().txStart() : null;
try {
jcache().withExpiryPolicy(expiry).put(key, 2);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
for (int i = 0; i < gridCount(); i++) {
if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
assertTrue(curEntryTtl.get2() > startTime);
expireTimes[i] = curEntryTtl.get2();
}
}
// And one more direct update to ensure that expire time is shifted forward.
U.sleep(100);
tx = inTx ? transactions().txStart() : null;
try {
jcache().withExpiryPolicy(expiry).put(key, 3);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
for (int i = 0; i < gridCount(); i++) {
if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
assertTrue(curEntryTtl.get2() > startTime);
expireTimes[i] = curEntryTtl.get2();
}
}
// And one more update to ensure that ttl is not changed and expire time is not shifted forward.
U.sleep(100);
log.info("Put 4");
tx = inTx ? transactions().txStart() : null;
try {
jcache().put(key, 4);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
log.info("Put 4 done");
for (int i = 0; i < gridCount(); i++) {
if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
assertEquals(expireTimes[i], (long)curEntryTtl.get2());
}
}
// Avoid reloading from store.
storeStgy.removeFromStore(key);
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
@Override public boolean applyx() {
try {
Integer val = c.get(key);
if (val != null) {
info("Value is in cache [key=" + key + ", val=" + val + ']');
return false;
}
// Get "cache" field from GridCacheProxyImpl.
GridCacheAdapter c0 = cacheFromCtx(c);
if (!c0.context().deferredDelete()) {
GridCacheEntryEx e0 = c0.peekEx(key);
return e0 == null || (e0.rawGet() == null && e0.valueBytes() == null);
}
else
return true;
}
catch (GridCacheEntryRemovedException e) {
throw new RuntimeException(e);
}
}
}, Math.min(ttl * 10, getTestTimeout())));
IgniteCache fullCache = fullCache();
if (!isMultiJvmObject(fullCache)) {
GridCacheAdapter internalCache = internalCache(fullCache);
if (internalCache.isLocal())
return;
}
assert c.get(key) == null;
// Ensure that old TTL and expire time are not longer "visible".
entryTtl = entryTtl(fullCache(), key);
assertNotNull(entryTtl.get1());
assertNotNull(entryTtl.get2());
assertEquals(0, (long)entryTtl.get1());
assertEquals(0, (long)entryTtl.get2());
// Ensure that next update will not pick old expire time.
tx = inTx ? transactions().txStart() : null;
try {
jcache().put(key, 10);
if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.close();
}
U.sleep(2000);
entryTtl = entryTtl(fullCache(), key);
assertEquals((Integer)10, c.get(key));
assertNotNull(entryTtl.get1());
assertNotNull(entryTtl.get2());
assertEquals(0, (long)entryTtl.get1());
assertEquals(0, (long)entryTtl.get2());
}
/**
* @throws Exception In case of error.
*/
@Test(timeout = 10050000)
public void testLocalEvict() throws Exception {
IgniteCache<String, Integer> cache = jcache();
List<String> keys = primaryKeysForCache(cache, 3);
String key1 = keys.get(0);
String key2 = keys.get(1);
String key3 = keys.get(2);
cache.put(key1, 1);
cache.put(key2, 2);
cache.put(key3, 3);
assert peek(cache, key1) == 1;
assert peek(cache, key2) == 2;
assert peek(cache, key3) == 3;
cache.localEvict(F.asList(key1, key2));
assert cache.localPeek(key1, ONHEAP) == null;
assert cache.localPeek(key2, ONHEAP) == null;
assert peek(cache, key3) == 3;
loadAll(cache, ImmutableSet.of(key1, key2), true);
Affinity<String> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
for (int i = 0; i < gridCount(); i++) {
if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key1))
assertEquals("node name = " + grid(i).name(), (Integer)1, peek(jcache(i), key1));
if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key2))
assertEquals((Integer)2, peek(jcache(i), key2));
if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key3))
assertEquals((Integer)3, peek(jcache(i), key3));
}
}
/**
* JUnit.
*/
@Test
public void testCacheProxy() {
IgniteCache<String, Integer> cache = jcache();
assert cache instanceof IgniteCacheProxy;
}
/**
* @throws Exception If failed.
*/
@Test
public void testCompactExpired() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
final String key = F.first(primaryKeysForCache(cache, 1));
cache.put(key, 1);
long ttl = 500;
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
grid(0).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry).put(key, 1);
waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return cache.localPeek(key) == null;
}
}, ttl + 1000);
// Peek will actually remove entry from cache.
assertNull(cache.localPeek(key));
assertEquals(0, cache.localSize());
// Clear readers, if any.
cache.remove(key);
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
@Test
public void testOptimisticTxMissingKey() throws Exception {
if (txShouldBeUsed()) {
try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
// Remove missing key.
assertFalse(jcache().remove(UUID.randomUUID().toString()));
tx.commit();
}
}
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
@Test
public void testOptimisticTxMissingKeyNoCommit() throws Exception {
if (txShouldBeUsed()) {
try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
// Remove missing key.
assertFalse(jcache().remove(UUID.randomUUID().toString()));
tx.setRollbackOnly();
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticTxReadCommittedInTx() throws Exception {
checkRemovexInTx(OPTIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticTxRepeatableReadInTx() throws Exception {
checkRemovexInTx(OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticTxReadCommittedInTx() throws Exception {
checkRemovexInTx(PESSIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticTxRepeatableReadInTx() throws Exception {
checkRemovexInTx(PESSIMISTIC, REPEATABLE_READ);
}
/**
* @param concurrency Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
private void checkRemovexInTx(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
if (txShouldBeUsed()) {
final int cnt = 10;
CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> cache) {
for (int i = 0; i < cnt; i++)
cache.put("key" + i, i);
}
});
CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> cache) {
for (int i = 0; i < cnt; i++)
assertEquals(new Integer(i), cache.get("key" + i));
}
});
CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> cache) {
for (int i = 0; i < cnt; i++)
assertTrue("Failed to remove key: key" + i, cache.remove("key" + i));
}
});
CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> cache) {
for (int i = 0; i < cnt; i++)
assertNull(cache.get("key" + i));
}
});
}
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
@Test
public void testPessimisticTxMissingKey() throws Exception {
if (txShouldBeUsed()) {
try (Transaction tx = transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
// Remove missing key.
assertFalse(jcache().remove(UUID.randomUUID().toString()));
tx.commit();
}
}
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
@Test
public void testPessimisticTxMissingKeyNoCommit() throws Exception {
if (txShouldBeUsed()) {
try (Transaction tx = transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
// Remove missing key.
assertFalse(jcache().remove(UUID.randomUUID().toString()));
tx.setRollbackOnly();
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticTxRepeatableRead() throws Exception {
if (txShouldBeUsed()) {
try (Transaction ignored = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
jcache().put("key", 1);
assert jcache().get("key") == 1;
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticTxRepeatableReadOnUpdate() throws Exception {
if (txShouldBeUsed()) {
try (Transaction ignored = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
jcache().put("key", 1);
assert jcache().getAndPut("key", 2) == 1;
}
}
}
/**
* @throws Exception In case of error.
*/
@Test
public void testToMap() throws Exception {
IgniteCache<String, Integer> cache = jcache();
cache.put("key1", 1);
cache.put("key2", 2);
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < gridCount(); i++) {
for (Cache.Entry<String, Integer> entry : jcache(i))
map.put(entry.getKey(), entry.getValue());
}
assert map.size() == 2;
assert map.get("key1") == 1;
assert map.get("key2") == 2;
}
/**
* @param keys Expected keys.
* @throws Exception If failed.
*/
protected void checkSize(final Collection<String> keys) throws Exception {
if (nearEnabled())
assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
else {
for (int i = 0; i < gridCount(); i++)
executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys));
}
}
/**
* @param keys Expected keys.
* @throws Exception If failed.
*/
protected void checkKeySize(final Collection<String> keys) throws Exception {
if (nearEnabled())
assertEquals("Invalid key size: " + jcache().localSize(ALL),
keys.size(), jcache().localSize(ALL));
else {
for (int i = 0; i < gridCount(); i++)
executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys));
}
}
/**
* @param exp Expected value.
* @param key Key.
* @throws Exception If failed.
*/
private void checkContainsKey(boolean exp, String key) throws Exception {
if (nearEnabled())
assertEquals(exp, jcache().containsKey(key));
else {
boolean contains = false;
for (int i = 0; i < gridCount(); i++)
if (containsKey(jcache(i), key)) {
contains = true;
break;
}
assertEquals("Key: " + key, exp, contains);
}
}
/**
* @param key Key.
* @return Ignite instance for primary node.
*/
protected Ignite primaryIgnite(String key) {
ClusterNode node = grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key);
if (node == null)
throw new IgniteException("Failed to find primary node.");
UUID nodeId = node.id();
for (int i = 0; i < gridCount(); i++) {
if (grid(i).localNode().id().equals(nodeId))
return ignite(i);
}
throw new IgniteException("Failed to find primary node.");
}
/**
* @param key Key.
* @return Cache.
*/
protected IgniteCache<String, Integer> primaryCache(String key) {
return primaryIgnite(key).cache(DEFAULT_CACHE_NAME);
}
/**
* @param cache Cache.
* @param cnt Keys count.
* @param startFrom Begin value ofthe key.
* @return Collection of keys for which given cache is primary.
*/
protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt, int startFrom) {
return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt));
}
/**
* @param cache Cache.
* @param cnt Keys count.
* @return Collection of keys for which given cache is primary.
* @throws IgniteCheckedException If failed.
*/
protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt)
throws IgniteCheckedException {
return primaryKeysForCache(cache, cnt, 1);
}
/**
* @param cache Cache.
* @param key Entry key.
* @return Pair [ttl, expireTime]; both values null if entry not found
*/
protected IgnitePair<Long> entryTtl(IgniteCache cache, String key) {
return executeOnLocalOrRemoteJvm(cache, new EntryTtlTask(key, true));
}
/**
* @throws Exception If failed.
*/
@Test
public void testIterator() throws Exception {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
final int KEYS = 1000;
for (int i = 0; i < KEYS; i++)
cache.put(i, i);
// Try to initialize readers in case when near cache is enabled.
for (int i = 0; i < gridCount(); i++) {
cache = grid(i).cache(DEFAULT_CACHE_NAME);
for (int k = 0; k < KEYS; k++)
assertEquals((Object)k, cache.get(k));
}
int cnt = 0;
for (Cache.Entry e : cache)
cnt++;
assertEquals(KEYS, cnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIgniteCacheIterator() throws Exception {
IgniteCache<String, Integer> cache = jcache(0);
Iterator<Cache.Entry<String, Integer>> it = cache.iterator();
boolean hasNext = it.hasNext();
if (hasNext)
assertFalse("Cache has value: " + it.next(), hasNext);
final int SIZE = 10_000;
Map<String, Integer> entries = new HashMap<>();
Map<String, Integer> putMap = new HashMap<>();
for (int i = 0; i < SIZE; ++i) {
String key = Integer.toString(i);
putMap.put(key, i);
entries.put(key, i);
if (putMap.size() == 500) {
cache.putAll(putMap);
info("Puts finished: " + (i + 1));
putMap.clear();
}
}
cache.putAll(putMap);
checkIteratorHasNext();
checkIteratorCache(entries);
checkIteratorRemove(cache, entries);
checkIteratorEmpty(cache);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIteratorLeakOnCancelCursor() throws Exception {
IgniteCache<String, Integer> cache = jcache(0);
final int SIZE = 10_000;
Map<String, Integer> putMap = new HashMap<>();
for (int i = 0; i < SIZE; ++i) {
String key = Integer.toString(i);
putMap.put(key, i);
if (putMap.size() == 500) {
cache.putAll(putMap);
info("Puts finished: " + (i + 1));
putMap.clear();
}
}
cache.putAll(putMap);
QueryCursor<Cache.Entry<String, Integer>> cur = cache.query(new ScanQuery<String, Integer>());
cur.iterator().next();
cur.close();
waitForIteratorsCleared(cache, 10);
}
/**
* If hasNext() is called repeatedly, it should return the same result.
*/
private void checkIteratorHasNext() {
Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator();
assertEquals(iter.hasNext(), iter.hasNext());
while (iter.hasNext())
iter.next();
assertFalse(iter.hasNext());
}
/**
* @param cache Cache.
* @param entries Expected entries in the cache.
*/
private void checkIteratorRemove(IgniteCache<String, Integer> cache, Map<String, Integer> entries) {
// Check that we can remove element.
String rmvKey = Integer.toString(5);
removeCacheIterator(cache, rmvKey);
entries.remove(rmvKey);
assertFalse(cache.containsKey(rmvKey));
assertNull(cache.get(rmvKey));
checkIteratorCache(entries);
// Check that we cannot call Iterator.remove() without next().
final Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator();
assertTrue(iter.hasNext());
iter.next();
iter.remove();
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Void call() throws Exception {
iter.remove();
return null;
}
}, IllegalStateException.class, null);
}
/**
* @param cache Cache.
* @param key Key to remove.
*/
private void removeCacheIterator(IgniteCache<String, Integer> cache, String key) {
Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();
int delCnt = 0;
while (iter.hasNext()) {
Cache.Entry<String, Integer> cur = iter.next();
if (cur.getKey().equals(key)) {
iter.remove();
delCnt++;
}
}
assertEquals(1, delCnt);
}
/**
* @param entries Expected entries in the cache.
*/
private void checkIteratorCache(Map<String, Integer> entries) {
for (int i = 0; i < gridCount(); ++i)
checkIteratorCache(jcache(i), entries);
}
/**
* @param cache Cache.
* @param entries Expected entries in the cache.
*/
private void checkIteratorCache(IgniteCache<String, Integer> cache, Map<String, Integer> entries) {
Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();
int cnt = 0;
while (iter.hasNext()) {
Cache.Entry<String, Integer> cur = iter.next();
assertTrue(entries.containsKey(cur.getKey()));
assertEquals(entries.get(cur.getKey()), cur.getValue());
cnt++;
}
assertEquals(entries.size(), cnt);
}
/**
* Checks iterators are cleared.
*/
private void checkIteratorsCleared() {
for (int j = 0; j < gridCount(); j++)
executeOnLocalOrRemoteJvm(j, new CheckIteratorTask());
}
/**
* Checks iterators are cleared.
*/
private void waitForIteratorsCleared(IgniteCache<String, Integer> cache, int secs) throws InterruptedException {
for (int i = 0; i < secs; i++) {
try {
cache.size(); // Trigger weak queue poll.
checkIteratorsCleared();
}
catch (Throwable t) {
// If AssertionError is in the chain, assume we need to wait and retry.
if (!X.hasCause(t, AssertionError.class))
throw t;
if (i == 9) {
for (int j = 0; j < gridCount(); j++)
executeOnLocalOrRemoteJvm(j, new PrintIteratorStateTask());
throw t;
}
log.info("Iterators not cleared, will wait");
Thread.sleep(1000);
}
}
}
/**
* Checks iterators are cleared after using.
*
* @param cache Cache.
* @throws Exception If failed.
*/
private void checkIteratorEmpty(IgniteCache<String, Integer> cache) throws Exception {
int cnt = 5;
for (int i = 0; i < cnt; ++i) {
Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();
iter.next();
assert iter.hasNext();
}
System.gc();
waitForIteratorsCleared(cache, 10);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalClearKey() throws Exception {
addKeys();
String keyToRmv = "key" + 25;
Ignite g = primaryIgnite(keyToRmv);
g.<String, Integer>cache(DEFAULT_CACHE_NAME).localClear(keyToRmv);
checkLocalRemovedKey(keyToRmv);
g.<String, Integer>cache(DEFAULT_CACHE_NAME).put(keyToRmv, 1);
String keyToEvict = "key" + 30;
g = primaryIgnite(keyToEvict);
g.<String, Integer>cache(DEFAULT_CACHE_NAME).localEvict(Collections.singleton(keyToEvict));
g.<String, Integer>cache(DEFAULT_CACHE_NAME).localClear(keyToEvict);
checkLocalRemovedKey(keyToEvict);
}
/**
* @param keyToRmv Removed key.
*/
protected void checkLocalRemovedKey(String keyToRmv) {
for (int i = 0; i < 500; ++i) {
String key = "key" + i;
boolean found = primaryIgnite(key).cache(DEFAULT_CACHE_NAME).localPeek(key) != null;
if (keyToRmv.equals(key)) {
Collection<ClusterNode> nodes = grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key);
for (int j = 0; j < gridCount(); ++j) {
if (nodes.contains(grid(j).localNode()) && grid(j) != primaryIgnite(key))
assertTrue("Not found on backup removed key ", grid(j).cache(DEFAULT_CACHE_NAME).localPeek(key) != null);
}
assertFalse("Found removed key " + key, found);
}
else
assertTrue("Not found key " + key, found);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalClearKeys() throws Exception {
Map<String, List<String>> keys = addKeys();
Ignite g = grid(0);
Set<String> keysToRmv = new HashSet<>();
for (int i = 0; i < gridCount(); ++i) {
List<String> gridKeys = keys.get(grid(i).name());
if (gridKeys.size() > 2) {
keysToRmv.add(gridKeys.get(0));
keysToRmv.add(gridKeys.get(1));
g = grid(i);
break;
}
}
assert keysToRmv.size() > 1;
info("Will clear keys on node: " + g.cluster().localNode().id());
g.<String, Integer>cache(DEFAULT_CACHE_NAME).localClearAll(keysToRmv);
for (int i = 0; i < 500; ++i) {
String key = "key" + i;
Ignite ignite = primaryIgnite(key);
boolean found = ignite.cache(DEFAULT_CACHE_NAME).localPeek(key) != null;
if (keysToRmv.contains(key))
assertFalse("Found removed key [key=" + key + ", node=" + ignite.cluster().localNode().id() + ']',
found);
else
assertTrue("Not found key " + key, found);
}
}
/**
* Add 500 keys to cache only on primaries nodes.
*
* @return Map grid's name to its primary keys.
*/
protected Map<String, List<String>> addKeys() {
// Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
// because some of them were blocked due to having readers.
Map<String, List<String>> keys = new HashMap<>();
for (int i = 0; i < gridCount(); ++i)
keys.put(grid(i).name(), new ArrayList<String>());
for (int i = 0; i < 500; ++i) {
String key = "key" + i;
Ignite g = primaryIgnite(key);
g.cache(DEFAULT_CACHE_NAME).put(key, "value" + i);
keys.get(g.name()).add(key);
}
return keys;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearKey() throws Exception {
testGlobalClearKey(false, Arrays.asList("key25"), false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearKeyAsyncOld() throws Exception {
testGlobalClearKey(true, Arrays.asList("key25"), true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearKeyAsync() throws Exception {
testGlobalClearKey(true, Arrays.asList("key25"), false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearKeys() throws Exception {
testGlobalClearKey(false, Arrays.asList("key25", "key100", "key150"), false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearKeysAsyncOld() throws Exception {
testGlobalClearKey(true, Arrays.asList("key25", "key100", "key150"), true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGlobalClearKeysAsync() throws Exception {
testGlobalClearKey(true, Arrays.asList("key25", "key100", "key150"), false);
}
/**
* @param async If {@code true} uses async method.
* @param keysToRmv Keys to remove.
* @param oldAsync Use old async API.
* @throws Exception If failed.
*/
protected void testGlobalClearKey(boolean async, Collection<String> keysToRmv, boolean oldAsync) throws Exception {
// Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
// because some of them were blocked due to having readers.
for (int i = 0; i < 500; ++i) {
String key = "key" + i;
Ignite g = primaryIgnite(key);
g.cache(DEFAULT_CACHE_NAME).put(key, "value" + i);
}
if (async) {
if (oldAsync) {
IgniteCache<String, Integer> asyncCache = jcache().withAsync();
if (keysToRmv.size() == 1)
asyncCache.clear(F.first(keysToRmv));
else
asyncCache.clearAll(new HashSet<>(keysToRmv));
asyncCache.future().get();
}
else {
if (keysToRmv.size() == 1)
jcache().clearAsync(F.first(keysToRmv)).get();
else
jcache().clearAllAsync(new HashSet<>(keysToRmv)).get();
}
}
else {
if (keysToRmv.size() == 1)
jcache().clear(F.first(keysToRmv));
else
jcache().clearAll(new HashSet<>(keysToRmv));
}
for (int i = 0; i < 500; ++i) {
String key = "key" + i;
boolean found = false;
for (int j = 0; j < gridCount(); j++) {
if (jcache(j).localPeek(key) != null)
found = true;
}
if (!keysToRmv.contains(key))
assertTrue("Not found key " + key, found);
else
assertFalse("Found removed key " + key, found);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testWithSkipStore() throws Exception {
IgniteCache<String, Integer> cache = jcache(0);
IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore();
List<String> keys = primaryKeysForCache(cache, 10);
for (int i = 0; i < keys.size(); ++i)
storeStgy.putToStore(keys.get(i), i);
assertFalse(cacheSkipStore.iterator().hasNext());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
}
for (String key : keys) {
cacheSkipStore.remove(key);
assertNotNull(cache.get(key));
}
cache.removeAll(new HashSet<>(keys));
for (String key : keys)
assertNull(cache.get(key));
final int KEYS = 250;
// Put/remove data from multiple nodes.
keys = new ArrayList<>(KEYS);
for (int i = 0; i < KEYS; i++)
keys.add("key_" + i);
for (int i = 0; i < keys.size(); ++i)
cache.put(keys.get(i), i);
for (int i = 0; i < keys.size(); ++i) {
String key = keys.get(i);
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertEquals(i, storeStgy.getFromStore(key));
}
for (int i = 0; i < keys.size(); ++i) {
String key = keys.get(i);
Integer val1 = -1;
cacheSkipStore.put(key, val1);
assertEquals(i, storeStgy.getFromStore(key));
assertEquals(val1, cacheSkipStore.get(key));
Integer val2 = -2;
assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2)));
assertEquals(i, storeStgy.getFromStore(key));
assertEquals(val2, cacheSkipStore.get(key));
}
for (String key : keys) {
cacheSkipStore.remove(key);
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
for (String key : keys) {
cache.remove(key);
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
storeStgy.putToStore(key, 0);
Integer val = -1;
assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val)));
assertEquals(0, storeStgy.getFromStore(key));
assertEquals(val, cacheSkipStore.get(key));
cache.remove(key);
storeStgy.putToStore(key, 0);
assertTrue(cacheSkipStore.putIfAbsent(key, val));
assertEquals(val, cacheSkipStore.get(key));
assertEquals(0, storeStgy.getFromStore(key));
cache.remove(key);
storeStgy.putToStore(key, 0);
assertNull(cacheSkipStore.getAndPut(key, val));
assertEquals(val, cacheSkipStore.get(key));
assertEquals(0, storeStgy.getFromStore(key));
cache.remove(key);
}
assertFalse(cacheSkipStore.iterator().hasNext());
assertTrue(storeStgy.getStoreSize() == 0);
assertTrue(cache.size(ALL) == 0);
// putAll/removeAll from multiple nodes.
Map<String, Integer> data = new LinkedHashMap<>();
for (int i = 0; i < keys.size(); i++)
data.put(keys.get(i), i);
cacheSkipStore.putAll(data);
for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
cache.putAll(data);
for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cacheSkipStore.removeAll(data.keySet());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cacheSkipStore.putAll(data);
for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cacheSkipStore.removeAll(data.keySet());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cache.removeAll(data.keySet());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
assertTrue(storeStgy.getStoreSize() == 0);
// Miscellaneous checks.
String newKey = "New key";
assertFalse(storeStgy.isInStore(newKey));
cacheSkipStore.put(newKey, 1);
assertFalse(storeStgy.isInStore(newKey));
cache.put(newKey, 1);
assertTrue(storeStgy.isInStore(newKey));
Iterator<Cache.Entry<String, Integer>> it = cacheSkipStore.iterator();
assertTrue(it.hasNext());
Cache.Entry<String, Integer> entry = it.next();
String rmvKey = entry.getKey();
assertTrue(storeStgy.isInStore(rmvKey));
it.remove();
assertNull(cacheSkipStore.get(rmvKey));
assertTrue(storeStgy.isInStore(rmvKey));
assertTrue(cache.size(ALL) == 0);
assertTrue(cacheSkipStore.size(ALL) == 0);
cache.remove(rmvKey);
assertTrue(storeStgy.getStoreSize() == 0);
}
/**
* @throws Exception If failed.
*/
@Test
public void testWithSkipStoreRemoveAll() throws Exception {
if (atomicityMode() == TRANSACTIONAL || (atomicityMode() == ATOMIC && nearEnabled())) // TODO IGNITE-373.
return;
IgniteCache<String, Integer> cache = jcache(0);
IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore();
Map<String, Integer> data = new HashMap<>();
for (int i = 0; i < 100; i++)
data.put("key_" + i, i);
cache.putAll(data);
for (String key : data.keySet()) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cacheSkipStore.removeAll();
for (String key : data.keySet()) {
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cache.removeAll();
for (String key : data.keySet()) {
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testWithSkipStoreTx() throws Exception {
if (txShouldBeUsed()) {
IgniteCache<String, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore();
final int KEYS = 250;
// Put/remove data from multiple nodes.
List<String> keys = new ArrayList<>(KEYS);
for (int i = 0; i < KEYS; i++)
keys.add("key_" + i);
Map<String, Integer> data = new LinkedHashMap<>();
for (int i = 0; i < keys.size(); i++)
data.put(keys.get(i), i);
checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, READ_COMMITTED);
checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, REPEATABLE_READ);
checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, SERIALIZABLE);
checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, READ_COMMITTED);
checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, REPEATABLE_READ);
checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, SERIALIZABLE);
}
}
/**
* @param cache Cache instance.
* @param cacheSkipStore Cache skip store projection.
* @param data Data set.
* @param keys Keys list.
* @param txConcurrency Concurrency mode.
* @param txIsolation Isolation mode.
* @throws Exception If failed.
*/
private void checkSkipStoreWithTransaction(IgniteCache<String, Integer> cache,
IgniteCache<String, Integer> cacheSkipStore,
Map<String, Integer> data,
List<String> keys,
TransactionConcurrency txConcurrency,
TransactionIsolation txIsolation)
throws Exception {
info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');
cache.removeAll(data.keySet());
checkEmpty(cache, cacheSkipStore);
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
Integer val = -1;
// Several put check.
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
for (String key : keys)
cacheSkipStore.put(key, val);
for (String key : keys) {
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
tx.commit();
}
for (String key : keys) {
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
assertEquals(0, storeStgy.getStoreSize());
// cacheSkipStore putAll(..)/removeAll(..) check.
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
cacheSkipStore.putAll(data);
tx.commit();
}
for (String key : keys) {
val = data.get(key);
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
storeStgy.putAllToStore(data);
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
cacheSkipStore.removeAll(data.keySet());
tx.commit();
}
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
cache.remove(key);
}
assertTrue(storeStgy.getStoreSize() == 0);
// cache putAll(..)/removeAll(..) check.
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
cache.putAll(data);
for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
cache.removeAll(data.keySet());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
tx.commit();
}
assertTrue(storeStgy.getStoreSize() == 0);
// putAll(..) from both cacheSkipStore and cache.
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
Map<String, Integer> subMap = new HashMap<>();
for (int i = 0; i < keys.size() / 2; i++)
subMap.put(keys.get(i), i);
cacheSkipStore.putAll(subMap);
subMap.clear();
for (int i = keys.size() / 2; i < keys.size(); i++)
subMap.put(keys.get(i), i);
cache.putAll(subMap);
for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
tx.commit();
}
for (int i = 0; i < keys.size() / 2; i++) {
String key = keys.get(i);
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
for (int i = keys.size() / 2; i < keys.size(); i++) {
String key = keys.get(i);
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
}
cache.removeAll(data.keySet());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
}
// Check that read-through is disabled when cacheSkipStore is used.
for (int i = 0; i < keys.size(); i++)
storeStgy.putToStore(keys.get(i), i);
assertTrue(cacheSkipStore.size(ALL) == 0);
assertTrue(cache.size(ALL) == 0);
assertTrue(storeStgy.getStoreSize() != 0);
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
assertTrue(cacheSkipStore.getAll(data.keySet()).isEmpty());
for (String key : keys) {
assertNull(cacheSkipStore.get(key));
if (txIsolation == READ_COMMITTED) {
assertNotNull(cache.get(key));
assertNotNull(cacheSkipStore.get(key));
}
}
tx.commit();
}
cache.removeAll(data.keySet());
val = -1;
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
for (String key : data.keySet()) {
storeStgy.putToStore(key, 0);
assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val)));
}
tx.commit();
}
for (String key : data.keySet()) {
assertEquals(0, storeStgy.getFromStore(key));
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
}
cache.removeAll(data.keySet());
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
for (String key : data.keySet()) {
storeStgy.putToStore(key, 0);
assertTrue(cacheSkipStore.putIfAbsent(key, val));
}
tx.commit();
}
for (String key : data.keySet()) {
assertEquals(0, storeStgy.getFromStore(key));
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
}
cache.removeAll(data.keySet());
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
for (String key : data.keySet()) {
storeStgy.putToStore(key, 0);
assertNull(cacheSkipStore.getAndPut(key, val));
}
tx.commit();
}
for (String key : data.keySet()) {
assertEquals(0, storeStgy.getFromStore(key));
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
}
cache.removeAll(data.keySet());
checkEmpty(cache, cacheSkipStore);
}
/**
* @param cache Cache instance.
* @param cacheSkipStore Cache skip store projection.
* @throws Exception If failed.
*/
private void checkEmpty(IgniteCache<String, Integer> cache, IgniteCache<String, Integer> cacheSkipStore)
throws Exception {
assertTrue(cache.size(ALL) == 0);
assertTrue(cacheSkipStore.size(ALL) == 0);
assertTrue(storeStgy.getStoreSize() == 0);
}
/**
* @return Cache start mode.
*/
protected CacheStartMode cacheStartType() {
String mode = System.getProperty("cache.start.mode");
if (CacheStartMode.NODES_THEN_CACHES.name().equalsIgnoreCase(mode))
return CacheStartMode.NODES_THEN_CACHES;
if (CacheStartMode.ONE_BY_ONE.name().equalsIgnoreCase(mode))
return CacheStartMode.ONE_BY_ONE;
return CacheStartMode.STATIC;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetOutTx() throws Exception {
checkGetOutTx(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetOutTxAsync() throws Exception {
checkGetOutTx(true);
}
/**
* @throws Exception If failed.
*/
private void checkGetOutTx(boolean async) throws Exception {
final AtomicInteger lockEvtCnt = new AtomicInteger();
IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
lockEvtCnt.incrementAndGet();
return true;
}
};
try {
IgniteCache<String, Integer> cache = jcache(0);
List<String> keys = primaryKeysForCache(cache, 2);
assertEquals(2, keys.size());
cache.put(keys.get(0), 0);
cache.put(keys.get(1), 1);
grid(0).events().localListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);
try (Transaction tx = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
Integer val0;
if (async)
val0 = cache.getAsync(keys.get(0)).get();
else
val0 = cache.get(keys.get(0));
assertEquals(0, val0.intValue());
Map<String, Integer> allOutTx;
if (async)
allOutTx = cache.getAllOutTxAsync(F.asSet(keys.get(1))).get();
else
allOutTx = cache.getAllOutTx(F.asSet(keys.get(1)));
assertEquals(1, allOutTx.size());
assertTrue(allOutTx.containsKey(keys.get(1)));
assertEquals(1, allOutTx.get(keys.get(1)).intValue());
}
assertTrue(GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
info("Lock event count: " + lockEvtCnt.get());
if (atomicityMode() == ATOMIC)
return lockEvtCnt.get() == 0;
if (cacheMode() == PARTITIONED && nearEnabled()) {
if (!grid(0).configuration().isClientMode())
return lockEvtCnt.get() == 4;
}
return lockEvtCnt.get() == 2;
}
}, 15000));
}
finally {
grid(0).events().stopLocalListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformException() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
IgniteFuture fut = cache.invokeAsync("key2", ERR_PROCESSOR).chain(new IgniteClosure<IgniteFuture, Object>() {
@Override public Object apply(IgniteFuture o) {
return o.get();
}
});
fut.get();
return null;
}
}, EntryProcessorException.class, null);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLockInsideTransaction() throws Exception {
if (txEnabled()) {
GridTestUtils.assertThrows(
log,
new Callable<Object>() {
@Override public Object call() throws Exception {
IgniteCache<String, Integer> cache = jcache(0);
try (Transaction tx = ignite(0).transactions().txStart()) {
cache.lock("key").lock();
}
return null;
}
},
CacheException.class,
"Explicit lock can't be acquired within a transaction."
);
GridTestUtils.assertThrows(
log,
new Callable<Object>() {
@Override public Object call() throws Exception {
IgniteCache<String, Integer> cache = jcache(0);
try (Transaction tx = ignite(0).transactions().txStart()) {
cache.lockAll(Arrays.asList("key1", "key2")).lock();
}
return null;
}
},
CacheException.class,
"Explicit lock can't be acquired within a transaction."
);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformResourceInjection() throws Exception {
ClusterGroup servers = grid(0).cluster().forServers();
if (F.isEmpty(servers.nodes()))
return;
grid(0).services( grid(0).cluster()).deployNodeSingleton(SERVICE_NAME1, new DummyServiceImpl());
IgniteCache<String, Integer> cache = jcache();
Ignite ignite = ignite(0);
doTransformResourceInjection(ignite, cache, false, false);
doTransformResourceInjection(ignite, cache, true, false);
doTransformResourceInjection(ignite, cache, true, true);
if (txEnabled()) {
doTransformResourceInjectionInTx(ignite, cache, false, false);
doTransformResourceInjectionInTx(ignite, cache, true, false);
doTransformResourceInjectionInTx(ignite, cache, true, true);
}
}
/**
* @param ignite Node.
* @param cache Cache.
* @param async Use async API.
* @param oldAsync Use old async API.
* @throws Exception If failed.
*/
private void doTransformResourceInjectionInTx(Ignite ignite, IgniteCache<String, Integer> cache, boolean async,
boolean oldAsync) throws Exception {
for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
IgniteTransactions txs = ignite.transactions();
try (Transaction tx = txs.txStart(concurrency, isolation)) {
doTransformResourceInjection(ignite, cache, async, oldAsync);
tx.commit();
}
}
}
}
/**
* @param ignite Node.
* @param cache Cache.
* @param async Use async API.
* @param oldAsync Use old async API.
* @throws Exception If failed.
*/
private void doTransformResourceInjection(Ignite ignite, IgniteCache<String, Integer> cache, boolean async,
boolean oldAsync) throws Exception {
final Collection<ResourceType> required = Arrays.asList(ResourceType.IGNITE_INSTANCE,
ResourceType.CACHE_NAME,
ResourceType.LOGGER);
final CacheEventListener lsnr = new CacheEventListener();
IgniteEvents evts = ignite.events(ignite.cluster());
UUID opId = evts.remoteListen(lsnr, null, EVT_CACHE_OBJECT_READ);
try {
checkResourceInjectionOnInvoke(cache, required, async, oldAsync);
checkResourceInjectionOnInvokeAll(cache, required, async, oldAsync);
checkResourceInjectionOnInvokeAllMap(cache, required, async, oldAsync);
}
finally {
evts.stopRemoteListen(opId);
}
}
/**
* Tests invokeAll method for map of pairs (key, entryProcessor).
*
* @param cache Cache.
* @param required Expected injected resources.
* @param async Use async API.
* @param oldAsync Use old async API.
*/
private void checkResourceInjectionOnInvokeAllMap(IgniteCache<String, Integer> cache,
Collection<ResourceType> required, boolean async, boolean oldAsync) {
Map<String, EntryProcessorResult<Integer>> results;
Map<String, EntryProcessor<String, Integer, Integer>> map = new HashMap<>();
map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
if (async) {
if (oldAsync) {
IgniteCache<String, Integer> acache = cache.withAsync();
acache.invokeAll(map);
results = acache.<Map<String, EntryProcessorResult<Integer>>>future().get();
}
else
results = cache.invokeAllAsync(map).get();
}
else
results = cache.invokeAll(map);
assertEquals(map.size(), results.size());
for (EntryProcessorResult<Integer> res : results.values()) {
Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(res.get()).notInjected(required);
if (!notInjected.isEmpty())
fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
}
}
/**
* Tests invokeAll method for set of keys.
*
* @param cache Cache.
* @param required Expected injected resources.
* @param async Use async API.
* @param oldAsync Use old async API.
*/
private void checkResourceInjectionOnInvokeAll(IgniteCache<String, Integer> cache,
Collection<ResourceType> required, boolean async, boolean oldAsync) {
Set<String> keys = new HashSet<>(Arrays.asList(UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString()));
Map<String, EntryProcessorResult<Integer>> results;
if (async) {
if (oldAsync) {
IgniteCache<String, Integer> acache = cache.withAsync();
acache.invokeAll(keys, new ResourceInjectionEntryProcessor());
results = acache.<Map<String, EntryProcessorResult<Integer>>>future().get();
}
else
results = cache.invokeAllAsync(keys, new ResourceInjectionEntryProcessor()).get();
}
else
results = cache.invokeAll(keys, new ResourceInjectionEntryProcessor());
assertEquals(keys.size(), results.size());
for (EntryProcessorResult<Integer> res : results.values()) {
Collection<ResourceType> notInjected1 = ResourceInfoSet.valueOf(res.get()).notInjected(required);
if (!notInjected1.isEmpty())
fail("Can't inject resource(s): " + Arrays.toString(notInjected1.toArray()));
}
}
/**
* Tests invoke for single key.
*
* @param cache Cache.
* @param required Expected injected resources.
* @param async Use async API.
* @param oldAsync Use old async API.
*/
private void checkResourceInjectionOnInvoke(IgniteCache<String, Integer> cache,
Collection<ResourceType> required, boolean async, boolean oldAsync) {
String key = UUID.randomUUID().toString();
Integer flags;
if (async) {
if (oldAsync) {
IgniteCache<String, Integer> acache = cache.withAsync();
acache.invoke(key, new GridCacheAbstractFullApiSelfTest.ResourceInjectionEntryProcessor());
flags = acache.<Integer>future().get();
}
else
flags = cache.invokeAsync(key,
new GridCacheAbstractFullApiSelfTest.ResourceInjectionEntryProcessor()).get();
}
else
flags = cache.invoke(key, new GridCacheAbstractFullApiSelfTest.ResourceInjectionEntryProcessor());
if (cache.isAsync())
flags = cache.<Integer>future().get();
assertTrue("Processor result is null", flags != null);
Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(flags).notInjected(required);
if (!notInjected.isEmpty())
fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
}
/**
* Sets given value, returns old value.
*/
public static final class SetValueProcessor implements EntryProcessor<String, Integer, Integer> {
/** */
private Integer newVal;
/**
* @param newVal New value to set.
*/
SetValueProcessor(Integer newVal) {
this.newVal = newVal;
}
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<String, Integer> entry,
Object... arguments) throws EntryProcessorException {
Integer val = entry.getValue();
entry.setValue(newVal);
return val;
}
}
/**
*
*/
public enum CacheStartMode {
/** Start caches together nodes (not dynamically) */
STATIC,
/** */
NODES_THEN_CACHES,
/** */
ONE_BY_ONE
}
/**
*
*/
private static class RemoveEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
/** {@inheritDoc} */
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
assertNotNull(e.getKey());
Integer old = e.getValue();
e.remove();
return String.valueOf(old);
}
}
/**
*
*/
private static class IncrementEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
/** {@inheritDoc} */
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
assertNotNull(e.getKey());
Integer old = e.getValue();
e.setValue(old == null ? 1 : old + 1);
return String.valueOf(old);
}
}
/**
*
*/
public static class ResourceInjectionEntryProcessor extends ResourceInjectionEntryProcessorBase<String, Integer> {
/** */
protected transient Ignite ignite;
/** */
protected transient String cacheName;
/** */
protected transient IgniteLogger log;
/** */
protected transient DummyService svc;
/**
* @param ignite Ignite.
*/
@IgniteInstanceResource
public void setIgnite(Ignite ignite) {
assert ignite != null;
checkSet();
infoSet.set(ResourceType.IGNITE_INSTANCE, true);
this.ignite = ignite;
}
/**
* @param cacheName Cache name.
*/
@CacheNameResource
public void setCacheName(String cacheName) {
checkSet();
infoSet.set(ResourceType.CACHE_NAME, true);
this.cacheName = cacheName;
}
/**
* @param log Logger.
*/
@LoggerResource
public void setLoggerResource(IgniteLogger log) {
assert log != null;
checkSet();
infoSet.set(ResourceType.LOGGER, true);
this.log = log;
}
/**
* @param svc Service.
*/
@ServiceResource(serviceName = SERVICE_NAME1)
public void setDummyService(DummyService svc) {
assert svc != null;
checkSet();
infoSet.set(ResourceType.SERVICE, true);
this.svc = svc;
}
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
Integer oldVal = e.getValue();
e.setValue(ThreadLocalRandom.current().nextInt() + (oldVal == null ? 0 : oldVal));
return super.process(e, args);
}
}
/**
*
*/
private static class CheckEntriesTask extends TestIgniteIdxRunnable {
/** Keys. */
private final Collection<String> keys;
/**
* @param keys Keys.
*/
public CheckEntriesTask(Collection<String> keys) {
this.keys = keys;
}
/** {@inheritDoc} */
@Override public void run(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
int size = 0;
if (ctx.isNear())
ctx = ctx.near().dht().context();
for (String key : keys) {
if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
GridCacheEntryEx e = ctx.cache().entryEx(key);
assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
assert !e.deleted() : "Entry is deleted: " + e;
size++;
e.touch();
}
}
assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
}
}
/**
*
*/
private static class CheckCacheSizeTask extends TestIgniteIdxRunnable {
/** */
private final Map<String, Integer> map;
/**
* @param map Map.
*/
CheckCacheSizeTask(Map<String, Integer> map) {
this.map = map;
}
/** {@inheritDoc} */
@Override public void run(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
int size = 0;
for (String key : map.keySet())
if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx()))
size++;
assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
}
}
/**
*
*/
private static class CheckPrimaryKeysTask implements TestCacheCallable<String, Integer, List<String>> {
/** Start from. */
private final int startFrom;
/** Count. */
private final int cnt;
/**
* @param startFrom Start from.
* @param cnt Count.
*/
public CheckPrimaryKeysTask(int startFrom, int cnt) {
this.startFrom = startFrom;
this.cnt = cnt;
}
/** {@inheritDoc} */
@Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
List<String> found = new ArrayList<>();
Affinity<Object> affinity = ignite.affinity(cache.getName());
for (int i = startFrom; i < startFrom + 100_000; i++) {
String key = "key" + i;
if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
found.add(key);
if (found.size() == cnt)
return found;
}
}
throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
}
}
/**
*
*/
public static class EntryTtlTask implements TestCacheCallable<String, Integer, IgnitePair<Long>> {
/** Entry key. */
private final String key;
/** Check cache for nearness, use DHT cache if it is near. */
private final boolean useDhtForNearCache;
/**
* @param key Entry key.
* @param useDhtForNearCache Check cache for nearness, use DHT cache if it is near.
*/
public EntryTtlTask(String key, boolean useDhtForNearCache) {
this.key = key;
this.useDhtForNearCache = useDhtForNearCache;
}
/** {@inheritDoc} */
@Override public IgnitePair<Long> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
GridCacheAdapter<?, ?> internalCache = internalCache0(cache);
if (useDhtForNearCache && internalCache.context().isNear())
internalCache = internalCache.context().near().dht();
GridCacheEntryEx entry = internalCache.entryEx(key);
entry.unswap();
IgnitePair<Long> pair = new IgnitePair<>(entry.ttl(), entry.expireTime());
if (!entry.isNear())
entry.context().cache().removeEntry(entry);
return pair;
}
}
/**
*
*/
private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> {
/**
* @param idx Index.
*/
@Override public Void call(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
GridCacheQueryManager queries = ctx.queries();
ConcurrentMap<UUID, Map<Long, GridFutureAdapter<?>>> map = GridTestUtils.getFieldValue(queries,
GridCacheQueryManager.class, "qryIters");
for (Map<Long, GridFutureAdapter<?>> map1 : map.values())
assertTrue("Iterators not removed for grid " + idx, map1.isEmpty());
return null;
}
}
/**
*
*/
private static class PrintIteratorStateTask extends TestIgniteIdxCallable<Void> {
/** */
@LoggerResource
private IgniteLogger log;
/**
* @param idx Index.
*/
@Override public Void call(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
GridCacheQueryManager queries = ctx.queries();
ConcurrentMap<UUID, Map<Long, GridFutureAdapter<?>>> map = GridTestUtils.getFieldValue(queries,
GridCacheQueryManager.class, "qryIters");
for (Map<Long, GridFutureAdapter<?>> map1 : map.values()) {
if (!map1.isEmpty()) {
log.warning("Iterators leak detected at grid: " + idx);
for (Map.Entry<Long, GridFutureAdapter<?>> entry : map1.entrySet())
log.warning(entry.getKey() + "; " + entry.getValue());
}
}
return null;
}
}
/**
*
*/
private static class RemoveAndReturnNullEntryProcessor implements
EntryProcessor<String, Integer, Integer>, Serializable {
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
e.remove();
return null;
}
}
/**
*
*/
private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable {
/** */
private final int cnt;
/**
* @param cnt Keys count.
*/
public CheckEntriesDeletedTask(int cnt) {
this.cnt = cnt;
}
/** {@inheritDoc} */
@Override public void run(int idx) throws Exception {
for (int i = 0; i < cnt; i++) {
String key = String.valueOf(i);
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
if (ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
assertNotNull(entry);
assertTrue(entry.deleted());
}
else
assertNull(entry);
}
}
}
/**
*
*/
private static class CheckKeySizeTask extends TestIgniteIdxRunnable {
/** Keys. */
private final Collection<String> keys;
/**
* @param keys Keys.
*/
public CheckKeySizeTask(Collection<String> keys) {
this.keys = keys;
}
/** {@inheritDoc} */
@Override public void run(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
int size = 0;
for (String key : keys)
if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx()))
size++;
assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(DEFAULT_CACHE_NAME).localSize(ALL));
}
}
/**
*
*/
private static class FailedEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
throw new EntryProcessorException("Test entry processor exception.");
}
}
/**
*
*/
private static class TestValue implements Serializable {
/** */
private int val;
/**
* @param val Value.
*/
TestValue(int val) {
this.val = val;
}
/**
* @return Value.
*/
public int value() {
return val;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof TestValue))
return false;
TestValue value = (TestValue)o;
if (val != value.val)
return false;
return true;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return val;
}
}
/**
* Dummy Service.
*/
public interface DummyService {
/**
*
*/
public void noop();
}
/**
* No-op test service.
*/
public static class DummyServiceImpl implements DummyService, Service {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public void noop() {
// No-op.
}
/** {@inheritDoc} */
@Override public void cancel(ServiceContext ctx) {
System.out.println("Cancelling service: " + ctx.name());
}
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) throws Exception {
System.out.println("Initializing service: " + ctx.name());
}
/** {@inheritDoc} */
@Override public void execute(ServiceContext ctx) {
System.out.println("Executing service: " + ctx.name());
}
}
/**
*
*/
public static class CacheEventListener implements IgniteBiPredicate<UUID, CacheEvent>, IgnitePredicate<CacheEvent> {
/** */
public final LinkedBlockingQueue<CacheEvent> evts = new LinkedBlockingQueue<>();
/** {@inheritDoc} */
@Override public boolean apply(UUID uuid, CacheEvent evt) {
evts.add(evt);
return true;
}
/** {@inheritDoc} */
@Override public boolean apply(CacheEvent evt) {
evts.add(evt);
return true;
}
}
}