// blob: cae96434ea28138fb876e0131fd5a68e8984019a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
/**
 * Tests near cache behavior for atomic caches configured with and without backups.
 */
public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
/** Number of nodes started for every test, see {@link #doStartGrids(int)}. */
private static final int GRID_CNT = 4;
/** Mode for {@link #key(Ignite, int)}: the given node must be primary for the key. */
private static final int PRIMARY = 0;
/** Mode for {@link #key(Ignite, int)}: the given node must be a backup for the key. */
private static final int BACKUP = 1;
/** Mode for {@link #key(Ignite, int)}: the given node must be neither primary nor backup for the key. */
private static final int NOT_PRIMARY_AND_BACKUP = 2;
/** Backups number applied to the cache configuration; assigned by {@link #doStartGrids(int)}. */
private int backups;
/** Last key candidate examined by {@link #key(Ignite, int)}; the next search starts right after it. */
private int lastKey;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    // Parameterized instead of raw: all checks in this class use Integer keys and values.
    CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

    ccfg.setCacheMode(PARTITIONED);
    ccfg.setAtomicityMode(ATOMIC);

    // Near cache is configured on every started node.
    ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());

    ccfg.setWriteSynchronizationMode(FULL_SYNC);
    ccfg.setRebalanceMode(SYNC);

    // The backups number is chosen per test by doStartGrids() before the nodes are started.
    ccfg.setBackups(backups);

    cfg.setCacheConfiguration(ccfg);

    return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
// Every test starts its own set of nodes through doStartGrids(), so all of them are stopped after each test.
stopAllGrids();
}
/**
 * Runs all near cache checks against a cache configured with zero backups.
 *
 * @throws Exception If failed.
 */
@Test
public void testNoBackups() throws Exception {
doStartGrids(0);
checkNearCache();
}
/**
 * Runs all near cache checks against a cache configured with two backups.
 *
 * @throws Exception If failed.
 */
@Test
public void testWithBackups() throws Exception {
doStartGrids(2);
checkNearCache();
}
/**
 * Runs every check scenario of this class, one after another, against the already started nodes.
 * <p>
 * All scenarios obtain their keys through {@link #key(Ignite, int)}, which continues searching after the
 * last examined key, so each scenario works with keys not used by the previous ones.
 *
 * @throws Exception If failed.
 */
