| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.near; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.affinity.AffinityFunction; |
| import org.apache.ignite.cache.store.CacheStoreAdapter; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
| * Multi node test for near cache. |
| */ |
| public class GridCacheNearMultiNodeSelfTest extends GridCommonAbstractTest { |
| /** Grid count. */ |
| private static final int GRID_CNT = 2; |
| |
| /** */ |
| private static final int BACKUPS = 1; |
| |
| /** Cache store. */ |
| private static TestStore store = new TestStore(); |
| |
| /** Grid counter. */ |
| private AtomicInteger cntr = new AtomicInteger(0); |
| |
| /** Affinity based on node index mode. */ |
| private AffinityFunction aff = new GridCacheModuloAffinityFunction(GRID_CNT, BACKUPS); |
| |
| /** Debug flag for mappings. */ |
| private boolean mapDebug = true; |
| |
| /** |
| * |
| */ |
| public GridCacheNearMultiNodeSelfTest() { |
| super(false /* don't start grid. */); |
| } |
| |
| /** */ |
| @Before |
| public void beforeGridCacheNearMultiNodeSelfTest() { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE); |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE); |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE); |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE); |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK); |
| |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| CacheConfiguration cacheCfg = defaultCacheConfiguration(); |
| |
| cacheCfg.setCacheMode(PARTITIONED); |
| cacheCfg.setCacheStoreFactory(singletonFactory(store)); |
| cacheCfg.setReadThrough(true); |
| cacheCfg.setWriteThrough(true); |
| cacheCfg.setLoadPreviousValue(true); |
| cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); |
| cacheCfg.setAffinity(aff); |
| cacheCfg.setAtomicityMode(atomicityMode()); |
| cacheCfg.setBackups(BACKUPS); |
| cacheCfg.setNearConfiguration(new NearCacheConfiguration()); |
| |
| cfg.setCacheConfiguration(cacheCfg); |
| |
| cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, cntr.getAndIncrement())); |
| |
| return cfg; |
| } |
| |
| /** |
| * @return Atomicity mode. |
| */ |
| protected CacheAtomicityMode atomicityMode() { |
| return TRANSACTIONAL; |
| } |
| |
| /** |
| * @return {@code True} if tests transactional cache. |
| */ |
| protected boolean transactional() { |
| return atomicityMode() == TRANSACTIONAL; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| startGrids(GRID_CNT); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"SizeReplaceableByIsEmpty"}) |
| @Override protected void beforeTest() throws Exception { |
| for (int i = 0; i < GRID_CNT; i++) { |
| assert jcache(i).localSize() == 0 : "Near cache size is not zero for grid: " + i; |
| assert dht(grid(i)).size() == 0 : "DHT cache size is not zero for grid: " + i; |
| |
| assert jcache(i).localSize() == 0 : "Near cache is not empty for grid: " + i; |
| assert dht(grid(i)).isEmpty() : "DHT cache is not empty for grid: " + i; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| for (int i = 0; i < GRID_CNT; i++) { |
| jcache(i).removeAll(); |
| |
| assertEquals("Near cache size is not zero for grid: " + i, 0, jcache(i).localSize()); |
| assertEquals("DHT cache size is not zero for grid: " + i, 0, dht(grid(i)).size()); |
| |
| assert jcache(i).localSize() == 0 : "Near cache is not empty for grid: " + i; |
| assert dht(grid(i)).isEmpty() : "DHT cache is not empty for grid: " + i; |
| } |
| |
| store.reset(); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| Transaction tx = grid(i).transactions().tx(); |
| |
| if (tx != null) { |
| error("Ending zombie transaction: " + tx); |
| |
| tx.close(); |
| } |
| } |
| } |
| |
| /** |
| * @param g Grid. |
| * @return Dht cache. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| private GridDhtCacheAdapter<Integer, String> dht(Ignite g) { |
| return ((GridNearCacheAdapter)((IgniteKernal)g).internalCache(DEFAULT_CACHE_NAME)).dht(); |
| } |
| |
| /** |
| * @param idx Index. |
| * @return Affinity. |
| */ |
| private Affinity<Object> affinity(int idx) { |
| return grid(idx).affinity(DEFAULT_CACHE_NAME); |
| } |
| |
| /** @param cnt Count. */ |
| private Map<UUID, T2<Set<Integer>, Set<Integer>>> mapKeys(int cnt) { |
| Affinity<Object> aff = affinity(0); |
| |
| //Mapping primary and backup keys on node |
| Map<UUID, T2<Set<Integer>, Set<Integer>>> map = new HashMap<>(); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| IgniteEx grid = grid(i); |
| |
| map.put(grid.cluster().localNode().id(), new T2<Set<Integer>, Set<Integer>>(new HashSet<Integer>(), |
| new HashSet<Integer>())); |
| } |
| |
| for (int key = 1; key <= cnt; key++) { |
| Integer part = aff.partition(key); |
| |
| assert part != null; |
| |
| Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part); |
| |
| ClusterNode primary = F.first(nodes); |
| |
| map.get(primary.id()).get1().add(key); |
| |
| if (mapDebug) |
| info("Mapped key to primary node [key=" + key + ", node=" + U.toShortString(primary)); |
| |
| for (ClusterNode n : nodes) { |
| if (n != primary) { |
| map.get(n.id()).get2().add(key); |
| |
| if (mapDebug) |
| info("Mapped key to backup node [key=" + key + ", node=" + U.toShortString(n)); |
| } |
| } |
| } |
| |
| return map; |
| } |
| |
| /** Test mappings. */ |
| @Test |
| public void testMappings() { |
| mapDebug = false; |
| |
| int cnt = 100000; |
| |
| Map<UUID, T2<Set<Integer>, Set<Integer>>> map = mapKeys(cnt); |
| |
| for (ClusterNode n : grid(0).cluster().nodes()) { |
| Set<Integer> primary = map.get(n.id()).get1(); |
| Set<Integer> backups = map.get(n.id()).get2(); |
| |
| if (backups == null) |
| backups = Collections.emptySet(); |
| |
| info("Grid node [primaries=" + primary.size() + ", backups=" + backups.size() + ']'); |
| |
| assert !F.isEmpty(primary); |
| |
| assertEquals(backups.size(), cnt - primary.size()); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @return Primary node for the key. |
| */ |
| @Nullable private ClusterNode primaryNode(Integer key) { |
| return affinity(0).mapKeyToNode(key); |
| } |
| |
| /** |
| * @param key Key. |
| * @return Primary node for the key. |
| */ |
| @Nullable private Ignite primaryGrid(Integer key) { |
| ClusterNode n = affinity(0).mapKeyToNode(key); |
| |
| assert n != null; |
| |
| return G.ignite(n.id()); |
| } |
| |
| /** |
| * @param key Key. |
| * @return Primary node for the key. |
| */ |
| @Nullable private Collection<Ignite> backupGrids(Integer key) { |
| Collection<ClusterNode> nodes = affinity(0).mapKeyToPrimaryAndBackups(key); |
| |
| Collection<ClusterNode> backups = CU.backups(nodes); |
| |
| return F.viewReadOnly(backups, |
| new C1<ClusterNode, Ignite>() { |
| @Override public Ignite apply(ClusterNode node) { |
| return G.ignite(node.id()); |
| } |
| }); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testReadThroughAndPut() throws Exception { |
| Integer key = 100000; |
| |
| Ignite primary; |
| Ignite backup; |
| |
| if (grid(0) == primaryGrid(key)) { |
| primary = grid(0); |
| backup = grid(1); |
| } |
| else { |
| primary = grid(1); |
| backup = grid(0); |
| } |
| |
| assertEquals(String.valueOf(key), backup.cache(DEFAULT_CACHE_NAME).get(key)); |
| |
| primary.cache(DEFAULT_CACHE_NAME).put(key, "a"); |
| |
| assertEquals("a", backup.cache(DEFAULT_CACHE_NAME).get(key)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testReadThrough() throws Exception { |
| ClusterNode loc = grid(0).localNode(); |
| |
| info("Local node: " + U.toShortString(loc)); |
| |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| int cnt = 10; |
| |
| Map<UUID, T2<Set<Integer>, Set<Integer>>> mapKeys = mapKeys(cnt); |
| |
| for (int key = 1; key <= cnt; key++) { |
| String s = near.get(key); |
| |
| info("Read key [key=" + key + ", val=" + s + ']'); |
| |
| assert s != null; |
| } |
| |
| info("Read all keys."); |
| |
| for (int key = 1; key <= cnt; key++) { |
| ClusterNode n = primaryNode(key); |
| |
| info("Primary node for key [key=" + key + ", node=" + U.toShortString(n) + ']'); |
| |
| assert n != null; |
| |
| assert mapKeys.get(n.id()).get1().contains(key); |
| |
| GridCacheAdapter<Integer, String> dhtCache = dht(G.ignite(n.id())); |
| |
| String s = dhtCache.localPeek(key, null); |
| |
| assert s != null : "Value is null for key: " + key; |
| assertEquals(s, Integer.toString(key)); |
| } |
| } |
| |
| /** |
| * Test Optimistic repeatable read write-through. |
| * |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings({"ConstantConditions"}) |
| @Test |
| public void testOptimisticWriteThrough() throws Exception { |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| if (transactional()) { |
| try (Transaction tx = grid(0).transactions().txStart(OPTIMISTIC, REPEATABLE_READ, 0, 0)) { |
| near.put(2, "2"); |
| |
| String s = near.getAndPut(3, "3"); |
| |
| assertNotNull(s); |
| assertEquals("3", s); |
| |
| assertEquals("2", near.get(2)); |
| assertEquals("3", near.get(3)); |
| |
| GridDhtCacheEntry entry = (GridDhtCacheEntry)dht(primaryGrid(2)).peekEx(2); |
| |
| if (entry != null) |
| assertNull("Unexpected entry: " + entry, entry.rawGet()); |
| |
| assertNotNull(localPeek(dht(primaryGrid(3)), 3)); |
| |
| tx.commit(); |
| } |
| } |
| else { |
| near.put(2, "2"); |
| |
| String s = near.getAndPut(3, "3"); |
| |
| assertNotNull(s); |
| assertEquals("3", s); |
| } |
| |
| assertEquals("2", near.localPeek(2)); |
| assertEquals("3", near.localPeek(3)); |
| |
| assertEquals("2", localPeek(dht(primaryGrid(2)), 2)); |
| assertEquals("3", localPeek(dht(primaryGrid(3)), 3)); |
| |
| assertEquals(2, near.localSize(CachePeekMode.ALL)); |
| assertEquals(2, near.localSize(CachePeekMode.ALL)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testNoTransactionSinglePutx() throws Exception { |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| near.put(2, "2"); |
| |
| assertEquals("2", near.localPeek(2)); |
| assertEquals("2", near.get(2)); |
| |
| assertEquals("2", localPeek(dht(primaryGrid(2)), 2)); |
| |
| assertEquals(1, near.localSize()); |
| assertEquals(1, near.localSize()); |
| |
| assertEquals(1, dht(primaryGrid(2)).size()); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testNoTransactionSinglePut() throws Exception { |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| // There should be a not-null previously mapped value because |
| // we use a store implementation that just returns values which |
| // are string representations of requesting integer keys. |
| String s = near.getAndPut(3, "3"); |
| |
| assertNotNull(s); |
| assertEquals("3", s); |
| |
| assertEquals("3", near.localPeek(3)); |
| assertEquals("3", near.get(3)); |
| |
| Ignite primaryIgnite = primaryGrid(3); |
| |
| assert primaryIgnite != null; |
| |
| info("Primary grid for key 3: " + U.toShortString(primaryIgnite.cluster().localNode())); |
| |
| assertEquals("3", localPeek(dht(primaryIgnite), 3)); |
| |
| assertEquals(1, near.localSize(CachePeekMode.ALL)); |
| assertEquals(1, near.localSize(CachePeekMode.ALL)); |
| |
| assertEquals(1, dht(primaryIgnite).size()); |
| |
| // Check backup nodes. |
| Collection<Ignite> backups = backupGrids(3); |
| |
| assert backups != null; |
| |
| for (Ignite b : backups) { |
| info("Backup grid for key 3: " + U.toShortString(b.cluster().localNode())); |
| |
| assertEquals("3", localPeek(dht(b), 3)); |
| |
| assertEquals(1, dht(b).size()); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testNoTransactionWriteThrough() throws Exception { |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| near.put(2, "2"); |
| |
| String s = near.getAndPut(3, "3"); |
| |
| assertNotNull(s); |
| assertEquals("3", s); |
| |
| assertEquals("2", near.localPeek(2)); |
| assertEquals("3", near.localPeek(3)); |
| |
| assertEquals("2", near.get(2)); |
| assertEquals("3", near.get(3)); |
| |
| assertEquals("2", localPeek(dht(primaryGrid(2)), 2)); |
| assertEquals("3", localPeek(dht(primaryGrid(3)), 3)); |
| |
| assertEquals(2, near.localSize(CachePeekMode.ALL)); |
| assertEquals(2, near.localSize(CachePeekMode.ALL)); |
| } |
| |
| /** |
| * Test Optimistic repeatable read write-through. |
| * |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings({"ConstantConditions"}) |
| @Test |
| public void testPessimisticWriteThrough() throws Exception { |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| if (transactional()) { |
| try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 0)) { |
| near.put(2, "2"); |
| |
| String s = near.getAndPut(3, "3"); |
| |
| assertEquals("3", s); |
| |
| assertEquals("2", near.get(2)); |
| assertEquals("3", near.get(3)); |
| |
| assertNotNull(dht(primaryGrid(3)).localPeek(3, null)); |
| |
| tx.commit(); |
| } |
| } |
| else { |
| near.put(2, "2"); |
| |
| String s = near.getAndPut(3, "3"); |
| |
| assertNotNull(s); |
| assertEquals("3", s); |
| } |
| |
| assertEquals("2", near.localPeek(2)); |
| assertEquals("3", near.localPeek(3)); |
| |
| assertEquals("2", localPeek(dht(primaryGrid(2)), 2)); |
| assertEquals("3", localPeek(dht(primaryGrid(3)), 3)); |
| |
| assertEquals(2, near.localSize(CachePeekMode.ALL)); |
| assertEquals(2, near.localSize(CachePeekMode.ALL)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testConcurrentOps() throws Exception { |
| // Don't create missing values. |
| store.create(false); |
| |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| int key = 1; |
| |
| assertTrue(near.putIfAbsent(key, "1")); |
| assertFalse(near.putIfAbsent(key, "1")); |
| assertEquals("1", near.getAndPutIfAbsent(key, "2")); |
| |
| assertEquals("1", near.localPeek(key)); |
| assertEquals(1, near.localSize(CachePeekMode.ALL)); |
| assertEquals(1, near.localSize(CachePeekMode.ALL)); |
| |
| assertEquals("1", near.getAndReplace(key, "2")); |
| assertEquals("2", near.localPeek(key)); |
| |
| assertTrue(near.replace(key, "2")); |
| |
| assertEquals("2", near.localPeek(key)); |
| assertEquals(1, near.localSize(CachePeekMode.ALL)); |
| assertEquals(1, near.localSize(CachePeekMode.ALL)); |
| |
| assertTrue(near.remove(key, "2")); |
| |
| assertEquals(0, near.localSize(CachePeekMode.ALL)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testBackupsLocalAffinity() throws Exception { |
| checkBackupConsistency(2); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testBackupsRemoteAffinity() throws Exception { |
| checkBackupConsistency(1); |
| } |
| |
| /** |
| * @param key Key to check. |
| * @throws Exception If failed. |
| */ |
| private void checkBackupConsistency(int key) throws Exception { |
| IgniteCache<Integer, String> cache = jcache(0); |
| |
| String val = Integer.toString(key); |
| |
| cache.put(key, val); |
| |
| GridDhtCacheAdapter<Integer, String> dht0 = dht(0); |
| GridDhtCacheAdapter<Integer, String> dht1 = dht(1); |
| |
| assertNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| |
| assertEquals(val, localPeek(dht0, key)); |
| assertEquals(val, localPeek(dht1, key)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testSingleLockLocalAffinity() throws Exception { |
| checkSingleLock(2); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testSingleLockRemoteAffinity() throws Exception { |
| checkSingleLock(1); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @param key Key. |
| * @return Near entry. |
| */ |
| @Nullable private GridNearCacheEntry nearEntry(int idx, int key) { |
| return (GridNearCacheEntry)near(idx).peekEx(key); |
| } |
| |
| /** |
| * @param o Object. |
| * @return Hash value. |
| */ |
| private int hash(Object o) { |
| return System.identityHashCode(o); |
| } |
| |
| /** |
| * @param key Key. |
| * @throws Exception If failed. |
| */ |
| private void checkSingleLock(int key) throws Exception { |
| if (!transactional()) |
| return; |
| |
| IgniteCache<Integer, String> cache = jcache(0); |
| |
| String val = Integer.toString(key); |
| |
| Collection<ClusterNode> affNodes = grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key); |
| |
| info("Affinity for key [nodeId=" + U.nodeIds(affNodes) + ", key=" + key + ']'); |
| |
| assertEquals(2, affNodes.size()); |
| |
| ClusterNode primary = F.first(affNodes); |
| |
| assertNotNull(primary); |
| |
| info("Primary local: " + primary.isLocal()); |
| |
| Lock lock = cache.lock(key); |
| |
| lock.lock(); |
| |
| try { |
| AffinityTopologyVersion topVer = grid(0).context().discovery().topologyVersionEx(); |
| |
| GridNearCacheEntry nearEntry1 = nearEntry(0, key); |
| |
| info("Peeked entry after lock [hash=" + hash(nearEntry1) + ", nearEntry=" + nearEntry1 + ']'); |
| |
| assertNotNull(nearEntry1); |
| assertTrue("Invalid near entry: " + nearEntry1, nearEntry1.valid(topVer)); |
| |
| assertTrue(cache.isLocalLocked(key, false)); |
| assertTrue(cache.isLocalLocked(key, true)); |
| |
| cache.put(key, val); |
| |
| GridNearCacheEntry nearEntry2 = nearEntry(0, key); |
| |
| info("Peeked entry after put [hash=" + hash(nearEntry1) + ", nearEntry=" + nearEntry2 + ']'); |
| |
| assert nearEntry1 == nearEntry2; |
| |
| assertNotNull(nearEntry2); |
| assertTrue("Invalid near entry [hash=" + nearEntry2, nearEntry2.valid(topVer)); |
| |
| assertEquals(val, cache.localPeek(key)); |
| assertEquals(val, dhtPeek(0, key)); |
| assertEquals(val, dhtPeek(1, key)); |
| |
| GridNearCacheEntry nearEntry3 = nearEntry(0, key); |
| |
| info("Peeked entry after peeks [hash=" + hash(nearEntry1) + ", nearEntry=" + nearEntry3 + ']'); |
| |
| assert nearEntry2 == nearEntry3; |
| |
| assertNotNull(nearEntry3); |
| assertTrue("Invalid near entry: " + nearEntry3, nearEntry3.valid(topVer)); |
| |
| assertNotNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| |
| assertEquals(val, cache.get(key)); |
| assertEquals(val, cache.getAndRemove(key)); |
| |
| assertNull(cache.localPeek(key)); |
| assertNull(localPeek(dht(primaryGrid(key)), key)); |
| |
| assertTrue(cache.isLocalLocked(key, false)); |
| assertTrue(cache.isLocalLocked(key, true)); |
| } |
| finally { |
| lock.unlock(); |
| } |
| |
| assertNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| |
| assertFalse(near(0).isLockedNearOnly(key)); |
| assertFalse(cache.isLocalLocked(key, true)); |
| } |
| |
| /** @throws Throwable If failed. */ |
| @Test |
| public void testSingleLockReentryLocalAffinity() throws Throwable { |
| checkSingleLockReentry(2); |
| } |
| |
| /** @throws Throwable If failed. */ |
| @Test |
| public void testSingleLockReentryRemoteAffinity() throws Throwable { |
| checkSingleLockReentry(1); |
| } |
| |
| /** |
| * @param key Key. |
| * @throws Exception If failed. |
| */ |
| private void checkSingleLockReentry(int key) throws Throwable { |
| if (!transactional()) |
| return; |
| |
| IgniteCache<Integer, String> near = jcache(0); |
| |
| String val = Integer.toString(key); |
| |
| Lock lock = near.lock(key); |
| |
| lock.lock(); |
| |
| try { |
| near.put(key, val); |
| |
| assertEquals(val, near.localPeek(key)); |
| assertEquals(val, localPeek(dht(primaryGrid(key)), key)); |
| |
| assertTrue(near.isLocalLocked(key, false)); |
| assertTrue(near.isLocalLocked(key, true)); |
| |
| lock.lock(); // Reentry. |
| |
| try { |
| assertEquals(val, near.get(key)); |
| assertEquals(val, near.getAndRemove(key)); |
| |
| assertNull(near.localPeek(key)); |
| assertNull(localPeek(dht(primaryGrid(key)), key)); |
| |
| assertTrue(near.isLocalLocked(key, false)); |
| assertTrue(near.isLocalLocked(key, true)); |
| } |
| finally { |
| lock.unlock(); |
| } |
| |
| assertTrue(near.isLocalLocked(key, false)); |
| assertTrue(near.isLocalLocked(key, true)); |
| } |
| catch (Throwable t) { |
| error("Test failed.", t); |
| |
| throw t; |
| } |
| finally { |
| lock.unlock(); |
| } |
| |
| assertFalse(near(0).isLockedNearOnly(key)); |
| assertFalse(near.isLocalLocked(key, true)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testTransactionSingleGetLocalAffinity() throws Exception { |
| checkTransactionSingleGet(2); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testTransactionSingleGetRemoteAffinity() throws Exception { |
| checkTransactionSingleGet(1); |
| } |
| |
| /** |
| * @param key Key. |
| * @throws Exception If failed. |
| */ |
| private void checkTransactionSingleGet(int key) throws Exception { |
| IgniteCache<Integer, String> cache = jcache(0); |
| |
| String val = Integer.toString(key); |
| |
| cache.put(key, val); |
| |
| assertEquals(val, dhtPeek(0, key)); |
| assertEquals(val, dhtPeek(1, key)); |
| |
| assertNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| |
| if (transactional()) { |
| |
| try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| // Simple transaction get. |
| assertEquals(val, cache.get(key)); |
| |
| tx.commit(); |
| } |
| } |
| else |
| assertEquals(val, cache.get(key)); |
| |
| assertEquals(val, dhtPeek(0, key)); |
| assertEquals(val, dhtPeek(1, key)); |
| |
| assertNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testTransactionSingleGetRemoveLocalAffinity() throws Exception { |
| checkTransactionSingleGetRemove(2); |
| } |
| |
| /** @throws Exception If failed. */ |
| @Test |
| public void testTransactionSingleGetRemoveRemoteAffinity() throws Exception { |
| checkTransactionSingleGetRemove(1); |
| } |
| |
| /** |
| * @param key Key |
| * @throws Exception If failed. |
| */ |
| public void checkTransactionSingleGetRemove(int key) throws Exception { |
| IgniteCache<Object, Object> cache = jcache(0); |
| |
| String val = Integer.toString(key); |
| |
| cache.put(key, val); |
| |
| assertEquals(val, dhtPeek(0, key)); |
| assertEquals(val, dhtPeek(1, key)); |
| |
| assertNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| |
| if (transactional()) { |
| try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| // Read. |
| assertEquals(val, cache.get(key)); |
| |
| // Remove. |
| assertTrue(cache.remove(key)); |
| |
| tx.commit(); |
| } |
| } |
| else { |
| // Read. |
| assertEquals(val, cache.get(key)); |
| |
| // Remove. |
| assertTrue(cache.remove(key)); |
| } |
| |
| assertNull(dhtPeek(0, key)); |
| assertNull(dhtPeek(1, key)); |
| |
| assertNull(near(0).peekEx(key)); |
| assertNull(near(1).peekEx(key)); |
| } |
| |
| /** |
| * |
| */ |
| private static class TestStore extends CacheStoreAdapter<Integer, String> { |
| /** Map. */ |
| private ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>(); |
| |
| /** Create flag. */ |
| private volatile boolean create = true; |
| |
| /** |
| * |
| */ |
| void reset() { |
| map.clear(); |
| |
| create = true; |
| } |
| |
| /** @param create Create flag. */ |
| void create(boolean create) { |
| this.create = create; |
| } |
| |
| /** @return Create flag. */ |
| boolean isCreate() { |
| return create; |
| } |
| |
| /** |
| * @param key Key. |
| * @return Value. |
| */ |
| String value(Integer key) { |
| return map.get(key); |
| } |
| |
| /** @return {@code True} if empty. */ |
| boolean isEmpty() { |
| return map.isEmpty(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String load(Integer key) { |
| if (!create) |
| return map.get(key); |
| |
| String s = map.putIfAbsent(key, key.toString()); |
| |
| return s == null ? key.toString() : s; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void write(javax.cache.Cache.Entry<? extends Integer, ? extends String> e) { |
| map.put(e.getKey(), e.getValue()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void delete(Object key) { |
| map.remove(key); |
| } |
| } |
| } |