| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import javax.cache.Cache; |
| import javax.cache.CacheException; |
| import javax.cache.event.CacheEntryEvent; |
| import javax.cache.event.CacheEntryListenerException; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import javax.cache.expiry.Duration; |
| import javax.cache.expiry.ExpiryPolicy; |
| import javax.cache.expiry.TouchedExpiryPolicy; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.EntryProcessorException; |
| import javax.cache.processor.EntryProcessorResult; |
| import javax.cache.processor.MutableEntry; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Sets; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteTransactions; |
| import org.apache.ignite.cache.CacheEntry; |
| import org.apache.ignite.cache.CacheEntryEventSerializableFilter; |
| import org.apache.ignite.cache.CacheEntryProcessor; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.query.ContinuousQuery; |
| import org.apache.ignite.cache.query.QueryCursor; |
| import org.apache.ignite.cache.query.ScanQuery; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicateX; |
| import org.apache.ignite.internal.util.lang.IgnitePair; |
| import org.apache.ignite.internal.util.typedef.CIX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheMode.REPLICATED; |
| import static org.apache.ignite.cache.CachePeekMode.ALL; |
| import static org.apache.ignite.cache.CachePeekMode.OFFHEAP; |
| import static org.apache.ignite.cache.CachePeekMode.ONHEAP; |
| import static org.apache.ignite.cache.CachePeekMode.PRIMARY; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED; |
| import static org.apache.ignite.testframework.GridTestUtils.assertThrows; |
| import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; |
| import static org.apache.ignite.transactions.TransactionState.COMMITTED; |
| |
| /** |
| * Full API cache test. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVariationsAbstractTest { |
| /** Test timeout */ |
| private static final long TEST_TIMEOUT = 60 * 1000; |
| |
| /** */ |
| public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR = |
| new CacheEntryProcessor<String, Integer, String>() { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| @Override public String process(MutableEntry<String, Integer> e, Object... args) { |
| throw new RuntimeException("Failed!"); |
| } |
| }; |
| |
| /** Increment processor for invoke operations. */ |
| public static final EntryProcessor<Object, Object, Object> INCR_PROCESSOR = new IncrementEntryProcessor(); |
| |
| /** Increment processor for invoke operations with IgniteEntryProcessor. */ |
| public static final CacheEntryProcessor<Object, Object, Object> INCR_IGNITE_PROCESSOR = |
| new CacheEntryProcessor<Object, Object, Object>() { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| return INCR_PROCESSOR.process(e, args); |
| } |
| }; |
| |
| /** Increment processor for invoke operations. */ |
| public static final EntryProcessor<Object, Object, Object> RMV_PROCESSOR = new RemoveEntryProcessor(); |
| |
| /** Increment processor for invoke operations with IgniteEntryProcessor. */ |
| public static final CacheEntryProcessor<Object, Object, Object> RMV_IGNITE_PROCESSOR = |
| new CacheEntryProcessor<Object, Object, Object>() { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| return RMV_PROCESSOR.process(e, args); |
| } |
| }; |
| |
| /** */ |
| public static final int CNT = 20; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return TEST_TIMEOUT; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testSize() throws Exception { |
| assert jcache().localSize() == 0; |
| |
| int size = 10; |
| |
| final Map<String, Integer> map = new HashMap<>(); |
| |
| for (int i = 0; i < size; i++) |
| map.put("key" + i, i); |
| |
| // Put in primary nodes to avoid near readers which will prevent entry from being cleared. |
| Map<ClusterNode, Collection<String>> mapped = grid(0).<String>affinity(cacheName()).mapKeysToNodes(map.keySet()); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| Collection<String> keys = mapped.get(grid(i).localNode()); |
| |
| if (!F.isEmpty(keys)) { |
| for (String key : keys) |
| jcache(i).put(key, map.get(key)); |
| } |
| } |
| |
| map.remove("key0"); |
| |
| mapped = grid(0).<String>affinity(cacheName()).mapKeysToNodes(map.keySet()); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| // Will actually delete entry from map. |
| CU.invalidate(jcache(i), "key0"); |
| |
| assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP)); |
| |
| Collection<String> keysCol = mapped.get(grid(i).localNode()); |
| |
| assert jcache(i).localSize() != 0 || F.isEmpty(keysCol); |
| } |
| |
| for (int i = 0; i < gridCount(); i++) |
| executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map, cacheName())); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| Collection<String> keysCol = mapped.get(grid(i).localNode()); |
| |
| assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0, |
| jcache(i).localSize(PRIMARY)); |
| } |
| |
| int globalPrimarySize = map.size(); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertEquals(globalPrimarySize, jcache(i).size(PRIMARY)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertEquals(globalPrimarySize, jcache(i).sizeLong(PRIMARY)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertEquals(globalPrimarySize, (int)jcache(i).sizeAsync(PRIMARY).get()); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertEquals((long)globalPrimarySize, (long)jcache(i).sizeLongAsync(PRIMARY).get()); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| IgniteCacheProxy cache = (IgniteCacheProxy)jcache(i); |
| |
| long cacheSize = 0; |
| |
| int parts = cache.context().affinity().partitions(); |
| |
| for (int part = 0; part < parts; ++part) |
| cacheSize += jcache(i).sizeLong(part, PRIMARY); |
| |
| assertEquals((long)globalPrimarySize, cacheSize); |
| } |
| |
| for (int i = 0; i < gridCount(); i++) { |
| IgniteCacheProxy cache = (IgniteCacheProxy)jcache(i); |
| |
| long cacheSize = 0; |
| |
| int parts = cache.context().affinity().partitions(); |
| |
| for (int part = 0; part < parts; ++part) |
| cacheSize += jcache(i).sizeLongAsync(part, PRIMARY).get(); |
| |
| assertEquals((long)globalPrimarySize, cacheSize); |
| } |
| |
| int times = 1; |
| |
| if (cacheMode() == REPLICATED) |
| times = gridCount() - clientsCount(); |
| else if (cacheMode() == PARTITIONED) |
| times = Math.min(gridCount(), jcache().getConfiguration(CacheConfiguration.class).getBackups() + 1); |
| |
| int globalSize = globalPrimarySize * times; |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertEquals(globalSize, jcache(i).size(ALL)); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testContainsKey() throws Exception { |
| |
| Map<String, Integer> vals = new HashMap<>(); |
| |
| for (int i = 0; i < CNT; i++) |
| vals.put("key" + i, i); |
| |
| jcache().putAll(vals); |
| |
| checkContainsKey(true, "key0"); |
| checkContainsKey(false, "testContainsKeyWrongKey"); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| assertTrue(jcache(i).containsKeys(vals.keySet())); |
| assertTrue(jcache(i).containsKeysAsync(vals.keySet()).get()); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testContainsKeyTx() throws Exception { |
| if (!txEnabled()) |
| return; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteTransactions txs = ignite(0).transactions(); |
| |
| for (int i = 0; i < 10; i++) { |
| String key = String.valueOf(i); |
| |
| try (Transaction tx = txs.txStart()) { |
| assertNull(key, cache.get(key)); |
| |
| assertFalse(cache.containsKey(key)); |
| |
| tx.commit(); |
| } |
| |
| try (Transaction tx = txs.txStart()) { |
| assertNull(key, cache.get(key)); |
| |
| cache.put(key, i); |
| |
| assertTrue(cache.containsKey(key)); |
| |
| tx.commit(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testContainsKeysTx() throws Exception { |
| if (!txEnabled()) |
| return; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteTransactions txs = ignite(0).transactions(); |
| |
| Set<String> keys = new HashSet<>(); |
| |
| for (int i = 0; i < 10; i++) { |
| String key = String.valueOf(i); |
| |
| keys.add(key); |
| } |
| |
| try (Transaction tx = txs.txStart()) { |
| for (String key : keys) |
| assertNull(key, cache.get(key)); |
| |
| assertFalse(cache.containsKeys(keys)); |
| |
| tx.commit(); |
| } |
| |
| try (Transaction tx = txs.txStart()) { |
| for (String key : keys) |
| assertNull(key, cache.get(key)); |
| |
| for (String key : keys) |
| cache.put(key, 0); |
| |
| assertTrue(cache.containsKeys(keys)); |
| |
| tx.commit(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRemoveInExplicitLocks() throws Exception { |
| if (lockingEnabled()) { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("a", 1); |
| |
| Lock lock = cache.lockAll(ImmutableSet.of("a", "b", "c", "d")); |
| |
| lock.lock(); |
| |
| try { |
| cache.remove("a"); |
| |
| // Make sure single-key operation did not remove lock. |
| cache.putAll(F.asMap("b", 2, "c", 3, "d", 4)); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRemoveAllSkipStore() throws Exception { |
| if (!storeEnabled()) |
| return; |
| |
| IgniteCache<String, Integer> jcache = jcache(); |
| |
| jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3)); |
| |
| jcache.withSkipStore().removeAll(); |
| |
| assertEquals((Integer)1, jcache.get("1")); |
| assertEquals((Integer)2, jcache.get("2")); |
| assertEquals((Integer)3, jcache.get("3")); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testAtomicOps() throws IgniteCheckedException { |
| IgniteCache<String, Integer> c = jcache(); |
| |
| final int cnt = 10; |
| |
| for (int i = 0; i < cnt; i++) |
| assertNull(c.getAndPutIfAbsent("k" + i, i)); |
| |
| for (int i = 0; i < cnt; i++) { |
| boolean wrong = i % 2 == 0; |
| |
| String key = "k" + i; |
| |
| boolean res = c.replace(key, wrong ? i + 1 : i, -1); |
| |
| assertEquals(wrong, !res); |
| } |
| |
| for (int i = 0; i < cnt; i++) { |
| boolean success = i % 2 != 0; |
| |
| String key = "k" + i; |
| |
| boolean res = c.remove(key, -1); |
| |
| assertTrue(success == res); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGet() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() { |
| IgniteCache cache = jcache(); |
| |
| cache.put(key(1), value(1)); |
| cache.put(key(2), value(2)); |
| |
| assertEquals(value(1), cache.get(key(1))); |
| assertEquals(value(2), cache.get(key(2))); |
| // Wrong key. |
| assertNull(cache.get(key(3))); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cacheAsync.get("key1"); |
| |
| IgniteFuture<Integer> fut1 = cacheAsync.future(); |
| |
| cacheAsync.get("key2"); |
| |
| IgniteFuture<Integer> fut2 = cacheAsync.future(); |
| |
| cacheAsync.get("wrongKey"); |
| |
| IgniteFuture<Integer> fut3 = cacheAsync.future(); |
| |
| assert fut1.get() == 1; |
| assert fut2.get() == 2; |
| assert fut3.get() == null; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| IgniteFuture<Integer> fut1 = cache.getAsync("key1"); |
| |
| IgniteFuture<Integer> fut2 = cache.getAsync("key2"); |
| |
| IgniteFuture<Integer> fut3 = cache.getAsync("wrongKey"); |
| |
| assert fut1.get() == 1; |
| assert fut2.get() == 2; |
| assert fut3.get() == null; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAll() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() { |
| final Object key1 = key(1); |
| final Object key2 = key(2); |
| final Object key9999 = key(9999); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| final IgniteCache<Object, Object> cache = jcache(); |
| |
| try { |
| cache.put(key1, val1); |
| cache.put(key2, val2); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.getAll(null).isEmpty(); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| assert cache.getAll(Collections.<Object>emptySet()).isEmpty(); |
| |
| Map<Object, Object> map1 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); |
| |
| info("Retrieved map1: " + map1); |
| |
| assert 2 == map1.size() : "Invalid map: " + map1; |
| |
| assertEquals(val1, map1.get(key1)); |
| assertEquals(val2, map1.get(key2)); |
| assertNull(map1.get(key9999)); |
| |
| Map<Object, Object> map2 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); |
| |
| info("Retrieved map2: " + map2); |
| |
| assert 2 == map2.size() : "Invalid map: " + map2; |
| |
| assertEquals(val1, map2.get(key1)); |
| assertEquals(val2, map2.get(key2)); |
| assertNull(map2.get(key9999)); |
| |
| // Now do the same checks but within transaction. |
| if (txShouldBeUsed()) { |
| try (Transaction tx0 = transactions().txStart()) { |
| assert cache.getAll(Collections.<Object>emptySet()).isEmpty(); |
| |
| map1 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); |
| |
| info("Retrieved map1: " + map1); |
| |
| assert 2 == map1.size() : "Invalid map: " + map1; |
| |
| assertEquals(val1, map2.get(key1)); |
| assertEquals(val2, map2.get(key2)); |
| assertNull(map2.get(key9999)); |
| |
| map2 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); |
| |
| info("Retrieved map2: " + map2); |
| |
| assert 2 == map2.size() : "Invalid map: " + map2; |
| |
| assertEquals(val1, map2.get(key1)); |
| assertEquals(val2, map2.get(key2)); |
| assertNull(map2.get(key9999)); |
| |
| tx0.commit(); |
| } |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAllWithNulls() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| final Set<String> c = new HashSet<>(); |
| |
| c.add("key1"); |
| c.add(null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.getAll(c); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetTxNonExistingKey() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction ignored = transactions().txStart()) { |
| assert jcache().get("key999123") == null; |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAllAsyncOld() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| final IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cacheAsync.getAll(null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| cacheAsync.getAll(Collections.<String>emptySet()); |
| IgniteFuture<Map<String, Integer>> fut2 = cacheAsync.future(); |
| |
| cacheAsync.getAll(ImmutableSet.of("key1", "key2")); |
| IgniteFuture<Map<String, Integer>> fut3 = cacheAsync.future(); |
| |
| assert fut2.get().isEmpty(); |
| assert fut3.get().size() == 2 : "Invalid map: " + fut3.get(); |
| assert fut3.get().get("key1") == 1; |
| assert fut3.get().get("key2") == 2; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAllAsync() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.getAllAsync(null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| IgniteFuture<Map<String, Integer>> fut2 = cache.getAllAsync(Collections.<String>emptySet()); |
| |
| IgniteFuture<Map<String, Integer>> fut3 = cache.getAllAsync(ImmutableSet.of("key1", "key2")); |
| |
| assert fut2.get().isEmpty(); |
| assert fut3.get().size() == 2 : "Invalid map: " + fut3.get(); |
| assert fut3.get().get("key1") == 1; |
| assert fut3.get().get("key2") == 2; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPut() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| IgniteCache cache = jcache(); |
| |
| final Object key1 = key(1); |
| final Object val1 = value(1); |
| final Object key2 = key(2); |
| final Object val2 = value(2); |
| |
| assert cache.getAndPut(key1, val1) == null; |
| assert cache.getAndPut(key2, val2) == null; |
| |
| // Check inside transaction. |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| |
| // Put again to check returned values. |
| assertEquals(val1, cache.getAndPut(key1, val1)); |
| assertEquals(val2, cache.getAndPut(key2, val2)); |
| |
| checkContainsKey(true, key1); |
| checkContainsKey(true, key2); |
| |
| assert cache.get(key1) != null; |
| assert cache.get(key2) != null; |
| assert cache.get(key(100500)) == null; |
| |
| // Check outside transaction. |
| checkContainsKey(true, key1); |
| checkContainsKey(true, key2); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assert cache.get(key(100500)) == null; |
| |
| assertEquals(val1, cache.getAndPut(key1, value(10))); |
| assertEquals(val2, cache.getAndPut(key2, value(11))); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutTx() throws Exception { |
| if (txShouldBeUsed()) { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| try (Transaction tx = transactions().txStart()) { |
| assert cache.getAndPut("key1", 1) == null; |
| assert cache.getAndPut("key2", 2) == null; |
| |
| // Check inside transaction. |
| assert cache.get("key1") == 1; |
| assert cache.get("key2") == 2; |
| |
| // Put again to check returned values. |
| assert cache.getAndPut("key1", 1) == 1; |
| assert cache.getAndPut("key2", 2) == 2; |
| |
| assert cache.get("key1") != null; |
| assert cache.get("key2") != null; |
| assert cache.get("wrong") == null; |
| |
| tx.commit(); |
| } |
| |
| // Check outside transaction. |
| checkContainsKey(true, "key1"); |
| checkContainsKey(true, "key2"); |
| |
| assert cache.get("key1") == 1; |
| assert cache.get("key2") == 2; |
| assert cache.get("wrong") == null; |
| |
| assertEquals((Integer)1, cache.getAndPut("key1", 10)); |
| assertEquals((Integer)2, cache.getAndPut("key2", 11)); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeOptimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvoke(OPTIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeOptimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvoke(OPTIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokePessimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvoke(PESSIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokePessimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvoke(PESSIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIgniteInvokeOptimisticReadCommitted1() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkIgniteInvoke(OPTIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIgniteInvokeOptimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkIgniteInvoke(OPTIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIgniteInvokePessimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkIgniteInvoke(PESSIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIgniteInvokePessimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkIgniteInvoke(PESSIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws Exception If failed. |
| */ |
| private void checkIgniteInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation) |
| throws Exception { |
| checkInvoke(concurrency, isolation, INCR_IGNITE_PROCESSOR, RMV_IGNITE_PROCESSOR); |
| } |
| |
| /** |
| * @param concurrency Transaction concurrency. |
| * @param isolation Transaction isolation. |
| * @param incrProcessor Increment processor. |
| * @param rmvProseccor Remove processor. |
| */ |
| private void checkInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation, |
| EntryProcessor<Object, Object, Object> incrProcessor, |
| EntryProcessor<Object, Object, Object> rmvProseccor) { |
| IgniteCache cache = jcache(); |
| |
| final Object key1 = key(1); |
| final Object key2 = key(2); |
| final Object key3 = key(3); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null; |
| |
| try { |
| assertNull(cache.invoke(key1, incrProcessor, dataMode)); |
| assertEquals(val1, cache.invoke(key2, incrProcessor, dataMode)); |
| assertEquals(val3, cache.invoke(key3, rmvProseccor)); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| |
| throw e; |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertNull(cache.get(key3)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertNull("Failed for cache: " + i, jcache(i).localPeek(key3, ONHEAP)); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| assertNull(cache.invoke(key1, incrProcessor, dataMode)); |
| assertEquals(val1, cache.invoke(key2, incrProcessor, dataMode)); |
| assertEquals(val3, cache.invoke(key3, rmvProseccor)); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertNull(cache.get(key3)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertNull(jcache(i).localPeek(key3, ONHEAP)); |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws Exception If failed. |
| */ |
| private void checkInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| checkInvoke(concurrency, isolation, INCR_PROCESSOR, RMV_PROCESSOR); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllOptimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAll(OPTIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllOptimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAll(OPTIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllPessimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAll(PESSIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllPessimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAll(PESSIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllAsyncOptimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAllAsync(OPTIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllAsyncOptimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAllAsync(OPTIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllAsyncPessimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAllAsync(PESSIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllAsyncPessimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAllAsync(PESSIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @param concurrency Transaction concurrency. |
| * @param isolation Transaction isolation. |
| * @throws Exception If failed. |
| */ |
| private void checkInvokeAll(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| // TODO IGNITE-2664: enable tests for all modes when IGNITE-2664 will be fixed. |
| if (dataMode != DataMode.EXTERNALIZABLE && gridCount() > 1) |
| return; |
| |
| final Object key1 = key(1); |
| final Object key2 = key(2); |
| final Object key3 = key(3); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| final Object val4 = value(4); |
| |
| final IgniteCache<Object, Object> cache = jcache(); |
| |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| if (txShouldBeUsed()) { |
| Map<Object, EntryProcessorResult<Object>> res; |
| |
| try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { |
| res = cache.invokeAll(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertEquals(val4, cache.get(key3)); |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| } |
| |
| Map<Object, EntryProcessorResult<Object>> res = cache.invokeAll(F.asSet(key1, key2, key3), RMV_PROCESSOR); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| assertNull(jcache(i).localPeek(key1, ONHEAP)); |
| assertNull(jcache(i).localPeek(key2, ONHEAP)); |
| assertNull(jcache(i).localPeek(key3, ONHEAP)); |
| } |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| res = cache.invokeAll(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertEquals(val4, cache.get(key3)); |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| res = cache.invokeAll(F.asMap(key1, INCR_PROCESSOR, key2, INCR_PROCESSOR, key3, INCR_PROCESSOR), dataMode); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertEquals(val4, cache.get(key3)); |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| } |
| |
| /** |
| * @param concurrency Transaction concurrency. |
| * @param isolation Transaction isolation. |
| * @throws Exception If failed. |
| */ |
| private void checkInvokeAllAsync(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| // TODO IGNITE-2664: enable tests for all modes when IGNITE-2664 will be fixed. |
| if (dataMode != DataMode.EXTERNALIZABLE && gridCount() > 1) |
| return; |
| |
| final Object key1 = key(1); |
| final Object key2 = key(2); |
| final Object key3 = key(3); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| final Object val4 = value(4); |
| |
| final IgniteCache<Object, Object> cache = jcache(); |
| |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| if (txShouldBeUsed()) { |
| Map<Object, EntryProcessorResult<Object>> res; |
| |
| try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { |
| res = cache.invokeAllAsync(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode).get(); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertEquals(val4, cache.get(key3)); |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| } |
| |
| Map<Object, EntryProcessorResult<Object>> res = |
| cache.invokeAllAsync(F.asSet(key1, key2, key3), RMV_PROCESSOR).get(); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| assertNull(jcache(i).localPeek(key1, ONHEAP)); |
| assertNull(jcache(i).localPeek(key2, ONHEAP)); |
| assertNull(jcache(i).localPeek(key3, ONHEAP)); |
| } |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| res = cache.invokeAllAsync(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode).get(); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertEquals(val4, cache.get(key3)); |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| |
| cache.remove(key1); |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| res = cache.invokeAllAsync( |
| F.asMap(key1, INCR_PROCESSOR, key2, INCR_PROCESSOR, key3, INCR_PROCESSOR), dataMode).get(); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertEquals(val4, cache.get(key3)); |
| |
| assertNull(res.get(key1)); |
| assertEquals(val1, res.get(key2).get()); |
| assertEquals(val3, res.get(key3).get()); |
| |
| assertEquals(2, res.size()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAllWithNulls() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| final Object key1 = key(1); |
| |
| final IgniteCache<Object, Object> cache = jcache(); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.invokeAll((Set<Object>)null, INCR_PROCESSOR, dataMode); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.invokeAll(F.asSet(key1), null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| { |
| final Set<Object> keys = new LinkedHashSet<>(2); |
| |
| keys.add(key1); |
| keys.add(null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.invokeAll(keys, INCR_PROCESSOR, dataMode); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.invokeAll(F.asSet(key1), null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testInvokeSequentialOptimisticNoStart() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeSequential0(false, OPTIMISTIC); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testInvokeSequentialPessimisticNoStart() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeSequential0(false, PESSIMISTIC); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testInvokeSequentialOptimisticWithStart() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeSequential0(true, OPTIMISTIC); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testInvokeSequentialPessimisticWithStart() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeSequential0(true, PESSIMISTIC); |
| } |
| }); |
| } |
| |
| /** |
| * @param startVal Whether to put value. |
| * @param concurrency Concurrency. |
| * @throws Exception If failed. |
| */ |
| private void checkInvokeSequential0(boolean startVal, TransactionConcurrency concurrency) |
| throws Exception { |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| |
| IgniteCache<Object, Object> cache = jcache(); |
| |
| final Object key = primaryTestObjectKeysForCache(cache, 1).get(0); |
| |
| Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null; |
| |
| try { |
| if (startVal) |
| cache.put(key, val2); |
| else |
| assertEquals(null, cache.get(key)); |
| |
| Object expRes = startVal ? val2 : null; |
| |
| assertEquals(expRes, cache.invoke(key, INCR_PROCESSOR, dataMode)); |
| |
| expRes = startVal ? val3 : val1; |
| |
| assertEquals(expRes, cache.invoke(key, INCR_PROCESSOR, dataMode)); |
| |
| expRes = value(valueOf(expRes) + 1); |
| |
| assertEquals(expRes, cache.invoke(key, INCR_PROCESSOR, dataMode)); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| Object exp = value((startVal ? 2 : 0) + 3); |
| |
| assertEquals(exp, cache.get(key)); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (ignite(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) |
| assertEquals(exp, jcache(i).localPeek(key)); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAfterRemoveOptimistic() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAfterRemove(OPTIMISTIC); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAfterRemovePessimistic() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeAfterRemove(PESSIMISTIC); |
| } |
| }); |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @throws Exception If failed. |
| */ |
| private void checkInvokeAfterRemove(TransactionConcurrency concurrency) throws Exception { |
| IgniteCache<Object, Object> cache = jcache(); |
| |
| Object key = key(1); |
| |
| cache.put(key, value(4)); |
| |
| Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null; |
| |
| try { |
| cache.remove(key); |
| |
| cache.invoke(key, INCR_PROCESSOR, dataMode); |
| cache.invoke(key, INCR_PROCESSOR, dataMode); |
| cache.invoke(key, INCR_PROCESSOR, dataMode); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assertEquals(value(3), cache.get(key)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeReturnValueGetOptimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeReturnValue(false, OPTIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeReturnValueGetOptimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeReturnValue(false, OPTIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeReturnValueGetPessimisticReadCommitted() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeReturnValue(false, PESSIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeReturnValueGetPessimisticRepeatableRead() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeReturnValue(false, PESSIMISTIC, REPEATABLE_READ); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeReturnValuePutInTx() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| checkInvokeReturnValue(true, OPTIMISTIC, READ_COMMITTED); |
| } |
| }); |
| } |
| |
| /** |
| * @param put Whether to put value. |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws Exception If failed. |
| */ |
| private void checkInvokeReturnValue(boolean put, |
| TransactionConcurrency concurrency, |
| TransactionIsolation isolation) |
| throws Exception { |
| IgniteCache<Object, Object> cache = jcache(); |
| |
| Object key = key(1); |
| Object val1 = value(1); |
| Object val2 = value(2); |
| |
| if (!put) |
| cache.put(key, val1); |
| |
| Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null; |
| |
| try { |
| if (put) |
| cache.put(key, val1); |
| |
| cache.invoke(key, INCR_PROCESSOR, dataMode); |
| |
| assertEquals(val2, cache.get(key)); |
| |
| if (tx != null) { |
| // Second get inside tx. Make sure read value is not transformed twice. |
| assertEquals(val2, cache.get(key)); |
| |
| tx.commit(); |
| } |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAndPutAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| cacheAsync.getAndPut("key1", 10); |
| |
| IgniteFuture<Integer> fut1 = cacheAsync.future(); |
| |
| cacheAsync.getAndPut("key2", 11); |
| |
| IgniteFuture<Integer> fut2 = cacheAsync.future(); |
| |
| assertEquals((Integer)1, fut1.get(5000)); |
| assertEquals((Integer)2, fut2.get(5000)); |
| |
| assertEquals((Integer)10, cache.get("key1")); |
| assertEquals((Integer)11, cache.get("key2")); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAndPutAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| IgniteFuture<Integer> fut1 = cache.getAndPutAsync("key1", 10); |
| |
| IgniteFuture<Integer> fut2 = cache.getAndPutAsync("key2", 11); |
| |
| assertEquals((Integer)1, fut1.get(5000)); |
| assertEquals((Integer)2, fut2.get(5000)); |
| |
| assertEquals((Integer)10, cache.get("key1")); |
| assertEquals((Integer)11, cache.get("key2")); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutAsyncOld0() throws Exception { |
| IgniteCache cacheAsync = jcache().withAsync(); |
| |
| cacheAsync.getAndPut("key1", 0); |
| |
| IgniteFuture<Integer> fut1 = cacheAsync.future(); |
| |
| cacheAsync.getAndPut("key2", 1); |
| |
| IgniteFuture<Integer> fut2 = cacheAsync.future(); |
| |
| assert fut1.get(5000) == null; |
| assert fut2.get(5000) == null; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutAsync0() throws Exception { |
| IgniteFuture<?> fut1 = jcache().getAndPutAsync("key1", 0); |
| |
| IgniteFuture<?> fut2 = jcache().getAndPutAsync("key2", 1); |
| |
| assert fut1.get(5000) == null; |
| assert fut2.get(5000) == null; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAsyncOld() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| final Object key1 = key(1); |
| final Object key2 = key(2); |
| final Object key3 = key(3); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| |
| IgniteCache<Object, Object> cache = jcache(); |
| |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| IgniteCache<Object, Object> cacheAsync = cache.withAsync(); |
| |
| assertNull(cacheAsync.invoke(key1, INCR_PROCESSOR, dataMode)); |
| |
| IgniteFuture<?> fut0 = cacheAsync.future(); |
| |
| assertNull(cacheAsync.invoke(key2, INCR_PROCESSOR, dataMode)); |
| |
| IgniteFuture<?> fut1 = cacheAsync.future(); |
| |
| assertNull(cacheAsync.invoke(key3, RMV_PROCESSOR)); |
| |
| IgniteFuture<?> fut2 = cacheAsync.future(); |
| |
| fut0.get(); |
| fut1.get(); |
| fut2.get(); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertNull(cache.get(key3)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertNull(jcache(i).localPeek(key3, ONHEAP)); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeAsync() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| final Object key1 = key(1); |
| final Object key2 = key(2); |
| final Object key3 = key(3); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| |
| IgniteCache<Object, Object> cache = jcache(); |
| |
| cache.put(key2, val1); |
| cache.put(key3, val3); |
| |
| IgniteFuture<?> fut0 = cache.invokeAsync(key1, INCR_PROCESSOR, dataMode); |
| |
| IgniteFuture<?> fut1 = cache.invokeAsync(key2, INCR_PROCESSOR, dataMode); |
| |
| IgniteFuture<?> fut2 = cache.invokeAsync(key3, RMV_PROCESSOR); |
| |
| fut0.get(); |
| fut1.get(); |
| fut2.get(); |
| |
| assertEquals(val1, cache.get(key1)); |
| assertEquals(val2, cache.get(key2)); |
| assertNull(cache.get(key3)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertNull(jcache(i).localPeek(key3, ONHEAP)); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvoke() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| final Object k0 = key(0); |
| final Object k1 = key(1); |
| |
| final Object val1 = value(1); |
| final Object val2 = value(2); |
| final Object val3 = value(3); |
| |
| final IgniteCache<Object, Object> cache = jcache(); |
| |
| assertNull(cache.invoke(k0, INCR_PROCESSOR, dataMode)); |
| |
| assertEquals(k1, cache.get(k0)); |
| |
| assertEquals(val1, cache.invoke(k0, INCR_PROCESSOR, dataMode)); |
| |
| assertEquals(val2, cache.get(k0)); |
| |
| cache.put(k1, val1); |
| |
| assertEquals(val1, cache.invoke(k1, INCR_PROCESSOR, dataMode)); |
| |
| assertEquals(val2, cache.get(k1)); |
| |
| assertEquals(val2, cache.invoke(k1, INCR_PROCESSOR, dataMode)); |
| |
| assertEquals(val3, cache.get(k1)); |
| |
| RemoveAndReturnNullEntryProcessor c = new RemoveAndReturnNullEntryProcessor(); |
| |
| assertNull(cache.invoke(k1, c)); |
| assertNull(cache.get(k1)); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assertNull(jcache(i).localPeek(k1, ONHEAP)); |
| |
| final EntryProcessor<Object, Object, Object> errProcessor = new FailedEntryProcessor(); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.invoke(k1, errProcessor); |
| |
| return null; |
| } |
| }, EntryProcessorException.class, "Test entry processor exception."); |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutx() throws Exception { |
| if (txShouldBeUsed()) |
| checkPut(true); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutxNoTx() throws Exception { |
| checkPut(false); |
| } |
| |
| /** |
| * @param inTx Whether to start transaction. |
| * @throws Exception If failed. |
| */ |
| private void checkPut(boolean inTx) throws Exception { |
| Transaction tx = inTx ? transactions().txStart() : null; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| try { |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| // Check inside transaction. |
| assert cache.get("key1") == 1; |
| assert cache.get("key2") == 2; |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| // Check outside transaction. |
| checkContainsKey(true, "key1"); |
| checkContainsKey(true, "key2"); |
| checkContainsKey(false, "wrong"); |
| |
| assert cache.get("key1") == 1; |
| assert cache.get("key2") == 2; |
| assert cache.get("wrong") == null; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutAsyncOld() throws Exception { |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| IgniteCache cacheAsync = jcache().withAsync(); |
| |
| try { |
| jcache().put("key2", 1); |
| |
| cacheAsync.put("key1", 10); |
| |
| IgniteFuture<?> fut1 = cacheAsync.future(); |
| |
| cacheAsync.put("key2", 11); |
| |
| IgniteFuture<?> fut2 = cacheAsync.future(); |
| |
| IgniteFuture<Transaction> f = null; |
| |
| if (tx != null) { |
| tx = (Transaction)tx.withAsync(); |
| |
| tx.commit(); |
| |
| f = tx.future(); |
| } |
| |
| assertNull(fut1.get()); |
| assertNull(fut2.get()); |
| |
| assert f == null || f.get().state() == COMMITTED; |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| assert (Integer)jcache().get("key1") == 10; |
| assert (Integer)jcache().get("key2") == 11; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutAsync() throws Exception { |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| jcache().put("key2", 1); |
| |
| IgniteFuture<?> fut1 = jcache().putAsync("key1", 10); |
| |
| IgniteFuture<?> fut2 = jcache().putAsync("key2", 11); |
| |
| IgniteFuture<Void> f = null; |
| |
| if (tx != null) |
| f = tx.commitAsync(); |
| |
| assertNull(fut1.get()); |
| assertNull(fut2.get()); |
| |
| try { |
| if (f != null) |
| f.get(); |
| } |
| catch (Throwable t) { |
| assert false : "Unexpected exception " + t; |
| } |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| assert (Integer)jcache().get("key1") == 10; |
| assert (Integer)jcache().get("key2") == 11; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutAll() throws Exception { |
| Map<String, Integer> map = F.asMap("key1", 1, "key2", 2); |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.putAll(map); |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| assert cache.get("key1") == 1; |
| assert cache.get("key2") == 2; |
| |
| map.put("key1", 10); |
| map.put("key2", 20); |
| |
| cache.putAll(map); |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| assert cache.get("key1") == 10; |
| assert cache.get("key2") == 20; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testNullInTx() throws Exception { |
| if (!txShouldBeUsed()) |
| return; |
| |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| for (int i = 0; i < 100; i++) { |
| final String key = "key-" + i; |
| |
| assertNull(cache.get(key)); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| IgniteTransactions txs = transactions(); |
| |
| try (Transaction tx = txs.txStart()) { |
| cache.put(key, 1); |
| |
| cache.put(null, 2); |
| |
| tx.commit(); |
| } |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| assertNull(cache.get(key)); |
| |
| cache.put(key, 1); |
| |
| assertEquals(1, (int)cache.get(key)); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| IgniteTransactions txs = transactions(); |
| |
| try (Transaction tx = txs.txStart()) { |
| cache.put(key, 2); |
| |
| cache.remove(null); |
| |
| tx.commit(); |
| } |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| assertEquals(1, (int)cache.get(key)); |
| |
| cache.put(key, 2); |
| |
| assertEquals(2, (int)cache.get(key)); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| IgniteTransactions txs = transactions(); |
| |
| Map<String, Integer> map = new LinkedHashMap<>(); |
| |
| map.put("k1", 1); |
| map.put("k2", 2); |
| map.put(null, 3); |
| |
| try (Transaction tx = txs.txStart()) { |
| cache.put(key, 1); |
| |
| cache.putAll(map); |
| |
| tx.commit(); |
| } |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| assertNull(cache.get("k1")); |
| assertNull(cache.get("k2")); |
| |
| assertEquals(2, (int)cache.get(key)); |
| |
| cache.put(key, 3); |
| |
| assertEquals(3, (int)cache.get(key)); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutAllWithNulls() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| { |
| final Map<String, Integer> m = new LinkedHashMap<>(2); |
| |
| m.put("key1", 1); |
| m.put(null, 2); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.putAll(m); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| cache.put("key1", 1); |
| |
| assertEquals(1, (int)cache.get("key1")); |
| } |
| |
| { |
| final Map<String, Integer> m = new LinkedHashMap<>(2); |
| |
| m.put("key3", 3); |
| m.put("key4", null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.putAll(m); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| m.put("key4", 4); |
| |
| cache.putAll(m); |
| |
| assertEquals(3, (int)cache.get("key3")); |
| assertEquals(4, (int)cache.get("key4")); |
| } |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.put("key1", null); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.getAndPut("key1", null); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.put(null, 1); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.replace(null, 1); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.getAndReplace(null, 1); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.replace("key", null); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.getAndReplace("key", null); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.replace(null, 1, 2); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.replace("key", null, 2); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| cache.replace("key", 1, null); |
| |
| return null; |
| } |
| }, NullPointerException.class, A.NULL_MSG_PREFIX); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutAllAsyncOld() throws Exception { |
| Map<String, Integer> map = F.asMap("key1", 1, "key2", 2); |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cacheAsync.putAll(map); |
| |
| IgniteFuture<?> f1 = cacheAsync.future(); |
| |
| map.put("key1", 10); |
| map.put("key2", 20); |
| |
| cacheAsync.putAll(map); |
| |
| IgniteFuture<?> f2 = cacheAsync.future(); |
| |
| assertNull(f2.get()); |
| assertNull(f1.get()); |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| assert cache.get("key1") == 10; |
| assert cache.get("key2") == 20; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutAllAsync() throws Exception { |
| Map<String, Integer> map = F.asMap("key1", 1, "key2", 2); |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteFuture<?> f1 = cache.putAllAsync(map); |
| |
| map.put("key1", 10); |
| map.put("key2", 20); |
| |
| IgniteFuture<?> f2 = cache.putAllAsync(map); |
| |
| assertNull(f2.get()); |
| assertNull(f1.get()); |
| |
| checkSize(F.asSet("key1", "key2")); |
| |
| assert cache.get("key1") == 10; |
| assert cache.get("key2") == 20; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAndPutIfAbsent() throws Exception { |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| try { |
| assert cache.getAndPutIfAbsent("key", 1) == null; |
| |
| assert cache.get("key") != null; |
| assert cache.get("key") == 1; |
| |
| assert cache.getAndPutIfAbsent("key", 2) != null; |
| assert cache.getAndPutIfAbsent("key", 2) == 1; |
| |
| assert cache.get("key") != null; |
| assert cache.get("key") == 1; |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assert cache.getAndPutIfAbsent("key", 2) != null; |
| |
| for (int i = 0; i < gridCount(); i++) { |
| info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" + |
| grid(i).cache(cacheName()).localPeek("key", ONHEAP) + ']'); |
| } |
| |
| assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2)); |
| |
| assert cache.get("key") != null; |
| assert cache.get("key") == 1; |
| |
| if (!storeEnabled()) |
| return; |
| |
| // Check swap. |
| cache.put("key2", 1); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3)); |
| |
| // Check db. |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key3", 3); |
| |
| assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4)); |
| |
| assertEquals((Integer)3, cache.get("key3")); |
| } |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| // Same checks inside tx. |
| tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3)); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAndPutIfAbsentAsyncOld() throws Exception { |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| try { |
| cacheAsync.getAndPutIfAbsent("key", 1); |
| |
| IgniteFuture<Integer> fut1 = cacheAsync.future(); |
| |
| assertNull(fut1.get()); |
| assertEquals((Integer)1, cache.get("key")); |
| |
| cacheAsync.getAndPutIfAbsent("key", 2); |
| |
| IgniteFuture<Integer> fut2 = cacheAsync.future(); |
| |
| assertEquals((Integer)1, fut2.get()); |
| assertEquals((Integer)1, cache.get("key")); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| if (!storeEnabled()) |
| return; |
| |
| // Check swap. |
| cache.put("key2", 1); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| cacheAsync.getAndPutIfAbsent("key2", 3); |
| |
| assertEquals((Integer)1, cacheAsync.<Integer>future().get()); |
| |
| // Check db. |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key3", 3); |
| |
| cacheAsync.getAndPutIfAbsent("key3", 4); |
| |
| assertEquals((Integer)3, cacheAsync.<Integer>future().get()); |
| } |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| // Same checks inside tx. |
| tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| cacheAsync.getAndPutIfAbsent("key2", 3); |
| |
| assertEquals(1, cacheAsync.future().get()); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAndPutIfAbsentAsync() throws Exception { |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| try { |
| IgniteFuture<Integer> fut1 = cache.getAndPutIfAbsentAsync("key", 1); |
| |
| assertNull(fut1.get()); |
| assertEquals((Integer)1, cache.get("key")); |
| |
| IgniteFuture<Integer> fut2 = cache.getAndPutIfAbsentAsync("key", 2); |
| |
| assertEquals((Integer)1, fut2.get()); |
| assertEquals((Integer)1, cache.get("key")); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| if (!storeEnabled()) |
| return; |
| |
| // Check swap. |
| cache.put("key2", 1); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| assertEquals((Integer)1, cache.getAndPutIfAbsentAsync("key2", 3).get()); |
| |
| // Check db. |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key3", 3); |
| |
| assertEquals((Integer)3, cache.getAndPutIfAbsentAsync("key3", 4).get()); |
| } |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| // Same checks inside tx. |
| tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assertEquals(1, (int)cache.getAndPutIfAbsentAsync("key2", 3).get()); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutIfAbsent() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| assertNull(cache.get("key")); |
| assert cache.putIfAbsent("key", 1); |
| assert cache.get("key") != null && cache.get("key") == 1; |
| assert !cache.putIfAbsent("key", 2); |
| assert cache.get("key") != null && cache.get("key") == 1; |
| |
| if (!storeEnabled()) |
| return; |
| |
| // Check swap. |
| cache.put("key2", 1); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| assertFalse(cache.putIfAbsent("key2", 3)); |
| |
| // Check db. |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key3", 3); |
| |
| assertFalse(cache.putIfAbsent("key3", 4)); |
| } |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| // Same checks inside tx. |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assertFalse(cache.putIfAbsent("key2", 3)); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutxIfAbsentAsyncOld() throws Exception { |
| if (txShouldBeUsed()) |
| checkPutxIfAbsentAsyncOld(true); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutxIfAbsentAsyncOldNoTx() throws Exception { |
| checkPutxIfAbsentAsyncOld(false); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutxIfAbsentAsync() throws Exception { |
| if (txShouldBeUsed()) |
| checkPutxIfAbsentAsync(true); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutxIfAbsentAsyncNoTx() throws Exception { |
| checkPutxIfAbsentAsync(false); |
| } |
| |
| /** |
| * @param inTx In tx flag. |
| * @throws Exception If failed. |
| */ |
| private void checkPutxIfAbsentAsyncOld(boolean inTx) throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cacheAsync.putIfAbsent("key", 1); |
| |
| IgniteFuture<Boolean> fut1 = cacheAsync.future(); |
| |
| assert fut1.get(); |
| assert cache.get("key") != null && cache.get("key") == 1; |
| |
| cacheAsync.putIfAbsent("key", 2); |
| |
| IgniteFuture<Boolean> fut2 = cacheAsync.future(); |
| |
| assert !fut2.get(); |
| assert cache.get("key") != null && cache.get("key") == 1; |
| |
| if (!storeEnabled()) |
| return; |
| |
| // Check swap. |
| cache.put("key2", 1); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| cacheAsync.putIfAbsent("key2", 3); |
| |
| assertFalse(cacheAsync.<Boolean>future().get()); |
| |
| // Check db. |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key3", 3); |
| |
| cacheAsync.putIfAbsent("key3", 4); |
| |
| assertFalse(cacheAsync.<Boolean>future().get()); |
| } |
| |
| cache.localEvict(Collections.singletonList("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| // Same checks inside tx. |
| Transaction tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| cacheAsync.putIfAbsent("key2", 3); |
| |
| assertFalse(cacheAsync.<Boolean>future().get()); |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| cacheAsync.putIfAbsent("key3", 4); |
| |
| assertFalse(cacheAsync.<Boolean>future().get()); |
| } |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) |
| assertEquals((Integer)3, cache.get("key3")); |
| } |
| |
| /** |
| * @param inTx In tx flag. |
| * @throws Exception If failed. |
| */ |
| private void checkPutxIfAbsentAsync(boolean inTx) throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteFuture<Boolean> fut1 = cache.putIfAbsentAsync("key", 1); |
| |
| assert fut1.get(); |
| assert cache.get("key") != null && cache.get("key") == 1; |
| |
| IgniteFuture<Boolean> fut2 = cache.putIfAbsentAsync("key", 2); |
| |
| assert !fut2.get(); |
| assert cache.get("key") != null && cache.get("key") == 1; |
| |
| if (!storeEnabled()) |
| return; |
| |
| // Check swap. |
| cache.put("key2", 1); |
| |
| cache.localEvict(Collections.singleton("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| assertFalse(cache.putIfAbsentAsync("key2", 3).get()); |
| |
| // Check db. |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key3", 3); |
| |
| assertFalse(cache.putIfAbsentAsync("key3", 4).get()); |
| } |
| |
| cache.localEvict(Collections.singletonList("key2")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key2"); |
| |
| // Same checks inside tx. |
| Transaction tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| assertFalse(cache.putIfAbsentAsync("key2", 3).get()); |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) |
| assertFalse(cache.putIfAbsentAsync("key3", 4).get()); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assertEquals((Integer)1, cache.get("key2")); |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) |
| assertEquals((Integer)3, cache.get("key3")); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutIfAbsentAsyncOldConcurrent() throws Exception { |
| IgniteCache cacheAsync = jcache().withAsync(); |
| |
| cacheAsync.putIfAbsent("key1", 1); |
| |
| IgniteFuture<Boolean> fut1 = cacheAsync.future(); |
| |
| cacheAsync.putIfAbsent("key2", 2); |
| |
| IgniteFuture<Boolean> fut2 = cacheAsync.future(); |
| |
| assert fut1.get(); |
| assert fut2.get(); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPutIfAbsentAsyncConcurrent() throws Exception { |
| IgniteCache cache = jcache(); |
| |
| IgniteFuture<Boolean> fut1 = cache.putIfAbsentAsync("key1", 1); |
| |
| IgniteFuture<Boolean> fut2 = cache.putIfAbsentAsync("key2", 2); |
| |
| assert fut1.get(); |
| assert fut2.get(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAndReplace() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key", 1); |
| |
| assert cache.get("key") == 1; |
| |
| info("key 1 -> 2"); |
| |
| assert cache.getAndReplace("key", 2) == 1; |
| |
| assert cache.get("key") == 2; |
| |
| assert cache.getAndReplace("wrong", 0) == null; |
| |
| assert cache.get("wrong") == null; |
| |
| info("key 0 -> 3"); |
| |
| assert !cache.replace("key", 0, 3); |
| |
| assert cache.get("key") == 2; |
| |
| info("key 0 -> 3"); |
| |
| assert !cache.replace("key", 0, 3); |
| |
| assert cache.get("key") == 2; |
| |
| info("key 2 -> 3"); |
| |
| assert cache.replace("key", 2, 3); |
| |
| assert cache.get("key") == 3; |
| |
| if (!storeEnabled()) |
| return; |
| |
| info("evict key"); |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| info("key 3 -> 4"); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| assert cache.replace("key", 3, 4); |
| |
| assert cache.get("key") == 4; |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key2", 5); |
| |
| info("key2 5 -> 6"); |
| |
| assert cache.replace("key2", 5, 6); |
| } |
| |
| for (int i = 0; i < gridCount(); i++) { |
| info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() + |
| ", peekVal=" + grid(i).cache(cacheName()).localPeek("key", ONHEAP) + ']'); |
| |
| info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() + |
| ", peekVal=" + grid(i).cache(cacheName()).localPeek("key2", ONHEAP) + ']'); |
| } |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) |
| assertEquals((Integer)6, cache.get("key2")); |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assert cache.replace("key", 4, 5); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| assert cache.get("key") == 5; |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testReplace() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key", 1); |
| |
| assert cache.get("key") == 1; |
| |
| assert cache.replace("key", 2); |
| |
| assert cache.get("key") == 2; |
| |
| assert !cache.replace("wrong", 2); |
| |
| if (!storeEnabled()) |
| return; |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| assert cache.get("key") == 2; |
| |
| assert cache.replace("key", 4); |
| |
| assert cache.get("key") == 4; |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key2", 5); |
| |
| cache.replace("key2", 6); |
| |
| assertEquals((Integer)6, cache.get("key2")); |
| } |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| assert cache.get("key") == 4; |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assert cache.replace("key", 5); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assert cache.get("key") == 5; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAndReplaceAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key", 1); |
| |
| assert cache.get("key") == 1; |
| |
| cacheAsync.getAndReplace("key", 2); |
| |
| assert cacheAsync.<Integer>future().get() == 1; |
| |
| assert cache.get("key") == 2; |
| |
| cacheAsync.getAndReplace("wrong", 0); |
| |
| assert cacheAsync.future().get() == null; |
| |
| assert cache.get("wrong") == null; |
| |
| cacheAsync.replace("key", 0, 3); |
| |
| assert !cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key") == 2; |
| |
| cacheAsync.replace("key", 0, 3); |
| |
| assert !cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key") == 2; |
| |
| cacheAsync.replace("key", 2, 3); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key") == 3; |
| |
| if (!storeEnabled()) |
| return; |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| cacheAsync.replace("key", 3, 4); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key") == 4; |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key2", 5); |
| |
| cacheAsync.replace("key2", 5, 6); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assertEquals((Integer)6, cache.get("key2")); |
| } |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| cacheAsync.replace("key", 4, 5); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assert cache.get("key") == 5; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAndReplaceAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key", 1); |
| |
| assert cache.get("key") == 1; |
| |
| assert cache.getAndReplaceAsync("key", 2).get() == 1; |
| |
| assert cache.get("key") == 2; |
| |
| assert cache.getAndReplaceAsync("wrong", 0).get() == null; |
| |
| assert cache.get("wrong") == null; |
| |
| assert !cache.replaceAsync("key", 0, 3).get(); |
| |
| assert cache.get("key") == 2; |
| |
| assert !cache.replaceAsync("key", 0, 3).get(); |
| |
| assert cache.get("key") == 2; |
| |
| assert cache.replaceAsync("key", 2, 3).get(); |
| |
| assert cache.get("key") == 3; |
| |
| if (!storeEnabled()) |
| return; |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| assert cache.replaceAsync("key", 3, 4).get(); |
| |
| assert cache.get("key") == 4; |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key2", 5); |
| |
| assert cache.replaceAsync("key2", 5, 6).get(); |
| |
| assertEquals((Integer)6, cache.get("key2")); |
| } |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assert cache.replaceAsync("key", 4, 5).get(); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assert cache.get("key") == 5; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testReplacexAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key", 1); |
| |
| assert cache.get("key") == 1; |
| |
| cacheAsync.replace("key", 2); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| info("Finished replace."); |
| |
| assertEquals((Integer)2, cache.get("key")); |
| |
| cacheAsync.replace("wrond", 2); |
| |
| assert !cacheAsync.<Boolean>future().get(); |
| |
| if (!storeEnabled()) |
| return; |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| cacheAsync.replace("key", 4); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key") == 4; |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key2", 5); |
| |
| cacheAsync.replace("key2", 6); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key2") == 6; |
| } |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| cacheAsync.replace("key", 5); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assert cache.get("key") == 5; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testReplacexAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key", 1); |
| |
| assert cache.get("key") == 1; |
| |
| assert cache.replaceAsync("key", 2).get(); |
| |
| info("Finished replace."); |
| |
| assertEquals((Integer)2, cache.get("key")); |
| |
| assert !cache.replaceAsync("wrond", 2).get(); |
| |
| if (!storeEnabled()) |
| return; |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| assert cache.replaceAsync("key", 4).get(); |
| |
| assert cache.get("key") == 4; |
| |
| if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { |
| putToStore("key2", 5); |
| |
| assert cache.replaceAsync("key2", 6).get(); |
| |
| assert cache.get("key2") == 6; |
| } |
| |
| cache.localEvict(Collections.singleton("key")); |
| |
| if (!isLoadPreviousValue()) |
| cache.get("key"); |
| |
| Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; |
| |
| try { |
| assert cache.replaceAsync("key", 5).get(); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| assert cache.get("key") == 5; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGetAndRemove() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| assert !cache.remove("key1", 0); |
| assert cache.get("key1") != null && cache.get("key1") == 1; |
| assert cache.remove("key1", 1); |
| assert cache.get("key1") == null; |
| assert cache.getAndRemove("key2") == 2; |
| assert cache.get("key2") == null; |
| assert cache.getAndRemove("key2") == null; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("serial") |
| @Test |
| public void testGetAndRemoveObject() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| IgniteCache<String, Object> cache = ignite(0).cache(cacheName()); |
| |
| Map<String, Object> map = new HashMap<>(); |
| |
| for (int i = 0; i < CNT; i++) |
| map.put("key" + i, value(i)); |
| |
| for (Map.Entry<String, Object> e : map.entrySet()) { |
| final String key = e.getKey(); |
| final Object val = e.getValue(); |
| |
| cache.put(key, val); |
| |
| assertFalse(cache.remove(key, new SerializableObject(-1))); |
| |
| Object oldVal = cache.get(key); |
| |
| assertNotNull(oldVal); |
| assertEquals(val, oldVal); |
| |
| assertTrue(cache.remove(key)); |
| |
| assertNull(cache.get(key)); |
| } |
| |
| for (Map.Entry<String, Object> e : map.entrySet()) { |
| final String key = e.getKey(); |
| final Object val = e.getValue(); |
| |
| cache.put(key, val); |
| |
| Object oldVal = cache.getAndRemove(key); |
| |
| assertEquals(val, oldVal); |
| |
| assertNull(cache.get(key)); |
| assertNull(cache.getAndRemove(key)); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAndPutSerializableObject() throws Exception { |
| IgniteCache<String, SerializableObject> cache = ignite(0).cache(cacheName()); |
| |
| SerializableObject val1 = new SerializableObject(1); |
| SerializableObject val2 = new SerializableObject(2); |
| |
| cache.put("key1", val1); |
| |
| SerializableObject oldVal = cache.get("key1"); |
| |
| assertEquals(val1, oldVal); |
| |
| oldVal = cache.getAndPut("key1", val2); |
| |
| assertEquals(val1, oldVal); |
| |
| SerializableObject updVal = cache.get("key1"); |
| |
| assertEquals(val2, updVal); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDeletedEntriesFlag() throws Exception { |
| if (cacheMode() == PARTITIONED) { |
| final int cnt = 3; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| for (int i = 0; i < cnt; i++) |
| cache.put(String.valueOf(i), i); |
| |
| for (int i = 0; i < cnt; i++) |
| cache.remove(String.valueOf(i)); |
| |
| for (int g = 0; g < gridCount(); g++) |
| executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt, cacheName())); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRemoveLoad() throws Exception { |
| if (!storeEnabled()) |
| return; |
| |
| int cnt = 10; |
| |
| Set<String> keys = new HashSet<>(); |
| |
| for (int i = 0; i < cnt; i++) |
| keys.add(String.valueOf(i)); |
| |
| jcache().removeAll(keys); |
| |
| for (String key : keys) |
| putToStore(key, Integer.parseInt(key)); |
| |
| for (int g = 0; g < gridCount(); g++) |
| grid(g).cache(cacheName()).localLoadCache(null); |
| |
| for (int g = 0; g < gridCount(); g++) { |
| for (int i = 0; i < cnt; i++) { |
| String key = String.valueOf(i); |
| |
| if (grid(0).affinity(cacheName()).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode())) |
| assertEquals(i, jcache(g).localPeek(key)); |
| else |
| assertNull(jcache(g).localPeek(key)); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| cacheAsync.remove("key1", 0); |
| |
| assert !cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key1") != null && cache.get("key1") == 1; |
| |
| cacheAsync.remove("key1", 1); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key1") == null; |
| |
| cacheAsync.getAndRemove("key2"); |
| |
| assert cacheAsync.<Integer>future().get() == 2; |
| |
| assert cache.get("key2") == null; |
| |
| cacheAsync.getAndRemove("key2"); |
| |
| assert cacheAsync.future().get() == null; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| assert !cache.removeAsync("key1", 0).get(); |
| |
| assert cache.get("key1") != null && cache.get("key1") == 1; |
| |
| assert cache.removeAsync("key1", 1).get(); |
| |
| assert cache.get("key1") == null; |
| |
| assert cache.getAndRemoveAsync("key2").get() == 2; |
| |
| assert cache.get("key2") == null; |
| |
| assert cache.getAndRemoveAsync("key2").get() == null; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemove() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| |
| assert cache.remove("key1"); |
| assert cache.get("key1") == null; |
| assert !cache.remove("key1"); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemovexAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key1", 1); |
| |
| cacheAsync.remove("key1"); |
| |
| assert cacheAsync.<Boolean>future().get(); |
| |
| assert cache.get("key1") == null; |
| |
| cacheAsync.remove("key1"); |
| |
| assert !cacheAsync.<Boolean>future().get(); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemovexAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| |
| assert cache.removeAsync("key1").get(); |
| |
| assert cache.get("key1") == null; |
| |
| assert !cache.removeAsync("key1").get(); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGlobalRemoveAll() throws Exception { |
| globalRemoveAll(false, false); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGlobalRemoveAllAsyncOld() throws Exception { |
| globalRemoveAll(true, true); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testGlobalRemoveAllAsync() throws Exception { |
| globalRemoveAll(true, false); |
| } |
| |
| /** |
| * @param async If {@code true} uses asynchronous operation. |
| * @param oldAsync Use old async API. |
| * @throws Exception In case of error. |
| */ |
| private void globalRemoveAll(boolean async, boolean oldAsync) throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| cache.put("key3", 3); |
| |
| checkSize(F.asSet("key1", "key2", "key3")); |
| |
| IgniteCache<String, Integer> asyncCache = cache.withAsync(); |
| |
| if (async) { |
| if (oldAsync) { |
| asyncCache.removeAll(F.asSet("key1", "key2")); |
| |
| asyncCache.future().get(); |
| } |
| else |
| cache.removeAllAsync(F.asSet("key1", "key2")).get(); |
| } |
| else |
| cache.removeAll(F.asSet("key1", "key2")); |
| |
| checkSize(F.asSet("key3")); |
| |
| checkContainsKey(false, "key1"); |
| checkContainsKey(false, "key2"); |
| checkContainsKey(true, "key3"); |
| |
| // Put values again. |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| cache.put("key3", 3); |
| |
| if (async) { |
| if (oldAsync) { |
| IgniteCache asyncCache0 = jcache(gridCount() > 1 ? 1 : 0).withAsync(); |
| |
| asyncCache0.removeAll(); |
| |
| asyncCache0.future().get(); |
| } |
| else |
| jcache(gridCount() > 1 ? 1 : 0).removeAllAsync().get(); |
| } |
| else |
| jcache(gridCount() > 1 ? 1 : 0).removeAll(); |
| |
| assertEquals(0, cache.localSize()); |
| long entryCnt = hugeRemoveAllEntryCount(); |
| |
| for (int i = 0; i < entryCnt; i++) |
| cache.put(String.valueOf(i), i); |
| |
| for (int i = 0; i < entryCnt; i++) |
| assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i))); |
| |
| if (async) { |
| if (oldAsync) { |
| asyncCache.removeAll(); |
| |
| asyncCache.future().get(); |
| } |
| else |
| cache.removeAllAsync().get(); |
| } |
| else |
| cache.removeAll(); |
| |
| for (int i = 0; i < entryCnt; i++) |
| assertNull(cache.get(String.valueOf(i))); |
| } |
| |
| /** |
| * @return Count of entries to be removed in removeAll() test. |
| */ |
| protected long hugeRemoveAllEntryCount() { |
| return 1000L; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAllWithNulls() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| final Set<String> c = new LinkedHashSet<>(); |
| |
| c.add("key1"); |
| c.add(null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.removeAll(c); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| assertEquals(0, jcache().localSize()); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.removeAll(null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.remove(null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.getAndRemove(null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| cache.remove("key1", null); |
| |
| return null; |
| } |
| }, NullPointerException.class, null); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAllDuplicates() throws Exception { |
| jcache().removeAll(ImmutableSet.of("key1", "key1", "key1")); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAllDuplicatesTx() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction tx = transactions().txStart()) { |
| jcache().removeAll(ImmutableSet.of("key1", "key1", "key1")); |
| |
| tx.commit(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAllEmpty() throws Exception { |
| jcache().removeAll(); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAllAsyncOld() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| IgniteCache<String, Integer> cacheAsync = cache.withAsync(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| cache.put("key3", 3); |
| |
| checkSize(F.asSet("key1", "key2", "key3")); |
| |
| cacheAsync.removeAll(F.asSet("key1", "key2")); |
| |
| assertNull(cacheAsync.future().get()); |
| |
| checkSize(F.asSet("key3")); |
| |
| checkContainsKey(false, "key1"); |
| checkContainsKey(false, "key2"); |
| checkContainsKey(true, "key3"); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testRemoveAllAsync() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| cache.put("key3", 3); |
| |
| checkSize(F.asSet("key1", "key2", "key3")); |
| |
| assertNull(cache.removeAllAsync(F.asSet("key1", "key2")).get()); |
| |
| checkSize(F.asSet("key3")); |
| |
| checkContainsKey(false, "key1"); |
| checkContainsKey(false, "key2"); |
| checkContainsKey(true, "key3"); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testLoadAll() throws Exception { |
| if (!storeEnabled()) |
| return; |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| Set<String> keys = new HashSet<>(primaryKeysForCache(2)); |
| |
| for (String key : keys) |
| assertNull(cache.localPeek(key, ONHEAP)); |
| |
| Map<String, Integer> vals = new HashMap<>(); |
| |
| int i = 0; |
| |
| for (String key : keys) { |
| cache.put(key, i); |
| |
| vals.put(key, i); |
| |
| i++; |
| } |
| |
| for (String key : keys) |
| assertEquals(vals.get(key), cache.localPeek(key)); |
| |
| cache.clear(); |
| |
| for (String key : keys) |
| assertNull(cache.localPeek(key)); |
| |
| loadAll(cache, keys, true); |
| |
| for (String key : keys) |
| assertEquals(vals.get(key), cache.localPeek(key)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRemoveAfterClear() throws Exception { |
| IgniteEx ignite = grid(0); |
| |
| boolean affNode = ignite.context().cache().internalCache(cacheName()).context().affinityNode(); |
| |
| if (!affNode) { |
| if (gridCount() < 2) |
| return; |
| |
| ignite = grid(1); |
| } |
| |
| IgniteCache<Integer, Integer> cache = ignite.cache(cacheName()); |
| |
| int key = 0; |
| |
| Collection<Integer> keys = new ArrayList<>(); |
| |
| for (int k = 0; k < 2; k++) { |
| while (!ignite.affinity(cacheName()).isPrimary(ignite.localNode(), key)) |
| key++; |
| |
| keys.add(key); |
| |
| key++; |
| } |
| |
| info("Keys: " + keys); |
| |
| for (Integer k : keys) |
| cache.put(k, k); |
| |
| cache.clear(); |
| |
| for (int g = 0; g < gridCount(); g++) { |
| Ignite grid0 = grid(g); |
| |
| grid0.cache(cacheName()).removeAll(); |
| |
| assertTrue(grid0.cache(cacheName()).localSize() == 0); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testClear() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| Set<String> keys = new HashSet<>(primaryKeysForCache(3)); |
| |
| for (String key : keys) |
| assertNull(cache.get(key)); |
| |
| Map<String, Integer> vals = new HashMap<>(keys.size()); |
| |
| int i = 0; |
| |
| for (String key : keys) { |
| cache.put(key, i); |
| |
| vals.put(key, i); |
| |
| i++; |
| } |
| |
| for (String key : keys) |
| assertEquals(vals.get(key), cache.localPeek(key)); |
| |
| cache.clear(); |
| |
| for (String key : keys) |
| assertNull(cache.localPeek(key)); |
| |
| for (i = 0; i < gridCount(); i++) |
| jcache(i).clear(); |
| |
| for (i = 0; i < gridCount(); i++) |
| assert jcache(i).localSize() == 0; |
| |
| for (Map.Entry<String, Integer> entry : vals.entrySet()) |
| cache.put(entry.getKey(), entry.getValue()); |
| |
| for (String key : keys) |
| assertEquals(vals.get(key), cache.localPeek(key)); |
| |
| String first = F.first(keys); |
| |
| if (lockingEnabled()) { |
| Lock lock = cache.lock(first); |
| |
| lock.lock(); |
| |
| try { |
| cache.clear(); |
| |
| GridCacheContext<String, Integer> cctx = context(0); |
| |
| GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(first) : |
| cctx.cache().peekEx(first); |
| |
| assertNotNull(entry); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| else { |
| cache.clear(); |
| |
| cache.put(first, vals.get(first)); |
| } |
| |
| cache.clear(); |
| |
| assert cache.localSize() == 0 : "Values after clear."; |
| |
| i = 0; |
| |
| for (String key : keys) { |
| cache.put(key, i); |
| |
| vals.put(key, i); |
| |
| i++; |
| } |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| cache.localEvict(Sets.union(ImmutableSet.of("key1", "key2"), keys)); |
| |
| assert cache.localSize(ONHEAP) == 0; |
| |
| // TODO: GG-11148 check if test for promote makes sense. |
| // cache.clear(); |
| // |
| // cache.localPromote(ImmutableSet.of("key2", "key1")); |
| // |
| // assert cache.localPeek("key1", ONHEAP) == null; |
| // assert cache.localPeek("key2", ONHEAP) == null; |
| } |
| |
| /** |
| * @param keys0 Keys to check. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void checkUnlocked(final Collection<String> keys0) throws IgniteCheckedException { |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| try { |
| for (int i = 0; i < gridCount(); i++) { |
| GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite(i)).internalCache(cacheName()); |
| |
| for (String key : keys0) { |
| GridCacheEntryEx entry = cache.peekEx(key); |
| |
| if (entry != null) { |
| if (entry.lockedByAny()) { |
| info("Entry is still locked [i=" + i + ", entry=" + entry + ']'); |
| |
| return false; |
| } |
| } |
| |
| if (cache.isNear()) { |
| entry = cache.context().near().dht().peekEx(key); |
| |
| if (entry != null) { |
| if (entry.lockedByAny()) { |
| info("Entry is still locked [i=" + i + ", entry=" + entry + ']'); |
| |
| return false; |
| } |
| } |
| } |
| } |
| } |
| |
| return true; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| info("Entry was removed, will retry"); |
| |
| return false; |
| } |
| } |
| }, 10_000); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearAll() throws Exception { |
| globalClearAll(false, false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearAllAsyncOld() throws Exception { |
| globalClearAll(true, true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearAllAsync() throws Exception { |
| globalClearAll(true, false); |
| } |
| |
| /** |
| * @param async If {@code true} uses async method. |
| * @param oldAsync Use old async API. |
| * @throws Exception If failed. |
| */ |
| protected void globalClearAll(boolean async, boolean oldAsync) throws Exception { |
| // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries |
| // because some of them were blocked due to having readers. |
| for (int i = 0; i < gridCount(); i++) { |
| for (String key : primaryKeysForCache(i, 3, 100_000)) |
| jcache(i).put(key, 1); |
| } |
| |
| if (async) { |
| if (oldAsync) { |
| IgniteCache asyncCache = jcache().withAsync(); |
| |
| asyncCache.clear(); |
| |
| asyncCache.future().get(); |
| } |
| else |
| jcache().clearAsync().get(); |
| } |
| else |
| jcache().clear(); |
| |
| for (int i = 0; i < gridCount(); i++) |
| assert jcache(i).localSize() == 0; |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @SuppressWarnings("BusyWait") |
| @Test |
| public void testLockUnlock() throws Exception { |
| if (lockingEnabled()) { |
| final CountDownLatch lockCnt = new CountDownLatch(1); |
| final CountDownLatch unlockCnt = new CountDownLatch(1); |
| |
| grid(0).events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| switch (evt.type()) { |
| case EVT_CACHE_OBJECT_LOCKED: |
| lockCnt.countDown(); |
| |
| break; |
| case EVT_CACHE_OBJECT_UNLOCKED: |
| unlockCnt.countDown(); |
| |
| break; |
| } |
| |
| return true; |
| } |
| }, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED); |
| |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| String key = primaryKeysForCache(1).get(0); |
| |
| cache.put(key, 1); |
| |
| assert !cache.isLocalLocked(key, false); |
| |
| Lock lock = cache.lock(key); |
| |
| lock.lock(); |
| |
| try { |
| lockCnt.await(); |
| |
| assert cache.isLocalLocked(key, false); |
| } |
| finally { |
| lock.unlock(); |
| } |
| |
| unlockCnt.await(); |
| |
| for (int i = 0; i < 100; i++) |
| if (cache.isLocalLocked(key, false)) |
| Thread.sleep(10); |
| else |
| break; |
| |
| assert !cache.isLocalLocked(key, false); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @SuppressWarnings("BusyWait") |
| @Test |
| public void testLockUnlockAll() throws Exception { |
| if (lockingEnabled()) { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| assert !cache.isLocalLocked("key1", false); |
| assert !cache.isLocalLocked("key2", false); |
| |
| Lock lock1_2 = cache.lockAll(ImmutableSet.of("key1", "key2")); |
| |
| lock1_2.lock(); |
| |
| try { |
| assert cache.isLocalLocked("key1", false); |
| assert cache.isLocalLocked("key2", false); |
| } |
| finally { |
| lock1_2.unlock(); |
| } |
| |
| for (int i = 0; i < 100; i++) |
| if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false)) |
| Thread.sleep(10); |
| else |
| break; |
| |
| assert !cache.isLocalLocked("key1", false); |
| assert !cache.isLocalLocked("key2", false); |
| |
| lock1_2.lock(); |
| |
| try { |
| assert cache.isLocalLocked("key1", false); |
| assert cache.isLocalLocked("key2", false); |
| } |
| finally { |
| lock1_2.unlock(); |
| } |
| |
| for (int i = 0; i < 100; i++) |
| if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false)) |
| Thread.sleep(10); |
| else |
| break; |
| |
| assert !cache.isLocalLocked("key1", false); |
| assert !cache.isLocalLocked("key2", false); |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testPeek() throws Exception { |
| Ignite ignite = primaryIgnite("key"); |
| IgniteCache<String, Integer> cache = ignite.cache(cacheName()); |
| |
| assertNull(cache.localPeek("key")); |
| |
| cache.put("key", 1); |
| |
| cache.replace("key", 2); |
| |
| assertEquals(2, cache.localPeek("key").intValue()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPeekTxRemoveOptimistic() throws Exception { |
| checkPeekTxRemove(OPTIMISTIC); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPeekTxRemovePessimistic() throws Exception { |
| checkPeekTxRemove(PESSIMISTIC); |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @throws Exception If failed. |
| */ |
| private void checkPeekTxRemove(TransactionConcurrency concurrency) throws Exception { |
| if (txShouldBeUsed()) { |
| Ignite ignite = primaryIgnite("key"); |
| IgniteCache<String, Integer> cache = ignite.cache(cacheName()); |
| |
| cache.put("key", 1); |
| |
| try (Transaction tx = ignite.transactions().txStart(concurrency, READ_COMMITTED)) { |
| cache.remove("key"); |
| |
| assertNull(cache.get("key")); // localPeek ignores transactions. |
| assertNotNull(cache.localPeek("key")); // localPeek ignores transactions. |
| |
| tx.commit(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPeekRemove() throws Exception { |
| IgniteCache<String, Integer> cache = primaryCache("key"); |
| |
| cache.put("key", 1); |
| cache.remove("key"); |
| |
| assertNull(cache.localPeek("key")); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testEvictExpired() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| final String key = primaryKeysForCache(1).get(0); |
| |
| cache.put(key, 1); |
| |
| assertEquals((Integer)1, cache.get(key)); |
| |
| long ttl = 500; |
| |
| final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); |
| |
| grid(0).cache(cacheName()).withExpiryPolicy(expiry).put(key, 1); |
| |
| boolean wait = waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| for (int i = 0; i < gridCount(); i++) { |
| if (jcache(i).localPeek(key) != null) |
| return false; |
| } |
| |
| return true; |
| } |
| }, ttl + 1000); |
| |
| assertTrue("Failed to wait for entry expiration.", wait); |
| |
| // Expired entry should not be swapped. |
| cache.localEvict(Collections.singleton(key)); |
| |
| assertNull(cache.localPeek("key")); |
| |
| assertNull(cache.localPeek(key, ONHEAP)); |
| |
| assertTrue(cache.localSize() == 0); |
| |
| if (storeEnabled()) { |
| load(cache, key, true); |
| |
| Affinity<String> aff = ignite(0).affinity(cacheName()); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (aff.isPrimary(grid(i).cluster().localNode(), key)) |
| assertEquals(1, jcache(i).localPeek(key)); |
| |
| if (aff.isBackup(grid(i).cluster().localNode(), key)) |
| assertEquals(1, jcache(i).localPeek(key)); |
| } |
| } |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testPeekExpired() throws Exception { |
| final IgniteCache<String, Integer> c = jcache(); |
| |
| final String key = primaryKeysForCache(1).get(0); |
| |
| info("Using key: " + key); |
| |
| c.put(key, 1); |
| |
| assertEquals(Integer.valueOf(1), c.localPeek(key)); |
| |
| int ttl = 500; |
| |
| final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); |
| |
| c.withExpiryPolicy(expiry).put(key, 1); |
| |
| Thread.sleep(ttl + 100); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return c.localPeek(key) == null; |
| } |
| }, 2000); |
| |
| assertNull(c.localPeek(key)); |
| |
| assert c.localSize() == 0 : "Cache is not empty."; |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPeekExpiredTx() throws Exception { |
| if (txShouldBeUsed()) { |
| final IgniteCache<String, Integer> c = jcache(); |
| |
| final String key = "1"; |
| int ttl = 500; |
| |
| try (Transaction tx = grid(0).transactions().txStart()) { |
| final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); |
| |
| grid(0).cache(cacheName()).withExpiryPolicy(expiry).put(key, 1); |
| |
| tx.commit(); |
| } |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return c.localPeek(key) == null; |
| } |
| }, 2000); |
| |
| assertNull(c.localPeek(key)); |
| |
| assert c.localSize() == 0; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTtlTx() throws Exception { |
| if (txShouldBeUsed()) |
| checkTtl(true, false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTtlNoTx() throws Exception { |
| checkTtl(false, false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTtlNoTxOldEntry() throws Exception { |
| checkTtl(false, true); |
| } |
| |
| /** |
| * @param inTx In tx flag. |
| * @param oldEntry {@code True} to check TTL on old entry, {@code false} on new. |
| * @throws Exception If failed. |
| */ |
| private void checkTtl(boolean inTx, boolean oldEntry) throws Exception { |
| int ttlVals[] = {600, 1000, 3000}; |
| |
| int i = 0; |
| while (i < ttlVals.length) { |
| try { |
| checkTtl0(inTx, oldEntry, ttlVals[i]); |
| break; |
| } |
| catch (AssertionError e) { |
| if (i < ttlVals.length - 1) |
| info("Ttl test failed, try execute with increased ttl"); |
| else |
| throw e; |
| } |
| i++; |
| } |
| } |
| |
| /** |
| * @param inTx In tx flag. |
| * @param oldEntry {@code True} to check TTL on old entry, {@code false} on new. |
| * @param ttl TTL value. |
| * @throws Exception If failed. |
| */ |
| private void checkTtl0(boolean inTx, boolean oldEntry, int ttl) throws Exception { |
| final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); |
| |
| final IgniteCache<String, Integer> c = jcache(); |
| |
| final String key = primaryKeysForCache(1).get(0); |
| |
| IgnitePair<Long> entryTtl; |
| |
| if (oldEntry) { |
| c.put(key, 1); |
| |
| entryTtl = entryTtl(serverNodeCache(), key); |
| assertEquals((Long)0L, entryTtl.get1()); |
| assertEquals((Long)0L, entryTtl.get2()); |
| } |
| |
| long startTime = System.currentTimeMillis(); |
| |
| if (inTx) { |
| // Rollback transaction for the first time. |
| Transaction tx = transactions().txStart(); |
| |
| try { |
| jcache().withExpiryPolicy(expiry).put(key, 1); |
| } |
| finally { |
| tx.rollback(); |
| } |
| |
| if (oldEntry) { |
| entryTtl = entryTtl(serverNodeCache(), key); |
| |
| assertNotNull(entryTtl.get1()); |
| assertNotNull(entryTtl.get2()); |
| assertEquals((Long)0L, entryTtl.get1()); |
| assertEquals((Long)0L, entryTtl.get2()); |
| } |
| } |
| |
| // Now commit transaction and check that ttl and expire time have been saved. |
| Transaction tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| jcache().withExpiryPolicy(expiry).put(key, 1); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| long[] expireTimes = new long[gridCount()]; |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) { |
| IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key); |
| |
| assertNotNull(curEntryTtl.get1()); |
| assertNotNull(curEntryTtl.get2()); |
| assertTrue(curEntryTtl.get2() > startTime); |
| expireTimes[i] = curEntryTtl.get2(); |
| } |
| } |
| |
| // One more update from the same cache entry to ensure that expire time is shifted forward. |
| U.sleep(100); |
| |
| tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| jcache().withExpiryPolicy(expiry).put(key, 2); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) { |
| IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key); |
| |
| assertNotNull(curEntryTtl.get1()); |
| assertNotNull(curEntryTtl.get2()); |
| assertTrue(curEntryTtl.get2() > startTime); |
| expireTimes[i] = curEntryTtl.get2(); |
| } |
| } |
| |
| // And one more direct update to ensure that expire time is shifted forward. |
| U.sleep(100); |
| |
| tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| jcache().withExpiryPolicy(expiry).put(key, 3); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) { |
| IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key); |
| |
| assertNotNull(curEntryTtl.get1()); |
| assertNotNull(curEntryTtl.get2()); |
| assertTrue(curEntryTtl.get2() > startTime); |
| expireTimes[i] = curEntryTtl.get2(); |
| } |
| } |
| |
| // And one more update to ensure that ttl is not changed and expire time is not shifted forward. |
| U.sleep(100); |
| |
| log.info("Put 4"); |
| |
| tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| jcache().put(key, 4); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| log.info("Put 4 done"); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) { |
| IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key); |
| |
| assertNotNull(curEntryTtl.get1()); |
| assertNotNull(curEntryTtl.get2()); |
| assertEquals(expireTimes[i], (long)curEntryTtl.get2()); |
| } |
| } |
| |
| // Avoid reloading from store. |
| storeStgy.removeFromStore(key); |
| |
| assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { |
| @Override public boolean applyx() { |
| try { |
| Integer val = c.get(key); |
| |
| if (val != null) { |
| info("Value is in cache [key=" + key + ", val=" + val + ']'); |
| |
| return false; |
| } |
| |
| // Get "cache" field from GridCacheProxyImpl. |
| GridCacheAdapter c0 = cacheFromCtx(c); |
| |
| if (!c0.context().deferredDelete()) { |
| GridCacheEntryEx e0 = c0.peekEx(key); |
| |
| return e0 == null || (e0.rawGet() == null && e0.valueBytes() == null); |
| } |
| else |
| return true; |
| } |
| catch (GridCacheEntryRemovedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }, Math.min(ttl * 10, getTestTimeout()))); |
| |
| IgniteCache srvNodeCache = serverNodeCache(); |
| |
| assert c.get(key) == null; |
| |
| // Ensure that old TTL and expire time are not longer "visible". |
| entryTtl = entryTtl(srvNodeCache, key); |
| assertNull(entryTtl); |
| |
| // Ensure that next update will not pick old expire time. |
| |
| tx = inTx ? transactions().txStart() : null; |
| |
| try { |
| jcache().put(key, 10); |
| |
| if (tx != null) |
| tx.commit(); |
| } |
| finally { |
| if (tx != null) |
| tx.close(); |
| } |
| |
| U.sleep(ttl + 500); |
| |
| entryTtl = entryTtl(srvNodeCache, key); |
| |
| assertEquals((Integer)10, c.get(key)); |
| |
| assertNotNull(entryTtl.get1()); |
| assertNotNull(entryTtl.get2()); |
| assertEquals(0, (long)entryTtl.get1()); |
| assertEquals(0, (long)entryTtl.get2()); |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11885") |
| @Test |
| public void testLocalEvict() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| List<String> keys = primaryKeysForCache(3); |
| |
| String key1 = keys.get(0); |
| String key2 = keys.get(1); |
| String key3 = keys.get(2); |
| |
| cache.put(key1, 1); |
| cache.put(key2, 2); |
| cache.put(key3, 3); |
| |
| assertEquals((Integer)1, cache.localPeek(key1)); |
| assertEquals((Integer)2, cache.localPeek(key2)); |
| assertEquals((Integer)3, cache.localPeek(key3)); |
| |
| cache.localEvict(F.asList(key1, key2)); |
| |
| assert cache.localPeek(key1, ONHEAP) == null; |
| assert cache.localPeek(key2, ONHEAP) == null; |
| assertEquals((Integer)3, cache.localPeek(key3, OFFHEAP)); |
| |
| if (storeEnabled()) { |
| loadAll(cache, ImmutableSet.of(key1, key2), true); |
| |
| Affinity<String> aff = ignite(0).affinity(cacheName()); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key1)) |
| assertEquals(1, jcache(i).localPeek(key1)); |
| |
| if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key2)) |
| assertEquals(2, jcache(i).localPeek(key2)); |
| |
| if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key3)) |
| assertEquals(3, jcache(i).localPeek(key3)); |
| } |
| } |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param k Key. |
| */ |
| private void checkKeyAfterLocalEvict(IgniteCache<String, Integer> cache, String k) { |
| assertNull(cache.localPeek(k, ONHEAP)); |
| assertNotNull(cache.localPeek(k, OFFHEAP)); |
| } |
| |
| /** |
| * JUnit. |
| */ |
| @Test |
| public void testCacheProxy() { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| assert cache instanceof IgniteCacheProxy; |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCompactExpired() throws Exception { |
| final IgniteCache<String, Integer> cache = jcache(); |
| |
| final String key = F.first(primaryKeysForCache(1)); |
| |
| cache.put(key, 1); |
| |
| long ttl = 500; |
| |
| final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); |
| |
| grid(0).cache(cacheName()).withExpiryPolicy(expiry).put(key, 1); |
| |
| waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return cache.localPeek(key) == null; |
| } |
| }, ttl + 1000); |
| |
| // Peek will actually remove entry from cache. |
| assertNull(cache.localPeek(key)); |
| |
| assert cache.localSize() == 0; |
| |
| // Clear readers, if any. |
| cache.remove(key); |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testOptimisticTxMissingKey() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { |
| // Remove missing key. |
| assertFalse(jcache().remove(UUID.randomUUID().toString())); |
| |
| tx.commit(); |
| } |
| } |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testOptimisticTxMissingKeyNoCommit() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { |
| // Remove missing key. |
| assertFalse(jcache().remove(UUID.randomUUID().toString())); |
| |
| tx.setRollbackOnly(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testOptimisticTxReadCommittedInTx() throws Exception { |
| checkRemovexInTx(OPTIMISTIC, READ_COMMITTED); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testOptimisticTxRepeatableReadInTx() throws Exception { |
| checkRemovexInTx(OPTIMISTIC, REPEATABLE_READ); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPessimisticTxReadCommittedInTx() throws Exception { |
| checkRemovexInTx(PESSIMISTIC, READ_COMMITTED); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPessimisticTxRepeatableReadInTx() throws Exception { |
| checkRemovexInTx(PESSIMISTIC, REPEATABLE_READ); |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws Exception If failed. |
| */ |
| private void checkRemovexInTx(final TransactionConcurrency concurrency, |
| final TransactionIsolation isolation) throws Exception { |
| if (txShouldBeUsed()) { |
| final int cnt = 10; |
| |
| CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<Object, Object>>() { |
| @Override public void applyx(IgniteCache cache) { |
| for (int i = 0; i < cnt; i++) |
| cache.put("key" + i, i); |
| } |
| }); |
| |
| CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<Object, Object>>() { |
| @Override public void applyx(IgniteCache<Object, Object> cache) { |
| for (int i = 0; i < cnt; i++) |
| assertEquals(new Integer(i), cache.get("key" + i)); |
| } |
| }); |
| |
| CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<Object, Object>>() { |
| @Override public void applyx(IgniteCache<Object, Object> cache) { |
| for (int i = 0; i < cnt; i++) { |
| boolean removed = cache.remove("key" + i); |
| |
| assertTrue(removed); |
| } |
| } |
| }); |
| |
| CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<Object, Object>>() { |
| @Override public void applyx(IgniteCache<Object, Object> cache) { |
| for (int i = 0; i < cnt; i++) |
| assertNull(cache.get("key" + i)); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPessimisticTxMissingKey() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction tx = transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { |
| // Remove missing key. |
| assertFalse(jcache().remove(UUID.randomUUID().toString())); |
| |
| tx.commit(); |
| } |
| } |
| } |
| |
| /** |
| * JUnit. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPessimisticTxMissingKeyNoCommit() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction tx = transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { |
| // Remove missing key. |
| assertFalse(jcache().remove(UUID.randomUUID().toString())); |
| |
| tx.setRollbackOnly(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPessimisticTxRepeatableRead() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction ignored = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| jcache().put("key", 1); |
| |
| assert (Integer)jcache().get("key") == 1; |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPessimisticTxRepeatableReadOnUpdate() throws Exception { |
| if (txShouldBeUsed()) { |
| try (Transaction ignored = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| jcache().put("key", 1); |
| |
| assert (Integer)jcache().getAndPut("key", 2) == 1; |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception In case of error. |
| */ |
| @Test |
| public void testToMap() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(); |
| |
| cache.put("key1", 1); |
| cache.put("key2", 2); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| for (Cache.Entry entry : jcache(i)) |
| storeStgy.putToStore(entry.getKey(), entry.getValue()); |
| } |
| |
| assert storeStgy.getStoreSize() == 2; |
| assert (Integer)storeStgy.getFromStore("key1") == 1; |
| assert (Integer)storeStgy.getFromStore("key2") == 2; |
| } |
| |
| /** |
| * @param keys Expected keys. |
| * @throws Exception If failed. |
| */ |
| protected void checkSize(final Collection<String> keys) throws Exception { |
| if (nearEnabled()) |
| assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL)); |
| else { |
| for (int i = 0; i < gridCount(); i++) |
| executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys, cacheName())); |
| } |
| } |
| |
| /** |
| * @param keys Expected keys. |
| * @throws Exception If failed. |
| */ |
| protected void checkKeySize(final Collection<String> keys) throws Exception { |
| if (nearEnabled()) |
| assertEquals("Invalid key size: " + jcache().localSize(ALL), |
| keys.size(), jcache().localSize(ALL)); |
| else { |
| for (int i = 0; i < gridCount(); i++) |
| executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys, cacheName())); |
| } |
| } |
| |
| /** |
| * @param exp Expected value. |
| * @param key Key. |
| * @throws Exception If failed. |
| */ |
| private void checkContainsKey(boolean exp, Object key) throws Exception { |
| if (nearEnabled()) { |
| assertEquals(exp, jcache().containsKey(key)); |
| assertEquals(exp, (boolean)jcache().containsKeyAsync(key).get()); |
| } |
| else { |
| boolean contains = false; |
| |
| for (int i = 0; i < gridCount(); i++) |
| if (containsKey(jcache(i), key)) { |
| contains = true; |
| |
| break; |
| } |
| |
| assertEquals("Key: " + key, exp, contains); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @return Ignite instance of the primary node for the specified key. |
| */ |
| protected Ignite primaryIgnite(String key) { |
| ClusterNode node = grid(0).affinity(cacheName()).mapKeyToNode(key); |
| |
| if (node == null) |
| throw new IgniteException("Failed to find primary node."); |
| |
| UUID nodeId = node.id(); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| if (grid(i).localNode().id().equals(nodeId)) |
| return ignite(i); |
| } |
| |
| throw new IgniteException("Failed to find primary node."); |
| } |
| |
| /** |
| * @param key Key. |
| * @return Cache. |
| */ |
| protected IgniteCache<String, Integer> primaryCache(String key) { |
| return primaryIgnite(key).cache(cacheName()); |
| } |
| |
| /** |
| * @param gridIdx Grid index. |
| * @param cnt Keys count. |
| * @param startFrom Key value to start. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<String> primaryKeysForCache(int gridIdx, int cnt, int startFrom) { |
| if (gridIdx == CLIENT_NODE_IDX || gridIdx == CLIENT_NEAR_ONLY_IDX) |
| return primaryKeysForCache0(serverNodeCache(), cnt, 1); |
| |
| return primaryKeysForCache0(jcache(gridIdx), cnt, 1); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<String> primaryKeysForCache0(IgniteCache cache, int cnt, int startFrom) { |
| return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt)); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<Object> primaryTestObjectKeysForCache(IgniteCache cache, int cnt) { |
| return primaryTestObjectKeysForCache(cache, cnt, 1); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<Object> primaryTestObjectKeysForCache(IgniteCache cache, int cnt, int startFrom) { |
| return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryTestObjectKeysTask(startFrom, cnt, dataMode)); |
| } |
| |
| /** |
| * @param cnt Keys count. |
| * @return Collection of keys for which given cache is primary. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected List<String> primaryKeysForCache(int cnt) |
| throws IgniteCheckedException { |
| return primaryKeysForCache(testedNodeIdx, cnt, 1); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Entry key. |
| * @return Pair [ttl, expireTime]; both values null if entry not found |
| */ |
| protected IgnitePair<Long> entryTtl(IgniteCache cache, String key) { |
| return executeOnLocalOrRemoteJvm(cache, new EntryTtlTask(key)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIterator() throws Exception { |
| IgniteCache<Integer, Integer> cache = grid(0).cache(cacheName()); |
| |
| final int KEYS = 1000; |
| |
| for (int i = 0; i < KEYS; i++) |
| cache.put(i, i); |
| |
| // Try to initialize readers in case when near cache is enabled. |
| for (int i = 0; i < gridCount(); i++) { |
| cache = grid(i).cache(cacheName()); |
| |
| for (int k = 0; k < KEYS; k++) |
| assertEquals((Object)k, cache.get(k)); |
| } |
| |
| int cnt = 0; |
| |
| for (Cache.Entry e : cache) |
| cnt++; |
| |
| assertEquals(KEYS, cnt); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIgniteCacheIterator() throws Exception { |
| IgniteCache<String, Integer> cache = jcache(0); |
| |
| Iterator<Cache.Entry<String, Integer>> it = cache.iterator(); |
| |
| boolean hasNext = it.hasNext(); |
| |
| if (hasNext) |
| assertFalse("Cache has value: " + it.next(), hasNext); |
| |
| final int SIZE = 10_000; |
| |
| Map<String, Integer> entries = new HashMap<>(); |
| |
| Map<String, Integer> putMap = new HashMap<>(); |
| |
| for (int i = 0; i < SIZE; ++i) { |
| String key = Integer.toString(i); |
| |
| putMap.put(key, i); |
| |
| entries.put(key, i); |
| |
| if (putMap.size() == 500) { |
| cache.putAll(putMap); |
| |
| info("Puts finished: " + (i + 1)); |
| |
| putMap.clear(); |
| } |
| } |
| |
| cache.putAll(putMap); |
| |
| checkIteratorHasNext(); |
| |
| checkIteratorCache(entries); |
| |
| checkIteratorRemove(cache, entries); |
| |
| checkIteratorEmpty(cache); |
| } |
| |
| /** |
| * If hasNext() is called repeatedly, it should return the same result. |
| */ |
| private void checkIteratorHasNext() { |
| Iterator<Cache.Entry<Object, Object>> iter = jcache(0).iterator(); |
| |
| assertEquals(iter.hasNext(), iter.hasNext()); |
| |
| while (iter.hasNext()) |
| iter.next(); |
| |
| assertFalse(iter.hasNext()); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param entries Expected entries in the cache. |
| */ |
| private void checkIteratorRemove(IgniteCache<String, Integer> cache, Map<String, Integer> entries) { |
| // Check that we can remove element. |
| String rmvKey = Integer.toString(5); |
| |
| removeCacheIterator(cache, rmvKey); |
| |
| entries.remove(rmvKey); |
| |
| assertFalse(cache.containsKey(rmvKey)); |
| assertNull(cache.get(rmvKey)); |
| |
| checkIteratorCache(entries); |
| |
| // Check that we cannot call Iterator.remove() without next(). |
| final Iterator<Cache.Entry<Object, Object>> iter = jcache(0).iterator(); |
| |
| assertTrue(iter.hasNext()); |
| |
| iter.next(); |
| |
| iter.remove(); |
| |
| GridTestUtils.assertThrows(log, new Callable<Object>() { |
| @Override public Void call() throws Exception { |
| iter.remove(); |
| |
| return null; |
| } |
| }, IllegalStateException.class, null); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key to remove. |
| */ |
| private void removeCacheIterator(IgniteCache<String, Integer> cache, String key) { |
| Iterator<Cache.Entry<String, Integer>> iter = cache.iterator(); |
| |
| int delCnt = 0; |
| |
| while (iter.hasNext()) { |
| Cache.Entry<String, Integer> cur = iter.next(); |
| |
| if (cur.getKey().equals(key)) { |
| iter.remove(); |
| |
| delCnt++; |
| } |
| } |
| |
| assertEquals(1, delCnt); |
| } |
| |
| /** |
| * @param entries Expected entries in the cache. |
| */ |
| private void checkIteratorCache(Map<String, Integer> entries) { |
| for (int i = 0; i < gridCount(); ++i) |
| checkIteratorCache(jcache(i), entries); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param entries Expected entries in the cache. |
| */ |
| private void checkIteratorCache(IgniteCache cache, Map<String, Integer> entries) { |
| Iterator<Cache.Entry<String, Integer>> iter = cache.iterator(); |
| |
| int cnt = 0; |
| |
| while (iter.hasNext()) { |
| Cache.Entry<String, Integer> cur = iter.next(); |
| |
| assertTrue(entries.containsKey(cur.getKey())); |
| assertEquals(entries.get(cur.getKey()), cur.getValue()); |
| |
| cnt++; |
| } |
| |
| assertEquals(entries.size(), cnt); |
| } |
| |
| /** |
| * Checks iterators are cleared. |
| */ |
| private void checkIteratorsCleared() { |
| for (int j = 0; j < gridCount(); j++) |
| executeOnLocalOrRemoteJvm(j, new CheckIteratorTask(cacheName())); |
| } |
| |
| /** |
| * Checks iterators are cleared after using. |
| * |
| * @param cache Cache. |
| * @throws Exception If failed. |
| */ |
| private void checkIteratorEmpty(IgniteCache<String, Integer> cache) throws Exception { |
| int cnt = 5; |
| |
| for (int i = 0; i < cnt; ++i) { |
| Iterator<Cache.Entry<String, Integer>> iter = cache.iterator(); |
| |
| iter.next(); |
| |
| assert iter.hasNext(); |
| } |
| |
| System.gc(); |
| |
| for (int i = 0; i < 10; i++) { |
| try { |
| cache.size(); // Trigger weak queue poll. |
| |
| checkIteratorsCleared(); |
| } |
| catch (AssertionError e) { |
| if (i == 9) |
| throw e; |
| |
| log.info("Set iterators not cleared, will wait"); |
| |
| Thread.sleep(1000); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLocalClearKey() throws Exception { |
| addKeys(); |
| |
| String keyToRmv = "key" + 25; |
| |
| Ignite g = primaryIgnite(keyToRmv); |
| |
| g.<String, Integer>cache(cacheName()).localClear(keyToRmv); |
| |
| checkLocalRemovedKey(keyToRmv); |
| |
| g.<String, Integer>cache(cacheName()).put(keyToRmv, 1); |
| |
| String keyToEvict = "key" + 30; |
| |
| g = primaryIgnite(keyToEvict); |
| |
| g.<String, Integer>cache(cacheName()).localEvict(Collections.singleton(keyToEvict)); |
| |
| g.<String, Integer>cache(cacheName()).localClear(keyToEvict); |
| |
| checkLocalRemovedKey(keyToEvict); |
| } |
| |
| /** |
| * @param keyToRmv Removed key. |
| */ |
| protected void checkLocalRemovedKey(String keyToRmv) { |
| for (int i = 0; i < 500; ++i) { |
| String key = "key" + i; |
| |
| boolean found = primaryIgnite(key).cache(cacheName()).localPeek(key) != null; |
| |
| if (keyToRmv.equals(key)) { |
| Collection<ClusterNode> nodes = grid(0).affinity(cacheName()).mapKeyToPrimaryAndBackups(key); |
| |
| for (int j = 0; j < gridCount(); ++j) { |
| if (nodes.contains(grid(j).localNode()) && grid(j) != primaryIgnite(key)) |
| assertTrue("Not found on backup removed key ", grid(j).cache(cacheName()).localPeek(key) != null); |
| } |
| |
| assertFalse("Found removed key " + key, found); |
| } |
| else |
| assertTrue("Not found key " + key, found); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLocalClearKeys() throws Exception { |
| Map<String, List<String>> keys = addKeys(); |
| |
| Ignite g = grid(0); |
| |
| Set<String> keysToRmv = new HashSet<>(); |
| |
| for (int i = 0; i < gridCount(); ++i) { |
| List<String> gridKeys = keys.get(grid(i).name()); |
| |
| if (gridKeys.size() > 2) { |
| keysToRmv.add(gridKeys.get(0)); |
| |
| keysToRmv.add(gridKeys.get(1)); |
| |
| g = grid(i); |
| |
| break; |
| } |
| } |
| |
| assert keysToRmv.size() > 1; |
| |
| info("Will clear keys on node: " + g.cluster().localNode().id()); |
| |
| g.<String, Integer>cache(cacheName()).localClearAll(keysToRmv); |
| |
| for (int i = 0; i < 500; ++i) { |
| String key = "key" + i; |
| |
| Ignite ignite = primaryIgnite(key); |
| |
| boolean found = ignite.cache(cacheName()).localPeek(key) != null; |
| |
| if (keysToRmv.contains(key)) |
| assertFalse("Found removed key [key=" + key + ", node=" + ignite.cluster().localNode().id() + ']', |
| found); |
| else |
| assertTrue("Not found key " + key, found); |
| } |
| } |
| |
| /** |
| * Add 500 keys to cache only on primaries nodes. |
| * |
| * @return Map grid's name to its primary keys. |
| */ |
| protected Map<String, List<String>> addKeys() { |
| // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries |
| // because some of them were blocked due to having readers. |
| Map<String, List<String>> keys = new HashMap<>(); |
| |
| for (int i = 0; i < gridCount(); ++i) |
| keys.put(grid(i).name(), new ArrayList<String>()); |
| |
| for (int i = 0; i < 500; ++i) { |
| String key = "key" + i; |
| |
| Ignite g = primaryIgnite(key); |
| |
| g.cache(cacheName()).put(key, "value" + i); |
| |
| keys.get(g.name()).add(key); |
| } |
| |
| return keys; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearKey() throws Exception { |
| testGlobalClearKey(false, false, Arrays.asList("key25")); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearKeyAsyncOld() throws Exception { |
| testGlobalClearKey(true, true, Arrays.asList("key25")); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearKeyAsync() throws Exception { |
| testGlobalClearKey(true, false, Arrays.asList("key25")); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearKeys() throws Exception { |
| testGlobalClearKey(false, false, Arrays.asList("key25", "key100", "key150")); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearKeysAsyncOld() throws Exception { |
| testGlobalClearKey(true, true, Arrays.asList("key25", "key100", "key150")); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGlobalClearKeysAsync() throws Exception { |
| testGlobalClearKey(true, false, Arrays.asList("key25", "key100", "key150")); |
| } |
| |
| /** |
| * @param async If {@code true} uses async method. |
| * @param oldAsync Use old async API. |
| * @param keysToRmv Keys to remove. |
| * @throws Exception If failed. |
| */ |
| protected void testGlobalClearKey(boolean async, boolean oldAsync, Collection<String> keysToRmv) throws Exception { |
| // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries |
| // because some of them were blocked due to having readers. |
| for (int i = 0; i < 500; ++i) { |
| String key = "key" + i; |
| |
| Ignite g = primaryIgnite(key); |
| |
| g.cache(cacheName()).put(key, "value" + i); |
| } |
| |
| if (async) { |
| if (oldAsync) { |
| IgniteCache asyncCache = jcache().withAsync(); |
| |
| if (keysToRmv.size() == 1) |
| asyncCache.clear(F.first(keysToRmv)); |
| else |
| asyncCache.clearAll(new HashSet<>(keysToRmv)); |
| |
| asyncCache.future().get(); |
| } |
| else { |
| if (keysToRmv.size() == 1) |
| jcache().clearAsync(F.first(keysToRmv)).get(); |
| else |
| jcache().clearAllAsync(new HashSet<>(keysToRmv)).get(); |
| } |
| } |
| else { |
| if (keysToRmv.size() == 1) |
| jcache().clear(F.first(keysToRmv)); |
| else |
| jcache().clearAll(new HashSet<>(keysToRmv)); |
| } |
| |
| for (int i = 0; i < 500; ++i) { |
| String key = "key" + i; |
| |
| boolean found = false; |
| |
| for (int j = 0; j < gridCount(); j++) { |
| if (jcache(j).localPeek(key) != null) |
| found = true; |
| } |
| |
| if (!keysToRmv.contains(key)) |
| assertTrue("Not found key " + key, found); |
| else |
| assertFalse("Found removed key " + key, found); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testWithSkipStore() throws Exception { |
| if (!storeEnabled()) |
| return; |
| |
| IgniteCache<String, Integer> cache = grid(0).cache(cacheName()); |
| |
| IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore(); |
| |
| List<String> keys = primaryKeysForCache(0, 10, 1); |
| |
| for (int i = 0; i < keys.size(); ++i) |
| putToStore(keys.get(i), i); |
| |
| assertFalse(cacheSkipStore.iterator().hasNext()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| |
| assertNotNull(cache.get(key)); |
| } |
| |
| for (String key : keys) { |
| cacheSkipStore.remove(key); |
| |
| assertNotNull(cache.get(key)); |
| } |
| |
| cache.removeAll(new HashSet<>(keys)); |
| |
| for (String key : keys) |
| assertNull(cache.get(key)); |
| |
| final int KEYS = 250; |
| |
| // Put/remove data from multiple nodes. |
| |
| keys = new ArrayList<>(KEYS); |
| |
| for (int i = 0; i < KEYS; i++) |
| keys.add("key_" + i); |
| |
| for (int i = 0; i < keys.size(); ++i) |
| cache.put(keys.get(i), i); |
| |
| for (int i = 0; i < keys.size(); ++i) { |
| String key = keys.get(i); |
| |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertEquals(i, storeStgy.getFromStore(key)); |
| } |
| |
| for (int i = 0; i < keys.size(); ++i) { |
| String key = keys.get(i); |
| |
| Integer val1 = -1; |
| |
| cacheSkipStore.put(key, val1); |
| assertEquals(i, storeStgy.getFromStore(key)); |
| assertEquals(val1, cacheSkipStore.get(key)); |
| |
| Integer val2 = -2; |
| |
| assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2))); |
| assertEquals(i, storeStgy.getFromStore(key)); |
| assertEquals(val2, cacheSkipStore.get(key)); |
| } |
| |
| for (String key : keys) { |
| cacheSkipStore.remove(key); |
| |
| assertNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| for (String key : keys) { |
| cache.remove(key); |
| |
| assertNull(cacheSkipStore.get(key)); |
| assertNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| |
| storeStgy.putToStore(key, 0); |
| |
| Integer val = -1; |
| |
| assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); |
| assertEquals(0, storeStgy.getFromStore(key)); |
| assertEquals(val, cacheSkipStore.get(key)); |
| |
| cache.remove(key); |
| |
| storeStgy.putToStore(key, 0); |
| |
| assertTrue(cacheSkipStore.putIfAbsent(key, val)); |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(0, storeStgy.getFromStore(key)); |
| |
| cache.remove(key); |
| |
| storeStgy.putToStore(key, 0); |
| |
| assertNull(cacheSkipStore.getAndPut(key, val)); |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(0, storeStgy.getFromStore(key)); |
| |
| cache.remove(key); |
| } |
| |
| assertFalse(cacheSkipStore.iterator().hasNext()); |
| assertTrue(storeStgy.getStoreSize() == 0); |
| assertTrue(cache.size(ALL) == 0); |
| |
| // putAll/removeAll from multiple nodes. |
| |
| Map<String, Integer> data = new LinkedHashMap<>(); |
| |
| for (int i = 0; i < keys.size(); i++) |
| data.put(keys.get(i), i); |
| |
| cacheSkipStore.putAll(data); |
| |
| for (String key : keys) { |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| cache.putAll(data); |
| |
| for (String key : keys) { |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cacheSkipStore.removeAll(data.keySet()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cacheSkipStore.putAll(data); |
| |
| for (String key : keys) { |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cacheSkipStore.removeAll(data.keySet()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cache.removeAll(data.keySet()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| assertTrue(storeStgy.getStoreSize() == 0); |
| |
| // Miscellaneous checks. |
| |
| String newKey = "New key"; |
| |
| assertFalse(storeStgy.isInStore(newKey)); |
| |
| cacheSkipStore.put(newKey, 1); |
| |
| assertFalse(storeStgy.isInStore(newKey)); |
| |
| cache.put(newKey, 1); |
| |
| assertTrue(storeStgy.isInStore(newKey)); |
| |
| Iterator<Cache.Entry<String, Integer>> it = cacheSkipStore.iterator(); |
| |
| assertTrue(it.hasNext()); |
| |
| Cache.Entry<String, Integer> entry = it.next(); |
| |
| String rmvKey = entry.getKey(); |
| |
| assertTrue(storeStgy.isInStore(rmvKey)); |
| |
| it.remove(); |
| |
| assertNull(cacheSkipStore.get(rmvKey)); |
| |
| assertTrue(storeStgy.isInStore(rmvKey)); |
| |
| assertTrue(cache.size(ALL) == 0); |
| assertTrue(cacheSkipStore.size(ALL) == 0); |
| |
| cache.remove(rmvKey); |
| |
| assertTrue(storeStgy.getStoreSize() == 0); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testWithSkipStoreRemoveAll() throws Exception { |
| if (atomicityMode() == TRANSACTIONAL || (atomicityMode() == ATOMIC && nearEnabled())) // TODO IGNITE-373. |
| return; |
| |
| if (!storeEnabled()) |
| return; |
| |
| IgniteCache<String, Integer> cache = grid(0).cache(cacheName()); |
| |
| IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore(); |
| |
| Map<String, Integer> data = new HashMap<>(); |
| |
| for (int i = 0; i < 100; i++) |
| data.put("key_" + i, i); |
| |
| cache.putAll(data); |
| |
| for (String key : data.keySet()) { |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cacheSkipStore.removeAll(); |
| |
| for (String key : data.keySet()) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cache.removeAll(); |
| |
| for (String key : data.keySet()) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testWithSkipStoreTx() throws Exception { |
| if (txShouldBeUsed() && storeEnabled()) { |
| IgniteCache<String, Integer> cache = grid(0).cache(cacheName()); |
| |
| IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore(); |
| |
| final int KEYS = 250; |
| |
| // Put/remove data from multiple nodes. |
| |
| List<String> keys = new ArrayList<>(KEYS); |
| |
| for (int i = 0; i < KEYS; i++) |
| keys.add("key_" + i); |
| |
| Map<String, Integer> data = new LinkedHashMap<>(); |
| |
| for (int i = 0; i < keys.size(); i++) |
| data.put(keys.get(i), i); |
| |
| checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, READ_COMMITTED); |
| |
| checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, REPEATABLE_READ); |
| |
| checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, SERIALIZABLE); |
| |
| checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, READ_COMMITTED); |
| |
| checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, REPEATABLE_READ); |
| |
| checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, SERIALIZABLE); |
| } |
| } |
| |
| /** |
| * @param cache Cache instance. |
| * @param cacheSkipStore Cache skip store projection. |
| * @param data Data set. |
| * @param keys Keys list. |
| * @param txConcurrency Concurrency mode. |
| * @param txIsolation Isolation mode. |
| * @throws Exception If failed. |
| */ |
| private void checkSkipStoreWithTransaction(IgniteCache<String, Integer> cache, |
| IgniteCache<String, Integer> cacheSkipStore, |
| Map<String, Integer> data, |
| List<String> keys, |
| TransactionConcurrency txConcurrency, |
| TransactionIsolation txIsolation) |
| throws Exception { |
| info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']'); |
| |
| cache.removeAll(data.keySet()); |
| checkEmpty(cache, cacheSkipStore); |
| |
| IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); |
| |
| Integer val = -1; |
| |
| // Several put check. |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| for (String key : keys) |
| cacheSkipStore.put(key, val); |
| |
| for (String key : keys) { |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(val, cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| tx.commit(); |
| } |
| |
| for (String key : keys) { |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(val, cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| assertEquals(0, storeStgy.getStoreSize()); |
| |
| // cacheSkipStore putAll(..)/removeAll(..) check. |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| cacheSkipStore.putAll(data); |
| |
| tx.commit(); |
| } |
| |
| for (String key : keys) { |
| val = data.get(key); |
| |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(val, cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| storeStgy.putAllToStore(data); |
| |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| cacheSkipStore.removeAll(data.keySet()); |
| |
| tx.commit(); |
| } |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| |
| cache.remove(key); |
| } |
| |
| assertTrue(storeStgy.getStoreSize() == 0); |
| |
| // cache putAll(..)/removeAll(..) check. |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| cache.putAll(data); |
| |
| for (String key : keys) { |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| cache.removeAll(data.keySet()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| tx.commit(); |
| } |
| |
| assertTrue(storeStgy.getStoreSize() == 0); |
| |
| // putAll(..) from both cacheSkipStore and cache. |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| Map<String, Integer> subMap = new HashMap<>(); |
| |
| for (int i = 0; i < keys.size() / 2; i++) |
| subMap.put(keys.get(i), i); |
| |
| cacheSkipStore.putAll(subMap); |
| |
| subMap.clear(); |
| |
| for (int i = keys.size() / 2; i < keys.size(); i++) |
| subMap.put(keys.get(i), i); |
| |
| cache.putAll(subMap); |
| |
| for (String key : keys) { |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| tx.commit(); |
| } |
| |
| for (int i = 0; i < keys.size() / 2; i++) { |
| String key = keys.get(i); |
| |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| for (int i = keys.size() / 2; i < keys.size(); i++) { |
| String key = keys.get(i); |
| |
| assertNotNull(cacheSkipStore.get(key)); |
| assertNotNull(cache.get(key)); |
| assertTrue(storeStgy.isInStore(key)); |
| } |
| |
| cache.removeAll(data.keySet()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| assertNull(cache.get(key)); |
| assertFalse(storeStgy.isInStore(key)); |
| } |
| |
| // Check that read-through is disabled when cacheSkipStore is used. |
| for (int i = 0; i < keys.size(); i++) |
| putToStore(keys.get(i), i); |
| |
| assertTrue(cacheSkipStore.size(ALL) == 0); |
| assertTrue(cache.size(ALL) == 0); |
| assertTrue(storeStgy.getStoreSize() != 0); |
| |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| assertTrue(cacheSkipStore.getAll(data.keySet()).isEmpty()); |
| |
| for (String key : keys) { |
| assertNull(cacheSkipStore.get(key)); |
| |
| if (txIsolation == READ_COMMITTED) { |
| assertNotNull(cache.get(key)); |
| assertNotNull(cacheSkipStore.get(key)); |
| } |
| } |
| |
| tx.commit(); |
| } |
| |
| cache.removeAll(data.keySet()); |
| |
| val = -1; |
| |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| for (String key : data.keySet()) { |
| storeStgy.putToStore(key, 0); |
| |
| assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); |
| } |
| |
| tx.commit(); |
| } |
| |
| for (String key : data.keySet()) { |
| assertEquals(0, storeStgy.getFromStore(key)); |
| |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(val, cache.get(key)); |
| } |
| |
| cache.removeAll(data.keySet()); |
| |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| for (String key : data.keySet()) { |
| storeStgy.putToStore(key, 0); |
| |
| assertTrue(cacheSkipStore.putIfAbsent(key, val)); |
| } |
| |
| tx.commit(); |
| } |
| |
| for (String key : data.keySet()) { |
| assertEquals(0, storeStgy.getFromStore(key)); |
| |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(val, cache.get(key)); |
| } |
| |
| cache.removeAll(data.keySet()); |
| |
| try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { |
| for (String key : data.keySet()) { |
| storeStgy.putToStore(key, 0); |
| |
| assertNull(cacheSkipStore.getAndPut(key, val)); |
| } |
| |
| tx.commit(); |
| } |
| |
| for (String key : data.keySet()) { |
| assertEquals(0, storeStgy.getFromStore(key)); |
| |
| assertEquals(val, cacheSkipStore.get(key)); |
| assertEquals(val, cache.get(key)); |
| } |
| |
| cache.removeAll(data.keySet()); |
| checkEmpty(cache, cacheSkipStore); |
| } |
| |
| /** |
| * @param cache Cache instance. |
| * @param cacheSkipStore Cache skip store projection. |
| * @throws Exception If failed. |
| */ |
| private void checkEmpty(IgniteCache<String, Integer> cache, IgniteCache<String, Integer> cacheSkipStore) |
| throws Exception { |
| assertTrue(cache.size(ALL) == 0); |
| assertTrue(cacheSkipStore.size(ALL) == 0); |
| assertTrue(storeStgy.getStoreSize() == 0); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11850") |
| @Test |
| public void testGetOutTx() throws Exception { |
| checkGetOutTx(false, false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11850") |
| @Test |
| public void testGetOutTxAsyncOld() throws Exception { |
| checkGetOutTx(true, true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-11850") |
| @Test |
| public void testGetOutTxAsync() throws Exception { |
| checkGetOutTx(true, false); |
| } |
| |
| /** |
| * @param async Use async API. |
| * @param oldAsync Uase old style async API. |
| * @throws Exception If failed. |
| */ |
| private void checkGetOutTx(boolean async, boolean oldAsync) throws Exception { |
| final AtomicInteger lockEvtCnt = new AtomicInteger(); |
| |
| IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| lockEvtCnt.incrementAndGet(); |
| |
| return true; |
| } |
| }; |
| |
| try { |
| IgniteCache<String, Integer> cache = grid(0).cache(cacheName()).withAllowAtomicOpsInTx(); |
| |
| List<String> keys = primaryKeysForCache(0, 2, 1); |
| |
| assertEquals(2, keys.size()); |
| |
| cache.put(keys.get(0), 0); |
| cache.put(keys.get(1), 1); |
| |
| grid(0).events().localListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED); |
| |
| if (async && oldAsync) |
| cache = cache.withAsync(); |
| |
| try (Transaction tx = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| Integer val0; |
| |
| if (async) { |
| if (oldAsync) { |
| cache.get(keys.get(0)); |
| |
| val0 = cache.<Integer>future().get(); |
| } |
| else |
| val0 = cache.getAsync(keys.get(0)).get(); |
| } |
| else |
| val0 = cache.get(keys.get(0)); |
| |
| assertEquals(0, val0.intValue()); |
| |
| Map<String, Integer> allOutTx; |
| |
| if (async) { |
| if (oldAsync) { |
| cache.getAllOutTx(F.asSet(keys.get(1))); |
| |
| allOutTx = cache.<Map<String, Integer>>future().get(); |
| } |
| else |
| allOutTx = cache.getAllOutTxAsync(F.asSet(keys.get(1))).get(); |
| } |
| else |
| allOutTx = cache.getAllOutTx(F.asSet(keys.get(1))); |
| |
| assertEquals(1, allOutTx.size()); |
| |
| assertTrue(allOutTx.containsKey(keys.get(1))); |
| |
| assertEquals(1, allOutTx.get(keys.get(1)).intValue()); |
| } |
| |
| assertTrue(GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| info("Lock event count: " + lockEvtCnt.get()); |
| if (atomicityMode() == ATOMIC) |
| return lockEvtCnt.get() == 0; |
| |
| if (cacheMode() == PARTITIONED && nearEnabled()) { |
| if (!grid(0).configuration().isClientMode()) |
| return lockEvtCnt.get() == 4; |
| } |
| |
| return lockEvtCnt.get() == 2; |
| } |
| }, 15000)); |
| } |
| finally { |
| grid(0).events().stopLocalListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInvokeException() throws Exception { |
| final IgniteCache cache = jcache(); |
| |
| final IgniteFuture fut = cache.invokeAsync("key2", ERR_PROCESSOR); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| fut.chain(new IgniteClosure<IgniteFuture, Object>() { |
| @Override public Object apply(IgniteFuture o) { |
| return o.get(); |
| } |
| }); |
| |
| fut.get(); |
| |
| return null; |
| } |
| }, EntryProcessorException.class, null); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLockInsideTransaction() throws Exception { |
| if (txEnabled()) { |
| GridTestUtils.assertThrows( |
| log, |
| new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| try (Transaction tx = ignite(0).transactions().txStart()) { |
| jcache(0).lock("key").lock(); |
| } |
| |
| return null; |
| } |
| }, |
| CacheException.class, |
| "Explicit lock can't be acquired within a transaction." |
| ); |
| |
| GridTestUtils.assertThrows( |
| log, |
| new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| try (Transaction tx = ignite(0).transactions().txStart()) { |
| jcache(0).lockAll(Arrays.asList("key1", "key2")).lock(); |
| } |
| |
| return null; |
| } |
| }, |
| CacheException.class, |
| "Explicit lock can't be acquired within a transaction." |
| ); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testContinuousQuery() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| final AtomicInteger updCnt = new AtomicInteger(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { |
| @Override public boolean apply(Object key, Object val) { |
| return valueOf(key) >= 3; |
| } |
| })); |
| |
| qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { |
| @Override public void onUpdated( |
| Iterable<CacheEntryEvent<? extends Object, ? extends Object>> evts) throws CacheEntryListenerException { |
| for (CacheEntryEvent<? extends Object, ? extends Object> evt : evts) { |
| int v = valueOf(evt.getKey()); |
| |
| // Check filter. |
| assertTrue("v=" + v, v >= 10 && v < 15); |
| |
| updCnt.incrementAndGet(); |
| } |
| } |
| }); |
| |
| qry.setRemoteFilter(new TestCacheEntryEventSerializableFilter()); |
| |
| IgniteCache<Object, Object> cache = jcache(); |
| |
| for (int i = 0; i < 10; i++) |
| cache.put(key(i), value(i)); |
| |
| try (QueryCursor<Cache.Entry<Object, Object>> cur = cache.query(qry)) { |
| int cnt = 0; |
| |
| for (Cache.Entry<Object, Object> e : cur) { |
| cnt++; |
| |
| int val = valueOf(e.getKey()); |
| |
| assertTrue("v=" + val, val >= 3); |
| } |
| |
| assertEquals(7, cnt); |
| |
| for (int i = 10; i < 20; i++) |
| cache.put(key(i), value(i)); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicateX() { |
| @Override public boolean applyx() throws IgniteCheckedException { |
| return updCnt.get() == 5; |
| } |
| }, 30_000); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetEntry() throws Exception { |
| runInAllDataModes(new TestRunnable() { |
| @Override public void run() throws Exception { |
| Map<String, Integer> vals = new HashMap<>(); |
| |
| for (int i = 0; i < CNT; i++) |
| vals.put("key" + i, i); |
| |
| jcache(0).putAll(vals); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| assertEquals(0, jcache(i).getEntry("key0").getValue()); |
| assertEquals(0, jcache(i).getEntryAsync("key0").get().getValue()); |
| |
| assertTrue( |
| F.transform( |
| jcache(i).getEntries(vals.keySet()), |
| new IgniteClosure<CacheEntry<Object, Object>, Object>() { |
| @Override public Object apply(CacheEntry<Object, Object> entry) { |
| return entry.getValue(); |
| } |
| }).containsAll(vals.values())); |
| |
| assertTrue( |
| F.transform( |
| jcache(i).getEntriesAsync(vals.keySet()).get(), |
| new IgniteClosure<CacheEntry<Object, Object>, Object>() { |
| @Override public Object apply(CacheEntry<Object, Object> entry) { |
| return entry.getValue(); |
| } |
| }).containsAll(vals.values())); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Sets given value, returns old value. |
| */ |
| public static final class SetValueProcessor implements EntryProcessor<String, Integer, Integer> { |
| /** */ |
| private Integer newVal; |
| |
| /** |
| * @param newVal New value to set. |
| */ |
| SetValueProcessor(Integer newVal) { |
| this.newVal = newVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Integer process(MutableEntry<String, Integer> entry, |
| Object... arguments) throws EntryProcessorException { |
| Integer val = entry.getValue(); |
| |
| entry.setValue(newVal); |
| |
| return val; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class RemoveEntryProcessor implements EntryProcessor<Object, Object, Object>, Serializable { |
| /** {@inheritDoc} */ |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| assertNotNull(e.getKey()); |
| |
| Object old = e.getValue(); |
| |
| e.remove(); |
| |
| return old; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class IncrementEntryProcessor implements EntryProcessor<Object, Object, Object>, Serializable { |
| /** {@inheritDoc} */ |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| assert !F.isEmpty(args); |
| |
| DataMode mode = (DataMode)args[0]; |
| |
| assertNotNull(e.getKey()); |
| |
| Object old = e.getValue(); |
| |
| e.setValue(old == null ? value(1, mode) : value(valueOf(old) + 1, mode)); |
| |
| return old; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckEntriesTask extends TestIgniteIdxRunnable { |
| /** Keys. */ |
| private final Collection<String> keys; |
| |
| /** */ |
| private String cacheName; |
| |
| /** |
| * @param keys Keys. |
| * @param s Cache. |
| */ |
| CheckEntriesTask(Collection<String> keys, String s) { |
| this.keys = keys; |
| cacheName = s; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run(int idx) throws Exception { |
| GridCacheContext<String, Integer> ctx = |
| ((IgniteKernal)ignite).<String, Integer>internalCache(cacheName).context(); |
| |
| int size = 0; |
| |
| if (ctx.isNear()) |
| ctx = ctx.near().dht().context(); |
| |
| for (String key : keys) { |
| if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) { |
| GridCacheEntryEx e = ctx.cache().entryEx(key); |
| |
| assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']'; |
| assert !e.deleted() : "Entry is deleted: " + e; |
| |
| size++; |
| |
| e.touch(); |
| } |
| } |
| |
| assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL)); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckCacheSizeTask extends TestIgniteIdxRunnable { |
| /** */ |
| private final Map<String, Integer> map; |
| |
| /** */ |
| private String cacheName; |
| |
| /** |
| * @param map Map. |
| * @param cacheName Cache name. |
| */ |
| CheckCacheSizeTask(Map<String, Integer> map, String cacheName) { |
| this.map = map; |
| |
| this.cacheName = cacheName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run(int idx) throws Exception { |
| GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(cacheName).context(); |
| |
| int size = 0; |
| |
| for (String key : map.keySet()) |
| if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) |
| size++; |
| |
| assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL)); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckPrimaryKeysTask implements TestCacheCallable<String, Integer, List<String>> { |
| /** Start from. */ |
| private final int startFrom; |
| |
| /** Count. */ |
| private final int cnt; |
| |
| /** |
| * @param startFrom Start from. |
| * @param cnt Count. |
| */ |
| public CheckPrimaryKeysTask(int startFrom, int cnt) { |
| this.startFrom = startFrom; |
| this.cnt = cnt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception { |
| List<String> found = new ArrayList<>(); |
| |
| Affinity<Object> affinity = ignite.affinity(cache.getName()); |
| |
| for (int i = startFrom; i < startFrom + 100_000; i++) { |
| String key = "key" + i; |
| |
| if (affinity.isPrimary(ignite.cluster().localNode(), key)) { |
| found.add(key); |
| |
| if (found.size() == cnt) |
| return found; |
| } |
| } |
| |
| throw new IgniteException("Unable to find " + cnt + " keys as primary for cache."); |
| } |
| } |
| |
| /** */ |
| private static class EntryTtlTask implements TestCacheCallable<String, Integer, IgnitePair<Long>> { |
| /** */ |
| private final String key; |
| |
| /** |
| * @param key Key. |
| */ |
| private EntryTtlTask(String key) { |
| this.key = key; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgnitePair<Long> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception { |
| GridCacheAdapter<?, ?> internalCache = internalCache0(cache); |
| |
| if (internalCache.context().isNear()) |
| internalCache = internalCache.context().near().dht(); |
| |
| GridCacheEntryEx entry = internalCache.entryEx(key); |
| |
| entry.unswap(); |
| |
| if (!entry.hasValue()) { |
| assertEquals(0, entry.ttl()); |
| assertEquals(0, entry.expireTime()); |
| |
| return null; |
| } |
| |
| IgnitePair<Long> pair = new IgnitePair<>(entry.ttl(), entry.expireTime()); |
| |
| if (!entry.isNear()) |
| entry.context().cache().removeEntry(entry); |
| |
| return pair; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckPrimaryTestObjectKeysTask implements TestCacheCallable<Object, Object, List<Object>> { |
| /** Start from. */ |
| private final int startFrom; |
| |
| /** Count. */ |
| private final int cnt; |
| |
| /** */ |
| private final DataMode mode; |
| |
| /** |
| * @param startFrom Start from. |
| * @param cnt Count. |
| */ |
| public CheckPrimaryTestObjectKeysTask(int startFrom, int cnt, DataMode mode) { |
| this.startFrom = startFrom; |
| this.cnt = cnt; |
| this.mode = mode; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<Object> call(Ignite ignite, IgniteCache<Object, Object> cache) throws Exception { |
| List<Object> found = new ArrayList<>(); |
| |
| Affinity<Object> affinity = ignite.affinity(cache.getName()); |
| |
| for (int i = startFrom; i < startFrom + 100_000; i++) { |
| Object key = key(i, mode); |
| |
| if (affinity.isPrimary(ignite.cluster().localNode(), key)) { |
| found.add(key); |
| |
| if (found.size() == cnt) |
| return found; |
| } |
| } |
| |
| throw new IgniteException("Unable to find " + cnt + " keys as primary for cache."); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> { |
| /** */ |
| private String cacheName; |
| |
| /** |
| * @param cacheName Name. |
| */ |
| public CheckIteratorTask(String cacheName) { |
| this.cacheName = cacheName; |
| } |
| |
| /** |
| * @param idx Index. |
| */ |
| @Override public Void call(int idx) throws Exception { |
| GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(cacheName).context(); |
| GridCacheQueryManager queries = ctx.queries(); |
| |
| Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters"); |
| |
| for (Object obj : map.values()) |
| assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size()); |
| |
| return null; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class RemoveAndReturnNullEntryProcessor implements |
| EntryProcessor<Object, Object, Object>, Serializable { |
| |
| /** {@inheritDoc} */ |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| e.remove(); |
| |
| return null; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable { |
| /** */ |
| private final int cnt; |
| |
| /** */ |
| private String cacheName; |
| |
| /** */ |
| public CheckEntriesDeletedTask(int cnt, String cacheName) { |
| this.cnt = cnt; |
| this.cacheName = cacheName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run(int idx) throws Exception { |
| for (int i = 0; i < cnt; i++) { |
| String key = String.valueOf(i); |
| |
| GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(cacheName).context(); |
| |
| GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); |
| |
| if (ignite.affinity(cacheName).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) { |
| assertNotNull(entry); |
| assertTrue(entry.deleted()); |
| } |
| else |
| assertNull(entry); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CheckKeySizeTask extends TestIgniteIdxRunnable { |
| /** Keys. */ |
| private final Collection<String> keys; |
| |
| /** */ |
| private String cacheName; |
| |
| /** |
| * @param keys Keys. |
| * @param s |
| */ |
| public CheckKeySizeTask(Collection<String> keys, String s) { |
| this.keys = keys; |
| this.cacheName = s; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run(int idx) throws Exception { |
| GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(cacheName).context(); |
| |
| int size = 0; |
| |
| for (String key : keys) |
| if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) |
| size++; |
| |
| assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(cacheName).localSize(ALL)); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class FailedEntryProcessor implements EntryProcessor<Object, Object, Object>, Serializable { |
| /** {@inheritDoc} */ |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| throw new EntryProcessorException("Test entry processor exception."); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestCacheEntryEventSerializableFilter implements CacheEntryEventSerializableFilter<Object, Object> { |
| /** {@inheritDoc} */ |
| @Override public boolean evaluate( |
| CacheEntryEvent<? extends Object, ? extends Object> evt) throws CacheEntryListenerException { |
| return valueOf(evt.getKey()) < 15; |
| } |
| } |
| |
| } |