private void checkNearCache() throws Exception {
checkPut();
checkPutAll();
checkTransform();
checkTransformAll();
checkRemove();
checkReaderEvict();
checkReaderRemove();
checkPutRemoveGet();
}
/**
 * Checks near entries and DHT readers after {@code putAll} calls.
 * <p>
 * First, grid 0 puts batches of keys for which it is primary, backup (only when backups are configured) and
 * neither of the two. Near entries are expected only for the last batch and only on grid 0, with grid 0
 * registered as the single reader on the primary node. Then every grid in turn overwrites the last batch,
 * and each grid which is neither primary nor backup for a key is expected to have a near entry and to be
 * listed as a reader of that key on the primary node.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("ZeroLengthArrayAllocation")
private void checkPutAll() throws Exception {
    log.info("Check putAll.");

    Ignite ignite0 = grid(0);

    IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME);

    Affinity<Integer> aff = ignite0.affinity(DEFAULT_CACHE_NAME);

    UUID id0 = ignite0.cluster().localNode().id();

    Map<Integer, Integer> primaryKeys = new HashMap<>();

    for (int i = 0; i < 10; i++)
        primaryKeys.put(key(ignite0, PRIMARY), 1);

    log.info("PutAll from primary.");

    cache0.putAll(primaryKeys);

    for (int i = 0; i < GRID_CNT; i++) {
        for (Integer primaryKey : primaryKeys.keySet())
            checkEntry(grid(i), primaryKey, 1, false);
    }

    if (backups > 0) {
        Map<Integer, Integer> backupKeys = new HashMap<>();

        for (int i = 0; i < 10; i++)
            backupKeys.put(key(ignite0, BACKUP), 2);

        log.info("PutAll from backup.");

        cache0.putAll(backupKeys);

        for (int i = 0; i < GRID_CNT; i++) {
            for (Integer backupKey : backupKeys.keySet())
                checkEntry(grid(i), backupKey, 2, false);
        }
    }

    Map<Integer, Integer> nearKeys = new HashMap<>();

    for (int i = 0; i < 30; i++)
        nearKeys.put(key(ignite0, NOT_PRIMARY_AND_BACKUP), 3);

    log.info("PutAll from near.");

    cache0.putAll(nearKeys);

    for (int i = 0; i < GRID_CNT; i++) {
        for (Integer nearKey : nearKeys.keySet()) {
            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {};

            checkEntry(grid(i), nearKey, 3, i == 0, expReaders);
        }
    }

    // Expected readers per key, accumulated while the grids update the keys one after another.
    Map<Integer, Collection<UUID>> readersMap = new HashMap<>();

    for (Integer key : nearKeys.keySet())
        readersMap.put(key, new HashSet<UUID>());

    int val = 4;

    for (int i = 0; i < GRID_CNT; i++) {
        IgniteCache<Integer, Integer> cache = grid(i).cache(DEFAULT_CACHE_NAME);

        // Overwrite the values through the entry view: unlike put() inside a keySet() iteration this
        // does not rely on the map treating the update as a non-structural modification.
        for (Map.Entry<Integer, Integer> e : nearKeys.entrySet())
            e.setValue(val);

        log.info("PutAll [igniteInstanceName=" + grid(i).name() + ", val=" + val + ']');

        cache.putAll(nearKeys);

        // A node that owns neither the primary nor a backup copy becomes a reader of the key.
        for (Integer key : nearKeys.keySet()) {
            if (!aff.isPrimaryOrBackup(grid(i).localNode(), key))
                readersMap.get(key).add(grid(i).localNode().id());
        }

        for (int j = 0; j < GRID_CNT; j++) {
            for (Integer key : nearKeys.keySet()) {
                boolean primaryNode = aff.isPrimary(grid(j).localNode(), key);

                Collection<UUID> readers = readersMap.get(key);

                // Readers are expected on the primary node only.
                UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[] {};

                checkEntry(grid(j), key, val, readers.contains(grid(j).localNode().id()), expReaders);
            }
        }

        val++;
    }
}
/**
 * Checks near entries and DHT readers after single-key {@code invoke} calls.
 * <p>
 * Grid 0 first transforms a key it is primary for, then (when backups are configured) a key it is backup
 * for, and finally a key it does not own. After that every grid transforms the last key in turn and the
 * expected near entries and readers are verified on all grids after each update.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("ZeroLengthArrayAllocation")
private void checkTransform() throws Exception {
    log.info("Check transform.");

    Ignite node0 = grid(0);
    IgniteCache<Integer, Integer> node0Cache = node0.cache(DEFAULT_CACHE_NAME);
    Affinity<Object> affinity = node0.affinity(DEFAULT_CACHE_NAME);
    UUID node0Id = node0.cluster().localNode().id();

    Integer primKey = key(node0, PRIMARY);

    log.info("Transform from primary.");

    node0Cache.invoke(primKey, new Processor(primKey));

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), primKey, primKey, false);
    }

    if (backups > 0) {
        Integer bakKey = key(node0, BACKUP);

        log.info("Transform from backup.");

        node0Cache.invoke(bakKey, new Processor(bakKey));

        for (int idx = 0; idx < GRID_CNT; idx++) {
            checkEntry(grid(idx), bakKey, bakKey, false);
        }
    }

    Integer nearKey = key(node0, NOT_PRIMARY_AND_BACKUP);

    log.info("Transform from near.");

    node0Cache.invoke(nearKey, new Processor(nearKey));

    for (int idx = 0; idx < GRID_CNT; idx++) {
        UUID[] expReaders;

        // Only the primary node is expected to track grid 0 as a reader.
        if (affinity.isPrimary(grid(idx).localNode(), nearKey))
            expReaders = new UUID[] {node0Id};
        else
            expReaders = new UUID[] {};

        checkEntry(grid(idx), nearKey, nearKey, idx == 0, expReaders);
    }

    Collection<UUID> readerIds = new HashSet<>();

    readerIds.add(node0Id);

    int expVal = nearKey + 1;

    for (int idx = 0; idx < GRID_CNT; idx++, expVal++) {
        IgniteCache<Integer, Integer> nodeCache = grid(idx).cache(DEFAULT_CACHE_NAME);

        log.info("Transform [igniteInstanceName=" + grid(idx).name() + ", val=" + expVal + ']');

        nodeCache.invoke(nearKey, new Processor(expVal));

        // A node which is neither primary nor backup for the key is expected to become a reader.
        boolean owner = affinity.isPrimaryOrBackup(grid(idx).localNode(), nearKey);

        if (!owner)
            readerIds.add(grid(idx).localNode().id());

        for (int chk = 0; chk < GRID_CNT; chk++) {
            UUID[] expReaders;

            if (affinity.isPrimary(grid(chk).localNode(), nearKey))
                expReaders = U.toArray(readerIds, new UUID[readerIds.size()]);
            else
                expReaders = new UUID[] {};

            boolean nearExpected = readerIds.contains(grid(chk).localNode().id());

            checkEntry(grid(chk), nearKey, expVal, nearExpected, expReaders);
        }
    }
}
/**
 * Checks near entries and DHT readers after {@code invokeAll} calls.
 * <p>
 * First, grid 0 transforms batches of keys for which it is primary, backup (only when backups are
 * configured) and neither of the two. Near entries are expected only for the last batch and only on
 * grid 0, with grid 0 registered as the single reader on the primary node. Then every grid in turn
 * transforms the last batch, and each grid which is neither primary nor backup for a key is expected to
 * have a near entry and to be listed as a reader of that key on the primary node.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("ZeroLengthArrayAllocation")
private void checkTransformAll() throws Exception {
    log.info("Check transformAll.");

    Ignite ignite0 = grid(0);

    IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME);

    Affinity<Object> aff = ignite0.affinity(DEFAULT_CACHE_NAME);

    UUID id0 = ignite0.cluster().localNode().id();

    Set<Integer> primaryKeys = new HashSet<>();

    for (int i = 0; i < 10; i++)
        primaryKeys.add(key(ignite0, PRIMARY));

    log.info("TransformAll from primary.");

    cache0.invokeAll(primaryKeys, new Processor(1));

    for (int i = 0; i < GRID_CNT; i++) {
        for (Integer primaryKey : primaryKeys)
            checkEntry(grid(i), primaryKey, 1, false);
    }

    if (backups > 0) {
        Set<Integer> backupKeys = new HashSet<>();

        for (int i = 0; i < 10; i++)
            backupKeys.add(key(ignite0, BACKUP));

        log.info("TransformAll from backup.");

        cache0.invokeAll(backupKeys, new Processor(2));

        for (int i = 0; i < GRID_CNT; i++) {
            for (Integer backupKey : backupKeys)
                checkEntry(grid(i), backupKey, 2, false);
        }
    }

    Set<Integer> nearKeys = new HashSet<>();

    for (int i = 0; i < 30; i++)
        nearKeys.add(key(ignite0, NOT_PRIMARY_AND_BACKUP));

    log.info("TransformAll from near.");

    cache0.invokeAll(nearKeys, new Processor(3));

    for (int i = 0; i < GRID_CNT; i++) {
        for (Integer nearKey : nearKeys) {
            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[] {id0} : new UUID[] {};

            checkEntry(grid(i), nearKey, 3, i == 0, expReaders);
        }
    }

    // Expected readers per key, accumulated while the grids update the keys one after another.
    Map<Integer, Collection<UUID>> readersMap = new HashMap<>();

    for (Integer key : nearKeys)
        readersMap.put(key, new HashSet<UUID>());

    int val = 4;

    for (int i = 0; i < GRID_CNT; i++) {
        IgniteCache<Integer, Integer> cache = grid(i).cache(DEFAULT_CACHE_NAME);

        // The key set is reused as is: the new value is carried by the processor, not by the set.
        log.info("TransformAll [igniteInstanceName=" + grid(i).name() + ", val=" + val + ']');

        cache.invokeAll(nearKeys, new Processor(val));

        // A node that owns neither the primary nor a backup copy becomes a reader of the key.
        for (Integer key : nearKeys) {
            if (!aff.isPrimaryOrBackup(grid(i).localNode(), key))
                readersMap.get(key).add(grid(i).localNode().id());
        }

        for (int j = 0; j < GRID_CNT; j++) {
            for (Integer key : nearKeys) {
                boolean primaryNode = aff.isPrimary(grid(j).localNode(), key);

                Collection<UUID> readers = readersMap.get(key);

                // Readers are expected on the primary node only.
                UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[] {};

                checkEntry(grid(j), key, val, readers.contains(grid(j).localNode().id()), expReaders);
            }
        }

        val++;
    }
}
/**
 * Puts a key not owned by grid 0 from grid 0, reads it on every grid and removes it again from grid 0;
 * the whole cycle is executed twice for the same key. No explicit assertions are made here, the scenario
 * only has to complete without an exception.
 *
 * @throws Exception If failed.
 */
