| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.io.Serializable; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; |
| |
| /** |
| * Tests near cache with various atomic cache configurations. |
| */ |
| public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { |
| /** Number of grid nodes started for each test. */ |
| private static final int GRID_CNT = 4; |
| |
| /** Key selection mode: the given node must be primary for the key. */ |
| private static final int PRIMARY = 0; |
| |
| /** Key selection mode: the given node must be a backup for the key. */ |
| private static final int BACKUP = 1; |
| |
| /** Key selection mode: the given node must be neither primary nor backup for the key. */ |
| private static final int NOT_PRIMARY_AND_BACKUP = 2; |
| |
| /** Number of backups applied to the cache configuration; set before the grids are started. */ |
| private int backups; |
| |
| /** Last candidate key examined by the key search; the next search resumes right after it. */ |
| private int lastKey; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); |
| |
| ccfg.setCacheMode(PARTITIONED); |
| ccfg.setAtomicityMode(ATOMIC); |
| ccfg.setNearConfiguration(new NearCacheConfiguration()); |
| ccfg.setWriteSynchronizationMode(FULL_SYNC); |
| ccfg.setRebalanceMode(SYNC); |
| ccfg.setBackups(backups); |
| |
| cfg.setCacheConfiguration(ccfg); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNoBackups() throws Exception { |
| doStartGrids(0); |
| |
| checkNearCache(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testWithBackups() throws Exception { |
| doStartGrids(2); |
| |
| checkNearCache(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void checkNearCache() throws Exception { |
| checkPut(); |
| |
| checkPutAll(); |
| |
| checkTransform(); |
| |
| checkTransformAll(); |
| |
| checkRemove(); |
| |
| checkReaderEvict(); |
| |
| checkReaderRemove(); |
| |
| checkPutRemoveGet(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ZeroLengthArrayAllocation") |
| private void checkPutAll() throws Exception { |
| log.info("Check putAll."); |
| |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Integer> aff = ignite0.affinity(DEFAULT_CACHE_NAME); |
| |
| UUID id0 = ignite0.cluster().localNode().id(); |
| |
| Map<Integer, Integer> primaryKeys = new HashMap<>(); |
| |
| for (int i = 0; i < 10; i++) |
| primaryKeys.put(key(ignite0, PRIMARY), 1); |
| |
| log.info("PutAll from primary."); |
| |
| cache0.putAll(primaryKeys); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| for (Integer primaryKey : primaryKeys.keySet()) |
| checkEntry(grid(i), primaryKey, 1, false); |
| } |
| |
| if (backups > 0) { |
| Map<Integer, Integer> backupKeys = new HashMap<>(); |
| |
| for (int i = 0; i < 10; i++) |
| backupKeys.put(key(ignite0, BACKUP), 2); |
| |
| log.info("PutAll from backup."); |
| |
| cache0.putAll(backupKeys); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| for (Integer backupKey : backupKeys.keySet()) |
| checkEntry(grid(i), backupKey, 2, false); |
| } |
| } |
| |
| Map<Integer, Integer> nearKeys = new HashMap<>(); |
| |
| for (int i = 0; i < 30; i++) |
| nearKeys.put(key(ignite0, NOT_PRIMARY_AND_BACKUP), 3); |
| |
| log.info("PutAll from near."); |
| |
| cache0.putAll(nearKeys); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| for (Integer nearKey : nearKeys.keySet()) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, 3, i == 0, expReaders); |
| } |
| } |
| |
| Map<Integer, Collection<UUID>> readersMap = new HashMap<>(); |
| |
| for (Integer key : nearKeys.keySet()) |
| readersMap.put(key, new HashSet<UUID>()); |
| |
| int val = 4; |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| IgniteCache<Integer, Integer> cache = grid(i).cache(DEFAULT_CACHE_NAME); |
| |
| for (Integer key : nearKeys.keySet()) |
| nearKeys.put(key, val); |
| |
| log.info("PutAll [igniteInstanceName=" + grid(i).name() + ", val=" + val + ']'); |
| |
| cache.putAll(nearKeys); |
| |
| for (Integer key : nearKeys.keySet()) { |
| if (!aff.isPrimaryOrBackup(grid(i).localNode(), key)) |
| readersMap.get(key).add(grid(i).localNode().id()); |
| } |
| |
| for (int j = 0; j < GRID_CNT; j++) { |
| for (Integer key : nearKeys.keySet()) { |
| boolean primaryNode = aff.isPrimary(grid(j).localNode(), key); |
| |
| Collection<UUID> readers = readersMap.get(key); |
| |
| UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[] {}; |
| |
| checkEntry(grid(j), key, val, readers.contains(grid(j).localNode().id()), expReaders); |
| } |
| } |
| |
| val++; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ZeroLengthArrayAllocation") |
| private void checkTransform() throws Exception { |
| log.info("Check transform."); |
| |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Object> aff = ignite0.affinity(DEFAULT_CACHE_NAME); |
| |
| UUID id0 = ignite0.cluster().localNode().id(); |
| |
| Integer primaryKey = key(ignite0, PRIMARY); |
| |
| log.info("Transform from primary."); |
| |
| cache0.invoke(primaryKey, new Processor(primaryKey)); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), primaryKey, primaryKey, false); |
| |
| if (backups > 0) { |
| Integer backupKey = key(ignite0, BACKUP); |
| |
| log.info("Transform from backup."); |
| |
| cache0.invoke(backupKey, new Processor(backupKey)); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), backupKey, backupKey, false); |
| } |
| |
| Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP); |
| |
| log.info("Transform from near."); |
| |
| cache0.invoke(nearKey, new Processor(nearKey)); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, nearKey, i == 0, expReaders); |
| } |
| |
| Collection<UUID> readers = new HashSet<>(); |
| |
| readers.add(id0); |
| |
| int val = nearKey + 1; |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| IgniteCache<Integer, Integer> cache = grid(i).cache(DEFAULT_CACHE_NAME); |
| |
| log.info("Transform [igniteInstanceName=" + grid(i).name() + ", val=" + val + ']'); |
| |
| cache.invoke(nearKey, new Processor(val)); |
| |
| if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey)) |
| readers.add(grid(i).localNode().id()); |
| |
| for (int j = 0; j < GRID_CNT; j++) { |
| boolean primaryNode = aff.isPrimary(grid(j).localNode(), nearKey); |
| |
| UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[] {}; |
| |
| checkEntry(grid(j), nearKey, val, readers.contains(grid(j).localNode().id()), expReaders); |
| } |
| |
| val++; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ZeroLengthArrayAllocation") |
| private void checkTransformAll() throws Exception { |
| log.info("Check transformAll."); |
| |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Object> aff = ignite0.affinity(DEFAULT_CACHE_NAME); |
| |
| UUID id0 = ignite0.cluster().localNode().id(); |
| |
| Set<Integer> primaryKeys = new HashSet<>(); |
| |
| for (int i = 0; i < 10; i++) |
| primaryKeys.add(key(ignite0, PRIMARY)); |
| |
| log.info("TransformAll from primary."); |
| |
| cache0.invokeAll(primaryKeys, new Processor(1)); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| for (Integer primaryKey : primaryKeys) |
| checkEntry(grid(i), primaryKey, 1, false); |
| } |
| |
| if (backups > 0) { |
| Set<Integer> backupKeys = new HashSet<>(); |
| |
| for (int i = 0; i < 10; i++) |
| backupKeys.add(key(ignite0, BACKUP)); |
| |
| log.info("TransformAll from backup."); |
| |
| cache0.invokeAll(backupKeys, new Processor(2)); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| for (Integer backupKey : backupKeys) |
| checkEntry(grid(i), backupKey, 2, false); |
| } |
| } |
| |
| Set<Integer> nearKeys = new HashSet<>(); |
| |
| for (int i = 0; i < 30; i++) |
| nearKeys.add(key(ignite0, NOT_PRIMARY_AND_BACKUP)); |
| |
| log.info("TransformAll from near."); |
| |
| cache0.invokeAll(nearKeys, new Processor(3)); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| for (Integer nearKey : nearKeys) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, 3, i == 0, expReaders); |
| } |
| } |
| |
| Map<Integer, Collection<UUID>> readersMap = new HashMap<>(); |
| |
| for (Integer key : nearKeys) |
| readersMap.put(key, new HashSet<UUID>()); |
| |
| int val = 4; |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| IgniteCache<Integer, Integer> cache = grid(i).cache(DEFAULT_CACHE_NAME); |
| |
| for (Integer key : nearKeys) |
| nearKeys.add(key); |
| |
| log.info("TransformAll [igniteInstanceName=" + grid(i).name() + ", val=" + val + ']'); |
| |
| cache.invokeAll(nearKeys, new Processor(val)); |
| |
| for (Integer key : nearKeys) { |
| if (!aff.isPrimaryOrBackup(grid(i).localNode(), key)) |
| readersMap.get(key).add(grid(i).localNode().id()); |
| } |
| |
| for (int j = 0; j < GRID_CNT; j++) { |
| for (Integer key : nearKeys) { |
| boolean primaryNode = aff.isPrimary(grid(j).localNode(), key); |
| |
| Collection<UUID> readers = readersMap.get(key); |
| |
| UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[] {}; |
| |
| checkEntry(grid(j), key, val, readers.contains(grid(j).localNode().id()), expReaders); |
| } |
| } |
| |
| val++; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void checkPutRemoveGet() throws Exception { |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Integer key = key(ignite0, NOT_PRIMARY_AND_BACKUP); |
| |
| cache0.put(key, key); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| grid(i).cache(DEFAULT_CACHE_NAME).get(key); |
| |
| cache0.remove(key); |
| |
| cache0.put(key, key); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| grid(i).cache(DEFAULT_CACHE_NAME).get(key); |
| |
| cache0.remove(key); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void checkPut() throws Exception { |
| checkPut(0); |
| |
| checkPut(GRID_CNT - 1); |
| } |
| |
| /** |
| * @param grid Grid Index. |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ZeroLengthArrayAllocation") |
| private void checkPut(int grid) throws Exception { |
| log.info("Check put, grid: " + grid); |
| |
| Ignite ignite0 = grid(grid); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Integer> aff = ignite0.affinity(DEFAULT_CACHE_NAME); |
| |
| UUID id0 = ignite0.cluster().localNode().id(); |
| |
| Integer primaryKey = key(ignite0, PRIMARY); |
| |
| log.info("Put from primary."); |
| |
| cache0.put(primaryKey, primaryKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), primaryKey, primaryKey, false); |
| |
| if (backups > 0) { |
| Integer backupKey = key(ignite0, BACKUP); |
| |
| log.info("Put from backup."); |
| |
| cache0.put(backupKey, backupKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), backupKey, backupKey, false); |
| } |
| |
| Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP); |
| |
| log.info("Put from near."); |
| |
| cache0.put(nearKey, nearKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, nearKey, i == grid, expReaders); |
| } |
| |
| Collection<UUID> readers = new HashSet<>(); |
| |
| readers.add(id0); |
| |
| int val = nearKey + 1; |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| IgniteCache<Integer, Integer> cache = grid(i).cache(DEFAULT_CACHE_NAME); |
| |
| log.info("Put [igniteInstanceName=" + grid(i).name() + ", val=" + val + ']'); |
| |
| cache.put(nearKey, val); |
| |
| if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey)) |
| readers.add(grid(i).localNode().id()); |
| |
| for (int j = 0; j < GRID_CNT; j++) { |
| boolean primaryNode = aff.isPrimary(grid(j).localNode(), nearKey); |
| |
| UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[] {}; |
| |
| checkEntry(grid(j), nearKey, val, readers.contains(grid(j).localNode().id()), expReaders); |
| } |
| |
| val++; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void checkRemove() throws Exception { |
| log.info("Check remove."); |
| |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Integer primaryKey = key(ignite0, PRIMARY); |
| |
| log.info("Put from primary."); |
| |
| cache0.put(primaryKey, primaryKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), primaryKey, primaryKey, false); |
| |
| log.info("Remove from primary."); |
| |
| cache0.remove(primaryKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), primaryKey, null, false); |
| |
| if (backups > 0) { |
| Integer backupKey = key(ignite0, BACKUP); |
| |
| log.info("Put from backup."); |
| |
| cache0.put(backupKey, backupKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), backupKey, backupKey, false); |
| |
| log.info("Remove from backup."); |
| |
| cache0.remove(backupKey); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), backupKey, null, false); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ZeroLengthArrayAllocation") |
| private void checkReaderEvict() throws Exception { |
| log.info("Check evict."); |
| |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Integer> aff = ignite0.affinity(DEFAULT_CACHE_NAME); |
| |
| UUID id0 = ignite0.cluster().localNode().id(); |
| |
| Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP); |
| |
| cache0.put(nearKey, 1); // Put should create near entry on grid0. |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, 1, i == 0, expReaders); |
| } |
| |
| cache0.localEvict(Collections.singleton(nearKey)); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, 1, false, expReaders); |
| } |
| |
| IgniteCache<Integer, Integer> primaryCache = G.ignite( |
| (String)aff.mapKeyToNode(nearKey).attribute(ATTR_IGNITE_INSTANCE_NAME)).cache(DEFAULT_CACHE_NAME); |
| |
| primaryCache.put(nearKey, 2); // This put should see that near entry evicted on grid0 and remove reader. |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), nearKey, 2, false); |
| |
| assertEquals((Integer)2, cache0.get(nearKey)); // Get should again create near entry on grid0. |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, 2, i == 0, expReaders); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ZeroLengthArrayAllocation") |
| private void checkReaderRemove() throws Exception { |
| Ignite ignite0 = grid(0); |
| |
| IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Integer> aff = ignite0.affinity(DEFAULT_CACHE_NAME); |
| |
| UUID id0 = ignite0.cluster().localNode().id(); |
| |
| Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP); |
| |
| cache0.put(nearKey, 1); // Put should create near entry on grid0. |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {}; |
| |
| checkEntry(grid(i), nearKey, 1, i == 0, expReaders); |
| } |
| |
| cache0.remove(nearKey); // Remove from grid0, this should remove readers on primary node. |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| checkEntry(grid(i), nearKey, null, i == 0); |
| |
| Ignite primaryNode = G.ignite((String)aff.mapKeyToNode(nearKey).attribute(ATTR_IGNITE_INSTANCE_NAME)); |
| |
| IgniteCache<Integer, Integer> primaryCache = primaryNode.cache(DEFAULT_CACHE_NAME); |
| |
| primaryCache.put(nearKey, 2); // Put from primary, check there are no readers. |
| |
| checkEntry(primaryNode, nearKey, 2, false); |
| } |
| |
| /** |
| * @param ignite Node. |
| * @param key Key. |
| * @param val Expected value. |
| * @param expectNear If {@code true} then near cache entry is expected. |
| * @param expReaders Expected readers. |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| private void checkEntry(Ignite ignite, |
| Integer key, |
| @Nullable Integer val, |
| boolean expectNear, |
| final UUID... expReaders) throws Exception |
| { |
| GridCacheAdapter<Integer, Integer> near = ((IgniteKernal)ignite).internalCache(DEFAULT_CACHE_NAME); |
| |
| assertTrue(near.isNear()); |
| |
| GridCacheEntryEx nearEntry = near.peekEx(key); |
| |
| boolean expectDht = near.affinity().isPrimaryOrBackup(ignite.cluster().localNode(), key); |
| |
| if (expectNear) { |
| assertNotNull("No near entry for: " + key + ", grid: " + ignite.name(), nearEntry); |
| |
| assertEquals("Unexpected value for grid: " + ignite.name(), |
| val, |
| CU.value(nearEntry.info().value(), near.context(), false)); |
| |
| assertEquals("Unexpected value for grid: " + ignite.name(), |
| val, |
| ignite.cache(near.name()).localPeek(key, CachePeekMode.ONHEAP)); |
| } |
| else |
| assertNull("Unexpected near entry: " + nearEntry + ", grid: " + ignite.name(), nearEntry); |
| |
| GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)near).dht(); |
| |
| if (expectDht) { |
| final GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.entryEx(key); |
| |
| assertNotNull("No dht entry for: " + key + ", grid: " + ignite.name(), dhtEntry); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| try { |
| return dhtEntry.readers().size() == expReaders.length; |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }, 5000); |
| |
| Collection<UUID> readers = dhtEntry.readers(); |
| |
| assertEquals(expReaders.length, readers.size()); |
| |
| for (UUID reader : expReaders) |
| assertTrue(readers.contains(reader)); |
| |
| if (dhtEntry.peekVisibleValue() == null) |
| dhtEntry.unswap(); |
| |
| assertEquals("Unexpected value for grid: " + ignite.name(), |
| val, |
| CU.value(dhtEntry.info().value(), dht.context(), false)); |
| |
| assertEquals("Unexpected value for grid: " + ignite.name(), |
| val, |
| ignite.cache(near.name()).localPeek(key, CachePeekMode.ONHEAP)); |
| } |
| else { |
| GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key); |
| |
| assertNull("Unexpected dht entry: " + dhtEntry + ", grid: " + ignite.name(), dhtEntry); |
| } |
| |
| if (!expectNear && !expectDht) { |
| assertNull("Unexpected peek value for grid: " + ignite.name(), ignite.cache(near.name()).localPeek(key, |
| CachePeekMode.ONHEAP)); |
| } |
| } |
| |
| /** |
| * @param ignite Grid. |
| * @param mode One of {@link #PRIMARY}, {@link #BACKUP} or {@link #NOT_PRIMARY_AND_BACKUP}. |
| * @return Key with properties specified by the given mode. |
| */ |
| private Integer key(Ignite ignite, int mode) { |
| Affinity<Integer> aff = ignite.affinity(DEFAULT_CACHE_NAME); |
| |
| Integer key = null; |
| |
| for (int i = lastKey + 1; i < 1_000_000; i++) { |
| boolean pass = false; |
| |
| switch (mode) { |
| case PRIMARY: |
| pass = aff.isPrimary(ignite.cluster().localNode(), i); |
| break; |
| |
| case BACKUP: |
| pass = aff.isBackup(ignite.cluster().localNode(), i); |
| break; |
| |
| case NOT_PRIMARY_AND_BACKUP: |
| pass = !aff.isPrimaryOrBackup(ignite.cluster().localNode(), i); |
| break; |
| |
| default: |
| fail(); |
| } |
| |
| lastKey = i; |
| |
| if (pass) { |
| key = i; |
| |
| break; |
| } |
| } |
| |
| assertNotNull(key); |
| |
| return key; |
| } |
| |
| /** |
| * @param backups Backups number. |
| * @throws Exception If failed. |
| */ |
| private void doStartGrids(int backups) throws Exception { |
| this.backups = backups; |
| |
| startGrids(GRID_CNT); |
| |
| awaitPartitionMapExchange(); |
| |
| log.info("Grids: "); |
| |
| for (int i = 0; i < GRID_CNT; i++) |
| log.info(grid(i).name() + ": " + grid(i).localNode().id()); |
| } |
| |
| /** |
| * |
| */ |
| private static class Processor implements EntryProcessor<Integer, Integer, Void>, Serializable { |
| /** */ |
| private final Integer newVal; |
| |
| /** |
| * @param newVal New value. |
| */ |
| private Processor(Integer newVal) { |
| this.newVal = newVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { |
| e.setValue(newVal); |
| |
| return null; |
| } |
| } |
| } |