private void checkPutRemoveGet() throws Exception {
    Ignite node0 = grid(0);

    IgniteCache<Integer, Integer> node0Cache = node0.cache(DEFAULT_CACHE_NAME);

    Integer nearKey = key(node0, NOT_PRIMARY_AND_BACKUP);

    // Two identical rounds: put, get from all grids, remove.
    for (int round = 0; round < 2; round++) {
        node0Cache.put(nearKey, nearKey);

        for (int idx = 0; idx < GRID_CNT; idx++) {
            grid(idx).cache(DEFAULT_CACHE_NAME).get(nearKey);
        }

        node0Cache.remove(nearKey);
    }
}
/**
 * Runs the put check twice: initiated from the first grid and from the last grid.
 *
 * @throws Exception If failed.
 */
private void checkPut() throws Exception {
checkPut(0);
checkPut(GRID_CNT - 1);
}
/**
 * Checks near entries and DHT readers after single-key {@code put} calls initiated from the given grid.
 * <p>
 * The initiating grid puts a key it is primary for, then (when backups are configured) a key it is backup
 * for, and finally a key it does not own. After that every grid overwrites the last key in turn and the
 * expected near entries and readers are verified on all grids after each update.
 *
 * @param grid Grid Index.
 * @throws Exception If failed.
 */
@SuppressWarnings("ZeroLengthArrayAllocation")
private void checkPut(int grid) throws Exception {
    log.info("Check put, grid: " + grid);

    Ignite initiator = grid(grid);
    IgniteCache<Integer, Integer> initiatorCache = initiator.cache(DEFAULT_CACHE_NAME);
    Affinity<Integer> affinity = initiator.affinity(DEFAULT_CACHE_NAME);
    UUID initiatorId = initiator.cluster().localNode().id();

    Integer primKey = key(initiator, PRIMARY);

    log.info("Put from primary.");

    initiatorCache.put(primKey, primKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), primKey, primKey, false);
    }

    if (backups > 0) {
        Integer bakKey = key(initiator, BACKUP);

        log.info("Put from backup.");

        initiatorCache.put(bakKey, bakKey);

        for (int idx = 0; idx < GRID_CNT; idx++) {
            checkEntry(grid(idx), bakKey, bakKey, false);
        }
    }

    Integer nearKey = key(initiator, NOT_PRIMARY_AND_BACKUP);

    log.info("Put from near.");

    initiatorCache.put(nearKey, nearKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        UUID[] expReaders;

        // Only the primary node is expected to track the initiating grid as a reader.
        if (affinity.isPrimary(grid(idx).localNode(), nearKey))
            expReaders = new UUID[] {initiatorId};
        else
            expReaders = new UUID[] {};

        checkEntry(grid(idx), nearKey, nearKey, idx == grid, expReaders);
    }

    Collection<UUID> readerIds = new HashSet<>();

    readerIds.add(initiatorId);

    int expVal = nearKey + 1;

    for (int idx = 0; idx < GRID_CNT; idx++, expVal++) {
        IgniteCache<Integer, Integer> nodeCache = grid(idx).cache(DEFAULT_CACHE_NAME);

        log.info("Put [igniteInstanceName=" + grid(idx).name() + ", val=" + expVal + ']');

        nodeCache.put(nearKey, expVal);

        // A node which is neither primary nor backup for the key is expected to become a reader.
        boolean owner = affinity.isPrimaryOrBackup(grid(idx).localNode(), nearKey);

        if (!owner)
            readerIds.add(grid(idx).localNode().id());

        for (int chk = 0; chk < GRID_CNT; chk++) {
            UUID[] expReaders;

            if (affinity.isPrimary(grid(chk).localNode(), nearKey))
                expReaders = U.toArray(readerIds, new UUID[readerIds.size()]);
            else
                expReaders = new UUID[] {};

            boolean nearExpected = readerIds.contains(grid(chk).localNode().id());

            checkEntry(grid(chk), nearKey, expVal, nearExpected, expReaders);
        }
    }
}
/**
 * Checks that a key put and then removed from grid 0 leaves no value and no near entry on any grid.
 * The scenario is run for a key grid 0 is primary for and, when backups are configured, for a key
 * grid 0 is backup for.
 *
 * @throws Exception If failed.
 */
private void checkRemove() throws Exception {
    log.info("Check remove.");

    Ignite node0 = grid(0);

    IgniteCache<Integer, Integer> node0Cache = node0.cache(DEFAULT_CACHE_NAME);

    Integer primKey = key(node0, PRIMARY);

    log.info("Put from primary.");

    node0Cache.put(primKey, primKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), primKey, primKey, false);
    }

    log.info("Remove from primary.");

    node0Cache.remove(primKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), primKey, null, false);
    }

    // The backup part of the scenario makes sense only when backups are configured.
    if (backups <= 0)
        return;

    Integer bakKey = key(node0, BACKUP);

    log.info("Put from backup.");

    node0Cache.put(bakKey, bakKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), bakKey, bakKey, false);
    }

    log.info("Remove from backup.");

    node0Cache.remove(bakKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), bakKey, null, false);
    }
}
/**
 * Checks reader bookkeeping around a local eviction of a near entry on grid 0.
 * <p>
 * Steps: a put from grid 0 creates the near entry; a local evict removes the near entry while the primary
 * node still lists grid 0 as a reader; a put from the primary node is then expected to drop that reader;
 * a final get from grid 0 is expected to recreate both the near entry and the reader.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("ZeroLengthArrayAllocation")
private void checkReaderEvict() throws Exception {
    log.info("Check evict.");

    Ignite node0 = grid(0);
    IgniteCache<Integer, Integer> node0Cache = node0.cache(DEFAULT_CACHE_NAME);
    Affinity<Integer> affinity = node0.affinity(DEFAULT_CACHE_NAME);
    UUID node0Id = node0.cluster().localNode().id();

    Integer nearKey = key(node0, NOT_PRIMARY_AND_BACKUP);

    // Put should create near entry on grid0.
    node0Cache.put(nearKey, 1);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        boolean primary = affinity.isPrimary(grid(idx).localNode(), nearKey);

        UUID[] expReaders = primary ? new UUID[] {node0Id} : new UUID[] {};

        checkEntry(grid(idx), nearKey, 1, idx == 0, expReaders);
    }

    node0Cache.localEvict(Collections.singleton(nearKey));

    // After the eviction no near entry is expected anywhere, while the reader is still expected on the primary.
    for (int idx = 0; idx < GRID_CNT; idx++) {
        boolean primary = affinity.isPrimary(grid(idx).localNode(), nearKey);

        UUID[] expReaders = primary ? new UUID[] {node0Id} : new UUID[] {};

        checkEntry(grid(idx), nearKey, 1, false, expReaders);
    }

    String primaryName = (String)affinity.mapKeyToNode(nearKey).attribute(ATTR_IGNITE_INSTANCE_NAME);

    Ignite primaryIgnite = G.ignite(primaryName);

    IgniteCache<Integer, Integer> primaryCache = primaryIgnite.cache(DEFAULT_CACHE_NAME);

    // This put should see that near entry evicted on grid0 and remove reader.
    primaryCache.put(nearKey, 2);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), nearKey, 2, false);
    }

    // Get should again create near entry on grid0.
    assertEquals((Integer)2, node0Cache.get(nearKey));

    for (int idx = 0; idx < GRID_CNT; idx++) {
        boolean primary = affinity.isPrimary(grid(idx).localNode(), nearKey);

        UUID[] expReaders = primary ? new UUID[] {node0Id} : new UUID[] {};

        checkEntry(grid(idx), nearKey, 2, idx == 0, expReaders);
    }
}
/**
 * Checks that removing a key from the near node clears its reader registration on the primary node.
 * <p>
 * Steps: a put from grid 0 creates the near entry and the reader; a remove from grid 0 is expected to
 * leave no readers and a near entry without a value on grid 0; a subsequent put from the primary node
 * is expected to leave the primary entry without readers.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("ZeroLengthArrayAllocation")
private void checkReaderRemove() throws Exception {
    Ignite node0 = grid(0);
    IgniteCache<Integer, Integer> node0Cache = node0.cache(DEFAULT_CACHE_NAME);
    Affinity<Integer> affinity = node0.affinity(DEFAULT_CACHE_NAME);
    UUID node0Id = node0.cluster().localNode().id();

    Integer nearKey = key(node0, NOT_PRIMARY_AND_BACKUP);

    // Put should create near entry on grid0.
    node0Cache.put(nearKey, 1);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        boolean primary = affinity.isPrimary(grid(idx).localNode(), nearKey);

        UUID[] expReaders = primary ? new UUID[] {node0Id} : new UUID[] {};

        checkEntry(grid(idx), nearKey, 1, idx == 0, expReaders);
    }

    // Remove from grid0, this should remove readers on primary node.
    node0Cache.remove(nearKey);

    for (int idx = 0; idx < GRID_CNT; idx++) {
        checkEntry(grid(idx), nearKey, null, idx == 0);
    }

    String primaryName = (String)affinity.mapKeyToNode(nearKey).attribute(ATTR_IGNITE_INSTANCE_NAME);

    Ignite primaryIgnite = G.ignite(primaryName);

    IgniteCache<Integer, Integer> primaryCache = primaryIgnite.cache(DEFAULT_CACHE_NAME);

    // Put from primary, check there are no readers.
    primaryCache.put(nearKey, 2);

    checkEntry(primaryIgnite, nearKey, 2, false);
}
/**
 * Verifies the state of the given key on the given node: presence and value of the near entry, presence,
 * value and readers of the DHT entry, and the value visible through an on-heap local peek.
 * <p>
 * A DHT entry is expected exactly when the node is primary or backup for the key according to the cache
 * affinity; on such a node the readers of the entry must match {@code expReaders}.
 *
 * @param ignite Node.
 * @param key Key.
 * @param val Expected value.
 * @param expectNear If {@code true} then near cache entry is expected.
 * @param expReaders Expected readers.
 * @throws Exception If failed.
 */
@SuppressWarnings("ConstantConditions")
private void checkEntry(Ignite ignite,
    Integer key,
    @Nullable Integer val,
    boolean expectNear,
    final UUID... expReaders) throws Exception
{
    GridCacheAdapter<Integer, Integer> near = ((IgniteKernal)ignite).internalCache(DEFAULT_CACHE_NAME);

    assertTrue("Near cache is expected for grid: " + ignite.name(), near.isNear());

    GridCacheEntryEx nearEntry = near.peekEx(key);

    boolean expectDht = near.affinity().isPrimaryOrBackup(ignite.cluster().localNode(), key);

    if (expectNear) {
        assertNotNull("No near entry for: " + key + ", grid: " + ignite.name(), nearEntry);

        assertEquals("Unexpected value for grid: " + ignite.name(),
            val,
            CU.value(nearEntry.info().value(), near.context(), false));

        assertEquals("Unexpected value for grid: " + ignite.name(),
            val,
            ignite.cache(near.name()).localPeek(key, CachePeekMode.ONHEAP));
    }
    else
        assertNull("Unexpected near entry: " + nearEntry + ", grid: " + ignite.name(), nearEntry);

    GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)near).dht();

    if (expectDht) {
        final GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.entryEx(key);

        assertNotNull("No dht entry for: " + key + ", grid: " + ignite.name(), dhtEntry);

        // Give the readers list up to 5 seconds to reach the expected size (presumably it may lag behind
        // the cache operation - TODO confirm). The result is deliberately not checked: the assertions
        // below report any remaining mismatch with full details.
        GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                try {
                    return dhtEntry.readers().size() == expReaders.length;
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, 5000);

        Collection<UUID> readers = dhtEntry.readers();

        assertEquals("Unexpected readers count [key=" + key + ", grid=" + ignite.name() +
            ", readers=" + readers + ']',
            expReaders.length,
            readers.size());

        for (UUID reader : expReaders) {
            assertTrue("Missing reader [reader=" + reader + ", key=" + key + ", grid=" + ignite.name() +
                ", readers=" + readers + ']',
                readers.contains(reader));
        }

        // Unswap the entry when it has no visible value before the value is read from the entry info.
        if (dhtEntry.peekVisibleValue() == null)
            dhtEntry.unswap();

        assertEquals("Unexpected value for grid: " + ignite.name(),
            val,
            CU.value(dhtEntry.info().value(), dht.context(), false));

        assertEquals("Unexpected value for grid: " + ignite.name(),
            val,
            ignite.cache(near.name()).localPeek(key, CachePeekMode.ONHEAP));
    }
    else {
        GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key);

        assertNull("Unexpected dht entry: " + dhtEntry + ", grid: " + ignite.name(), dhtEntry);
    }

    // With neither a near nor a DHT entry nothing must be visible through a local peek.
    if (!expectNear && !expectDht) {
        assertNull("Unexpected peek value for grid: " + ignite.name(), ignite.cache(near.name()).localPeek(key,
            CachePeekMode.ONHEAP));
    }
}
/**
 * Finds the next integer key, above the last examined one and below one million, for which the local
 * node of the given grid has the requested affinity role. Every examined candidate is remembered in
 * {@code lastKey}, so consecutive calls never return the same key. The call fails if the mode is unknown
 * or if no suitable key is found.
 *
 * @param ignite Grid.
 * @param mode One of {@link #PRIMARY}, {@link #BACKUP} or {@link #NOT_PRIMARY_AND_BACKUP}.
 * @return Key with properties specified by the given mode.
 */
private Integer key(Ignite ignite, int mode) {
    Affinity<Integer> affinity = ignite.affinity(DEFAULT_CACHE_NAME);

    Integer found = null;

    int cand = lastKey + 1;

    while (found == null && cand < 1_000_000) {
        boolean matches = false;

        if (mode == PRIMARY)
            matches = affinity.isPrimary(ignite.cluster().localNode(), cand);
        else if (mode == BACKUP)
            matches = affinity.isBackup(ignite.cluster().localNode(), cand);
        else if (mode == NOT_PRIMARY_AND_BACKUP)
            matches = !affinity.isPrimaryOrBackup(ignite.cluster().localNode(), cand);
        else
            fail();

        // The candidate is marked as examined whether or not it matched.
        lastKey = cand;

        if (matches)
            found = cand;

        cand++;
    }

    assertNotNull(found);

    return found;
}
/**
 * Remembers the backups number, starts {@code GRID_CNT} nodes, waits for the partition map exchange and
 * logs the name and the node ID of every started node.
 *
 * @param backups Backups number.
 * @throws Exception If failed.
 */
private void doStartGrids(int backups) throws Exception {
    // Has to be assigned before the nodes start: getConfiguration() reads this field.
    this.backups = backups;

    startGrids(GRID_CNT);

    awaitPartitionMapExchange();

    log.info("Grids: ");

    int idx = 0;

    while (idx < GRID_CNT) {
        log.info(grid(idx).name() + ": " + grid(idx).localNode().id());

        idx++;
    }
}
/**
 * Entry processor which overwrites the entry value with the value given at construction time.
 */
private static class Processor implements EntryProcessor<Integer, Integer, Void>, Serializable {
    /** Value written to every processed entry. */
    private final Integer newVal;

    /**
     * @param newVal New value.
     */
    private Processor(Integer newVal) {
        this.newVal = newVal;
    }

    /** {@inheritDoc} */
    @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) {
        entry.setValue(newVal);

        // Nothing to return: the processor is declared with a Void result.
        return null;
    }
}
}