blob: e2b5ade6b404807d5ff07bc8476242bd59f68b88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
* Tests {@link CacheInterceptor}.
*/
public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbstractSelfTest {
/** */
private static Interceptor interceptor;
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
interceptor = new Interceptor();
for (Ignite ign : G.allGrids()) {
for (String cacheName: ign.cacheNames())
ign.cache(cacheName).getConfiguration(CacheConfiguration.class).setInterceptor(interceptor);
}
awaitPartitionMapExchange();
}
/** */
@Before
public void beforeGridCacheInterceptorAbstractSelfTest() {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.INTERCEPTOR);
if (nearEnabled())
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);
if (storeEnabled())
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
// No-op.
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
interceptor.reset();
interceptor.disabled = true;
super.afterTest();
interceptor.disabled = false;
assertEquals(0, interceptor.invokeCnt.get());
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
c.getTransactionConfiguration().setTxSerializableEnabled(true);
return c;
}
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.INTERCEPTOR);
if (nearEnabled())
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);
if (storeEnabled())
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
if (!storeEnabled()) {
ccfg.setCacheStoreFactory(null);
ccfg.setReadThrough(false);
ccfg.setWriteThrough(false);
}
return ccfg;
}
/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return PARTITIONED;
}
/**
* @return {@code True} if cache store is enabled.
*/
protected boolean storeEnabled() {
return false;
}
/**
* @throws Exception If failed.
*/
@Test
public void testGet() throws Exception {
testGet(primaryKey(0), false);
afterTest();
testGet(backupKey(0), false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetEntry() throws Exception {
testGet(primaryKey(0), true);
afterTest();
testGet(backupKey(0), true);
}
/**
* @param key Key.
* @param needVer Need version.
* @throws Exception If failed.
*/
private void testGet(String key, boolean needVer) throws Exception {
// Try when value is not in cache.
interceptor.retInterceptor = new NullGetInterceptor();
log.info("Get 1.");
IgniteCache<String, Integer> cache = jcache(0);
assertEquals(null, needVer ? cache.getEntry(key) : cache.get(key));
assertEquals(1, interceptor.invokeCnt.get());
assertEquals(0, interceptor.getMap.size());
interceptor.reset();
interceptor.retInterceptor = new OneGetInterceptor();
log.info("Get 2.");
assertEquals((Integer)1, needVer ? cache.getEntry(key).getValue() : cache.get(key));
assertEquals(1, interceptor.invokeCnt.get());
assertEquals(0, interceptor.getMap.size());
interceptor.reset();
// Disable interceptor and update cache.
interceptor.disabled = true;
cache.put(key, 100);
interceptor.disabled = false;
// Try when value is in cache.
interceptor.retInterceptor = new NullGetInterceptor();
log.info("Get 3.");
assertEquals(null, needVer ? cache.getEntry(key) : cache.get(key));
assertEquals(1, interceptor.invokeCnt.get());
assertEquals(1, interceptor.getMap.size());
assertEquals(100, interceptor.getMap.get(key));
checkCacheValue(key, 100);
interceptor.reset();
interceptor.retInterceptor = new GetIncrementInterceptor();
log.info("Get 4.");
assertEquals((Integer)101, needVer ? cache.getEntry(key).getValue() : cache.get(key));
assertEquals(1, interceptor.invokeCnt.get());
assertEquals(1, interceptor.getMap.size());
assertEquals(100, interceptor.getMap.get(key));
checkCacheValue(key, 100);
interceptor.reset();
interceptor.retInterceptor = new GetIncrementInterceptor();
log.info("GetAsync 1.");
if (needVer)
assertEquals((Integer)101, cache.getEntryAsync(key).get().getValue());
else
assertEquals((Integer)101, cache.getAsync(key).get());
assertEquals(1, interceptor.invokeCnt.get());
assertEquals(1, interceptor.getMap.size());
assertEquals(100, interceptor.getMap.get(key));
checkCacheValue(key, 100);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetAll() throws Exception {
testGetAll(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetEntries() throws Exception {
testGetAll(true);
}
/**
* @throws Exception If failed.
*/
private void testGetAll(boolean needVer) throws Exception {
Set<String> keys = new LinkedHashSet<>();
for (int i = 0; i < 1000; i++)
keys.add(String.valueOf(i));
interceptor.retInterceptor = new NullGetInterceptor();
IgniteCache<String, Integer> cache = jcache(0);
Collection<CacheEntry<String, Integer>> c;
Map<String, Integer> map;
if (needVer) {
c = cache.getEntries(keys);
assertTrue(c.isEmpty());
}
else {
map = cache.getAll(keys);
for (String key : keys)
assertEquals(null, map.get(key));
}
assertEquals(1000, interceptor.invokeCnt.get());
interceptor.reset();
interceptor.retInterceptor = new GetAllInterceptor1();
if (needVer) {
c = cache.getEntries(keys);
assertEquals(500, c.size());
for (CacheEntry<String, Integer> e : c) {
int k = Integer.valueOf(e.getKey());
assertEquals((Integer)(k * 2), e.getValue());
}
}
else {
map = cache.getAll(keys);
for (String key : keys) {
int k = Integer.valueOf(key);
if (k % 2 == 0)
assertEquals(null, map.get(key));
else
assertEquals((Integer)(k * 2), map.get(key));
}
}
assertEquals(1000, interceptor.invokeCnt.get());
// Put some values in cache.
interceptor.disabled = true;
for (int i = 0; i < 500; i++)
cache.put(String.valueOf(i), i);
interceptor.disabled = false;
for (int j = 0; j < 2; j++) {
interceptor.reset();
interceptor.retInterceptor = new GetAllInterceptor2();
if (needVer) {
if (j == 0)
c = cache.getEntries(keys);
else
c = cache.getEntriesAsync(keys).get();
for (CacheEntry<String, Integer> e : c) {
int k = Integer.valueOf(e.getKey());
switch (k % 3) {
case 1:
Integer exp = k < 500 ? k : null;
assertEquals(exp, e.getValue());
break;
case 2:
assertEquals((Integer)(k * 3), e.getValue());
break;
default:
fail();
}
}
}
else {
if (j == 0)
map = cache.getAll(keys);
else
map = cache.getAllAsync(keys).get();
int i = 0;
for (String key : keys) {
switch (i % 3) {
case 0:
assertEquals(null, map.get(key));
break;
case 1:
Integer exp = i < 500 ? i : null;
assertEquals(exp, map.get(key));
break;
case 2:
assertEquals((Integer)(i * 3), map.get(key));
break;
default:
fail();
}
i++;
}
}
assertEquals(1000, interceptor.invokeCnt.get());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelUpdate() throws Exception {
for (Operation op : Operation.values()) {
testCancelUpdate(primaryKey(0), op);
afterTest();
testCancelUpdate(backupKey(0), op);
afterTest();
}
}
/**
* @param op Operation type.
* @return {@code True} if this is atomic cache and update is first run on primary node.
*/
private int expectedIgnoreInvokeCount(Operation op) {
int dataNodes = cacheMode() == REPLICATED ? gridCount() : 2;
if (atomicityMode() == TRANSACTIONAL)
return dataNodes + (storeEnabled() ? 1 : 0); // One call before store is updated.
else {
// If update goes through primary node and it is cancelled then backups aren't updated.
return op == Operation.TRANSFORM ? 1 : dataNodes;
}
}
/**
* @param op Operation type.
* @return {@code True} if this is atomic cache and update is first run on primary node.
*/
private int expectedInvokeCount(Operation op) {
int dataNodes = cacheMode() == REPLICATED ? gridCount() : 2;
if (atomicityMode() == TRANSACTIONAL)
// Update + after update + one call before store is updated.
return dataNodes * 2 + (storeEnabled() ? 1 : 0);
else
return op == Operation.TRANSFORM ? 2 : dataNodes * 2;
}
/**
* @param key Key.
* @param op Operation type.
* @throws Exception If failed.
*/
private void testCancelUpdate(String key, Operation op) throws Exception {
// Interceptor returns null to disabled update.
CacheInterceptor retInterceptor = new NullPutInterceptor();
interceptor.retInterceptor = retInterceptor;
// Execute update when value is null, it should not change cache value.
log.info("Update 1 " + op);
update(0, op, key, 1, null);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforePutMap.size());
IgniteBiTuple t = interceptor.beforePutMap.get(key);
assertEquals(null, t.get1());
assertEquals(1, t.get2());
// Disable interceptor and update cache.
interceptor.reset();
interceptor.disabled = true;
clearCaches();
jcache(0).put(key, 1);
checkCacheValue(key, 1);
// Execute update when value is not null, it should not change cache value.
interceptor.disabled = false;
interceptor.retInterceptor = retInterceptor;
log.info("Update 2 " + op);
update(0, op, key, 2, 1);
checkCacheValue(key, 1);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforePutMap.size());
t = interceptor.beforePutMap.get(key);
assertEquals(1, t.get1());
assertEquals(2, t.get2());
}
/**
* @throws Exception If failed.
*/
@Test
public void testModifyUpdate() throws Exception {
for (Operation op : Operation.values()) {
testModifyUpdate(primaryKey(0), op);
afterTest();
testModifyUpdate(backupKey(0), op);
afterTest();
}
}
/**
* @param key Key.
* @param op Operation type.
* @throws Exception If failed.
*/
private void testModifyUpdate(String key, Operation op) throws Exception {
// Interceptor returns incremented new value.
CacheInterceptor retInterceptor = new PutIncrementInterceptor();
// Execute update when value is null.
interceptor.retInterceptor = retInterceptor;
log.info("Update 1 " + op);
update(0, op, key, 1, null);
checkCacheValue(key, 2);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforePutMap.size());
IgniteBiTuple t = interceptor.beforePutMap.get(key);
assertEquals(null, t.get1());
assertEquals(1, t.get2());
assertEquals(1, interceptor.afterPutMap.size());
assertEquals(2, interceptor.afterPutMap.get(key));
// Execute update when value is not null.
interceptor.reset();
interceptor.retInterceptor = retInterceptor;
log.info("Update 2 " + op);
update(0, op, key, 3, 2);
checkCacheValue(key, 4);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforePutMap.size());
t = interceptor.beforePutMap.get(key);
assertEquals(2, t.get1());
assertEquals(3, t.get2());
assertEquals(1, interceptor.afterPutMap.size());
assertEquals(4, interceptor.afterPutMap.get(key));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelRemove() throws Exception {
for (Operation op : Operation.values()) {
testCancelRemove(primaryKey(0), op);
afterTest();
testCancelRemove(backupKey(0), op);
afterTest();
}
}
/**
* @param key Key.
* @param op Operation type.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
private void testCancelRemove(String key, Operation op) throws Exception {
// Interceptor disables remove and returns null.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(true, null));
// Execute remove when value is null.
log.info("Remove 1 " + op);
remove(0, op, key, null, null);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(0, interceptor.beforeRmvMap.size());
assertEquals(null, interceptor.beforeRmvMap.get(key));
log.info("Remove 2 " + op);
interceptor.reset();
// Interceptor disables remove and changes return value.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(true, 900));
// Execute remove when value is null, interceptor changes return value.
remove(0, op, key, null, 900);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(0, interceptor.beforeRmvMap.size());
assertEquals(null, interceptor.beforeRmvMap.get(key));
// Disable interceptor and update cache.
interceptor.reset();
interceptor.disabled = true;
clearCaches();
jcache(0).put(key, 1);
checkCacheValue(key, 1);
// Execute remove when value is not null, it should not change cache value.
interceptor.reset();
interceptor.disabled = false;
// Interceptor disables remove and returns null.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(true, null));
log.info("Remove 3 " + op);
remove(0, op, key, 1, null);
checkCacheValue(key, 1);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforeRmvMap.size());
assertEquals(1, interceptor.beforeRmvMap.get(key));
interceptor.reset();
// Interceptor disables remove and changes return value.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(true, 1000));
log.info("Remove 4 " + op);
remove(0, op, key, 1, 1000);
checkCacheValue(key, 1);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforeRmvMap.size());
assertEquals(1, interceptor.beforeRmvMap.get(key));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemove() throws Exception {
for (Operation op : Operation.values()) {
testRemove(primaryKey(0), op);
afterTest();
testRemove(backupKey(0), op);
afterTest();
}
}
/**
* @param key Key.
* @param op Operation type.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
private void testRemove(String key, Operation op) throws Exception {
// Interceptor changes return value to null.
interceptor.retInterceptor = new BeforeRemoveInterceptor( new IgniteBiTuple(false, null));
// Execute remove when value is null.
log.info("Remove 1 " + op);
remove(0, op, key, null, null);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(0, interceptor.beforeRmvMap.size());
assertEquals(0, interceptor.afterRmvMap.size());
log.info("Remove 2 " + op);
interceptor.reset();
// Interceptor changes return value.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(false, 900));
// Execute remove when value is null.
remove(0, op, key, null, 900);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(0, interceptor.beforeRmvMap.size());
assertEquals(0, interceptor.afterRmvMap.size());
// Disable interceptor and update cache.
interceptor.reset();
interceptor.disabled = true;
clearCaches();
jcache(0).put(key, 1);
checkCacheValue(key, 1);
// Execute remove when value is not null.
interceptor.reset();
interceptor.disabled = false;
// Interceptor changes return value to null.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(false, null));
log.info("Remove 3 " + op);
remove(0, op, key, 1, null);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforeRmvMap.size());
assertEquals(1, interceptor.beforeRmvMap.get(key));
assertEquals(1, interceptor.afterRmvMap.size());
assertEquals(1, interceptor.afterRmvMap.get(key));
// Disable interceptor and update cache.
interceptor.disabled = true;
clearCaches();
jcache(0).put(key, 2);
checkCacheValue(key, 2);
// Execute remove when value is not null.
interceptor.reset();
interceptor.disabled = false;
// Interceptor changes return value.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(false, 1000));
log.info("Remove 4 " + op);
remove(0, op, key, 2, 1000);
checkCacheValue(key, null);
// Check values passed to interceptor.
assertEquals(1, interceptor.beforeRmvMap.size());
assertEquals(2, interceptor.beforeRmvMap.get(key));
assertEquals(1, interceptor.afterRmvMap.size());
assertEquals(2, interceptor.afterRmvMap.get(key));
}
/**
* @throws Exception If failed.
*/
@Test
public void testNearNodeKey() throws Exception {
if (cacheMode() != PARTITIONED)
return;
if (atomicityMode() == TRANSACTIONAL) {
for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) {
for (TransactionIsolation txIsolation : TransactionIsolation.values()) {
for (Operation op : Operation.values()) {
testNearNodeKey(txConcurrency, txIsolation, op);
afterTest();
}
}
}
}
testNearNodeKey(null, null, null);
}
/**
* @param txConcurrency Transaction concurrency.
* @param txIsolation Transaction isolation.
* @param op Operation type.
* @throws Exception If failed.
*/
private void testNearNodeKey(@Nullable TransactionConcurrency txConcurrency,
@Nullable TransactionIsolation txIsolation, @Nullable Operation op) throws Exception {
// Interceptor returns incremented new value.
interceptor.retInterceptor = new PutIncrementInterceptor();
String key1 = primaryKey(0);
String key2 = backupKey(0);
String key3 = nearKey(0);
interceptor.disabled = true;
// Put from grid 1 to be sure grid 0 does not have value for near key.
jcache(1).putAll(F.asMap(key1, 1, key2, 2, key3, 3));
interceptor.disabled = false;
log.info("Update [op=" + op + ", key1=" + key1 + ", key2=" + key2 + ", key3=" + key3 +
", txConcurrency=" + txConcurrency + ", txIsolation=" + txIsolation + ']');
if (txConcurrency != null) {
assertNotNull(txIsolation);
assertNotNull(op);
try (Transaction tx = ignite(0).transactions().txStart(txConcurrency, txIsolation)) {
update(0, op, key1, 100, 1);
update(0, op, key2, 200, 2);
update(0, op, key3, 300, 3);
tx.commit();
}
}
else
jcache(0).putAll(F.asMap(key1, 100, key2, 200, key3, 300));
checkCacheValue(key1, 101);
checkCacheValue(key2, 201);
checkCacheValue(key3, 301);
}
/**
* @throws Exception If failed.
*/
@Test
public void testBatchUpdate() throws Exception {
testBatchUpdate(Operation.UPDATE);
afterTest();
testBatchUpdate(Operation.TRANSFORM);
}
/**
* @param op Operation type.
* @throws Exception If failed.
*/
private void testBatchUpdate(Operation op) throws Exception {
// Interceptor returns incremented new value.
interceptor.retInterceptor = new PutIncrementInterceptor();
Map<String, Integer> map = new TreeMap<>();
final String key1;
String key2;
String key3;
List<String> keys = primaryKeys(0, 2);
key1 = keys.get(0); // Need two keys for the same node to test atomic cache batch store upadte.
key2 = keys.get(1);
key3 = backupKey(0);
map.put(key1, 1);
map.put(key2, 2);
map.put(key3, 3);
log.info("Batch update 1: " + op);
batchUpdate(0, op, map);
checkCacheValue(key1, 2);
checkCacheValue(key2, 3);
checkCacheValue(key3, 4);
assertEquals(3, interceptor.beforePutMap.size());
assertBeforePutValue(key1, null, 1);
assertBeforePutValue(key2, null, 2);
assertBeforePutValue(key3, null, 3);
assertEquals(3, interceptor.afterPutMap.size());
assertEquals(2, interceptor.afterPutMap.get(key1));
assertEquals(3, interceptor.afterPutMap.get(key2));
assertEquals(4, interceptor.afterPutMap.get(key3));
interceptor.reset();
// Interceptor returns incremented new value, cancels update for one key.
interceptor.retInterceptor = new BatchPutInterceptor1(key1);
map.put(key1, 100);
map.put(key2, 200);
map.put(key3, 300);
log.info("Batch update 2: " + op);
batchUpdate(0, op, map);
checkCacheValue(key1, 2);
checkCacheValue(key2, 201);
checkCacheValue(key3, 301);
assertEquals(3, interceptor.beforePutMap.size());
assertBeforePutValue(key1, 2, 100);
assertBeforePutValue(key2, 3, 200);
assertBeforePutValue(key3, 4, 300);
assertEquals(2, interceptor.afterPutMap.size());
assertEquals(201, interceptor.afterPutMap.get(key2));
assertEquals(301, interceptor.afterPutMap.get(key3));
}
/**
* @throws Exception If failed.
*/
@Test
public void testBatchRemove() throws Exception {
testBatchRemove(Operation.UPDATE);
afterTest();
testBatchRemove(Operation.TRANSFORM);
}
/**
* @param op Operation type.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
private void testBatchRemove(Operation op) throws Exception {
Map<String, Integer> map = new TreeMap<>();
final String key1;
String key2;
String key3;
List<String> keys = primaryKeys(0, 2);
key1 = keys.get(0);
key2 = keys.get(1);
key3 = backupKey(0);
map.put(key1, 1);
map.put(key2, 2);
map.put(key3, 3);
// Interceptor does not cancel update.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(false, 999));
log.info("Batch remove 1: " + op);
batchRemove(0, op, map);
checkCacheValue(key1, null);
checkCacheValue(key2, null);
checkCacheValue(key3, null);
assertEquals(0, interceptor.beforeRmvMap.size());
assertEquals(0, interceptor.afterRmvMap.size());
// Disable interceptor and put some values in cache.
interceptor.disabled = true;
jcache(0).putAll(map);
interceptor.disabled = false;
interceptor.reset();
// Interceptor does not cancel update.
interceptor.retInterceptor = new BeforeRemoveInterceptor(new IgniteBiTuple(false, 999));
log.info("Batch remove 2: " + op);
batchRemove(0, op, map);
checkCacheValue(key1, null);
checkCacheValue(key2, null);
checkCacheValue(key3, null);
assertEquals(3, interceptor.beforeRmvMap.size());
assertEquals(1, interceptor.beforeRmvMap.get(key1));
assertEquals(2, interceptor.beforeRmvMap.get(key2));
assertEquals(3, interceptor.beforeRmvMap.get(key3));
assertEquals(3, interceptor.afterRmvMap.size());
assertEquals(1, interceptor.afterRmvMap.get(key1));
assertEquals(2, interceptor.afterRmvMap.get(key2));
assertEquals(3, interceptor.afterRmvMap.get(key3));
// Disable interceptor and put some values in cache.
interceptor.disabled = true;
jcache(0).putAll(map);
interceptor.disabled = false;
interceptor.reset();
// Interceptor cancels update for one key.
interceptor.retInterceptor = new BatchRemoveInterceptor(key1);
log.info("Batch remove 3: " + op);
batchRemove(0, op, map);
checkCacheValue(key1, 1);
checkCacheValue(key2, null);
checkCacheValue(key3, null);
assertEquals(3, interceptor.beforeRmvMap.size());
assertEquals(1, interceptor.beforeRmvMap.get(key1));
assertEquals(2, interceptor.beforeRmvMap.get(key2));
assertEquals(3, interceptor.beforeRmvMap.get(key3));
assertEquals(2, interceptor.afterRmvMap.size());
assertEquals(2, interceptor.afterRmvMap.get(key2));
assertEquals(3, interceptor.afterRmvMap.get(key3));
}
/**
* @param key Key.
* @param oldVal Expected old value.
* @param newVal Expected new value.
*/
private void assertBeforePutValue(String key, @Nullable Object oldVal, @Nullable Object newVal) {
IgniteBiTuple t = interceptor.beforePutMap.get(key);
assertNotNull(t);
assertEquals(t.get1(), oldVal);
assertEquals(t.get2(), newVal);
}
/**
* @param grid Grid index.
* @param op Operation type.
* @param key Key.
* @param val Value.
* @param expOld Expected expOld value.
* @throws Exception If failed.
*/
private void update(int grid, Operation op, String key, final Integer val, @Nullable final Integer expOld)
throws Exception {
cacheUpdate(grid, false, op, key, val, expOld, null);
}
/**
* @param grid Grid index.
* @param op Operation type.
* @param key Key.
* @param expOld Expected expOld value.
* @param expRmvRet Expected remove result.
* @throws Exception If failed.
*/
private void remove(int grid, Operation op, String key, @Nullable final Integer expOld,
@Nullable final Integer expRmvRet) throws Exception {
cacheUpdate(grid, true, op, key, null, expOld, expRmvRet);
}
/**
* @param grid Grid index.
* @param rmv If {@code true} then executes remove.
* @param op Operation type.
* @param key Key.
* @param val Value.
* @param expOld Expected expOld value.
* @param expRmvRet Expected remove result.
* @throws Exception If failed.
*/
private void cacheUpdate(int grid, boolean rmv, Operation op, String key, final Integer val,
@Nullable final Integer expOld, @Nullable final Integer expRmvRet)
throws Exception {
IgniteCache<String, Integer> cache = jcache(grid);
if (rmv) {
assertNull(val);
switch (op) {
case UPDATE: {
assertEquals(expRmvRet, cache.getAndRemove(key));
break;
}
case UPDATEX: {
cache.remove(key);
break;
}
case TRANSFORM: {
cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
@Override public Void process(MutableEntry<String, Integer> e, Object... args) {
Integer old = e.getValue();
assertEquals(expOld, old);
e.remove();
return null;
}
});
break;
}
default:
fail();
}
}
else {
switch (op) {
case UPDATE: {
assertEquals(expOld, cache.getAndPut(key, val));
break;
}
case UPDATEX: {
cache.put(key, val);
break;
}
case TRANSFORM: {
cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
@Override public Void process(MutableEntry<String, Integer> e, Object... args) {
Integer old = e.getValue();
assertEquals(expOld, old);
e.setValue(val);
return null;
}
});
break;
}
default:
fail();
}
}
}
/**
* @param grid Grid index.
* @param op Operation type.
* @param map Key/values map.
* @throws Exception If failed.
*/
private void batchUpdate(int grid, Operation op, final Map<String, Integer> map) throws Exception {
cacheBatchUpdate(grid, false, op, map);
}
/**
* @param grid Grid index.
* @param op Operation type.
* @param map Key/values map.
* @throws Exception If failed.
*/
private void batchRemove(int grid, Operation op, final Map<String, Integer> map) throws Exception {
cacheBatchUpdate(grid, true, op, map);
}
/**
* @param grid Grid index.
* @param rmv If {@code true} then executes remove.
* @param op Operation type.
* @param map Key/values map.
* @throws Exception If failed.
*/
private void cacheBatchUpdate(int grid, boolean rmv, Operation op, final Map<String, Integer> map)
throws Exception {
IgniteCache<String, Integer> cache = jcache(grid);
if (rmv) {
switch (op) {
case UPDATE: {
cache.removeAll(map.keySet());
break;
}
case TRANSFORM: {
cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
@Override public Void process(MutableEntry<String, Integer> e, Object... args) {
e.remove();
return null;
}
});
break;
}
default:
fail();
}
}
else {
switch (op) {
case UPDATE: {
cache.putAll(map);
break;
}
case TRANSFORM: {
cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
@Override public Void process(MutableEntry<String, Integer> e, Object... args) {
e.setValue(map.get(e.getKey()));
return null;
}
});
break;
}
default:
fail();
}
}
}
/**
* @param idx Grid index.
* @return Primary key for grid.
*/
private String primaryKey(int idx) {
return primaryKeys(idx, 1).get(0);
}
/**
* @param idx Grid index.
* @param cnt Number of keys.
* @return Primary keys for grid.
*/
private List<String> primaryKeys(int idx, int cnt) {
assert cnt > 0;
Affinity aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
List<String> keys = new ArrayList<>(cnt);
for (int i = 0; i < 10_000; i++) {
String key = String.valueOf(i);
if (aff.isPrimary(grid(idx).localNode(), key)) {
keys.add(key);
if (keys.size() == cnt)
break;
}
}
assertEquals(cnt, keys.size());
return keys;
}
/**
* @param idx Grid index.
* @return Primary key for grid.
*/
private String backupKey(int idx) {
Affinity aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
String key = null;
for (int i = 0; i < 10_000; i++) {
if (aff.isBackup(grid(idx).localNode(), String.valueOf(i))) {
key = String.valueOf(i);
break;
}
}
assertNotNull(key);
return key;
}
/**
* @param idx Grid index.
* @return Key which does not belong to the grid.
*/
private String nearKey(int idx) {
Affinity aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
String key = null;
for (int i = 0; i < 10_000; i++) {
if (!aff.isPrimaryOrBackup(grid(idx).localNode(), String.valueOf(i))) {
key = String.valueOf(i);
break;
}
}
assertNotNull(key);
return key;
}
/**
* @param key Key.
* @param expVal Expected value.
* @throws Exception If failed.
*/
private void checkCacheValue(Object key, @Nullable Object expVal) throws Exception {
interceptor.disabled = true;
if (storeEnabled())
assertEquals("Unexpected store value", expVal, storeStgy.getFromStore(key));
try {
for (int i = 0; i < gridCount(); i++)
assertEquals("Unexpected value for grid " + i, expVal, grid(i).cache(DEFAULT_CACHE_NAME).get(key));
}
finally {
interceptor.disabled = false;
}
}
/**
* @throws Exception If failed.
*/
private void clearCaches() throws Exception {
for (int i = 0; i < gridCount(); i++)
jcache(i).removeAll();
}
/**
*
*/
private enum Operation {
/**
*
*/
UPDATE,
/**
*
*/
UPDATEX,
/**
*
*/
TRANSFORM,
}
/**
*
*/
private static class InterceptorAdapter implements CacheInterceptor {
/** */
@Nullable @Override public Object onGet(Object key, Object val) {
fail("onGet not expected");
return null;
}
/** */
@Nullable @Override public Object onBeforePut(Cache.Entry entry, Object newVal) {
fail("onBeforePut not expected");
return null;
}
/** */
@Override public void onAfterPut(Cache.Entry entry) {
fail("onAfterPut not expected");
}
/** */
@Nullable @Override public IgniteBiTuple onBeforeRemove(Cache.Entry entry) {
fail("onBeforeRemove not expected");
return null;
}
/** */
@Override public void onAfterRemove(Cache.Entry entry) {
fail("onAfterRemove not expected");
}
}
/**
*
*/
private static class BeforeRemoveInterceptor extends InterceptorAdapter {
/**
*
*/
private IgniteBiTuple ret;
/**
* @param ret Return value.
*/
private BeforeRemoveInterceptor(IgniteBiTuple ret) {
this.ret = ret;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteBiTuple onBeforeRemove(Cache.Entry entry) {
return ret;
}
}
/**
*
*/
private static class Interceptor implements CacheInterceptor {
/** */
private final Map<Object, Object> getMap = new ConcurrentHashMap<>();
/** */
private final Map<Object, Object> afterPutMap = new ConcurrentHashMap<>();
/** */
private final Map<Object, IgniteBiTuple> beforePutMap = new ConcurrentHashMap<>();
/** */
private final Map<Object, Object> beforeRmvMap = new ConcurrentHashMap<>();
/** */
private final Map<Object, Object> afterRmvMap = new ConcurrentHashMap<>();
/** */
private final AtomicInteger invokeCnt = new AtomicInteger();
/** */
private volatile boolean disabled;
/** */
private volatile CacheInterceptor retInterceptor;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public Object onGet(Object key, Object val) {
if (disabled)
return val;
assertNotNull(retInterceptor);
Object ret = retInterceptor.onGet(key, val);
System.out.println("Get [key=" + key + ", val=" + val + ", ret=" + ret + ']');
if (val != null) {
Object old = getMap.put(key, val);
assertNull(old); // Fot get interceptor is called on near node only.
}
invokeCnt.incrementAndGet();
return ret;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public Object onBeforePut(Cache.Entry entry, Object newVal) {
if (disabled)
return newVal;
assertNotNull(retInterceptor);
Object ret = retInterceptor.onBeforePut(entry, newVal);
System.out.println("Before put [key=" + entry.getKey() + ", oldVal=" + entry.getValue() + ", newVal=" + newVal
+ ", ret=" + ret + ']');
invokeCnt.incrementAndGet();
IgniteBiTuple t = beforePutMap.put(entry.getKey(), new IgniteBiTuple(entry.getValue(), newVal));
if (t != null) {
assertEquals("Interceptor called with different old values for key " + entry.getKey(), t.get1(),
entry.getValue());
assertEquals("Interceptor called with different new values for key " + entry.getKey(), t.get2(),
newVal);
}
return ret;
}
/** {@inheritDoc} */
@Override public void onAfterPut(Cache.Entry entry) {
if (disabled)
return;
System.out.println("After put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
invokeCnt.incrementAndGet();
Object old = afterPutMap.put(entry.getKey(), entry.getValue());
if (old != null)
assertEquals(old, entry.getValue());
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override @Nullable public IgniteBiTuple onBeforeRemove(Cache.Entry entry) {
if (disabled)
return new IgniteBiTuple(false, entry.getValue());
assertNotNull(retInterceptor);
IgniteBiTuple ret = retInterceptor.onBeforeRemove(entry);
System.out.println("Before remove [key=" + entry.getKey() + ", val=" + entry.getValue() + ", ret=" + ret + ']');
invokeCnt.incrementAndGet();
if (entry.getValue() != null) {
Object old = beforeRmvMap.put(entry.getKey(), entry.getValue());
if (old != null)
assertEquals(old, entry.getValue());
}
return ret;
}
/** {@inheritDoc} */
@Override public void onAfterRemove(Cache.Entry entry) {
if (disabled)
return;
System.out.println("After remove [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
invokeCnt.incrementAndGet();
if (entry.getValue() != null) {
Object old = afterRmvMap.put(entry.getKey(), entry.getValue());
if (old != null)
assertEquals(old, entry.getValue());
}
}
/**
*
*/
public void reset() {
invokeCnt.set(0);
getMap.clear();
beforePutMap.clear();
afterPutMap.clear();
afterRmvMap.clear();
beforeRmvMap.clear();
retInterceptor = null;
}
}
/**
*
*/
private static class BatchRemoveInterceptor extends InterceptorAdapter {
/** */
private final String key1;
/**
* @param key1 Key.
*/
public BatchRemoveInterceptor(String key1) {
this.key1 = key1;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteBiTuple onBeforeRemove(Cache.Entry entry) {
return new IgniteBiTuple(entry.getKey().equals(key1), 999);
}
}
/**
*
*/
private static class PutIncrementInterceptor extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onBeforePut(Cache.Entry entry, Object newVal) {
return (Integer)newVal + 1;
}
}
/**
*
*/
private static class NullPutInterceptor extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onBeforePut(Cache.Entry entry, Object newVal) {
return null;
}
}
/**
*
*/
private static class NullGetInterceptor extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onGet(Object key, Object val) {
return null;
}
}
/**
*
*/
private static class GetAllInterceptor1 extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onGet(Object key, Object val) {
int k = Integer.valueOf((String)key);
return k % 2 == 0 ? null : (k * 2);
}
}
/**
*
*/
private static class GetAllInterceptor2 extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onGet(Object key, Object val) {
int k = Integer.valueOf((String)key);
switch (k % 3) {
case 0:
return null;
case 1:
return val;
case 2:
return k * 3;
default:
fail();
}
return null;
}
}
/**
*
*/
private static class BatchPutInterceptor1 extends InterceptorAdapter {
/** */
private final String key1;
/**
* @param key1 Key.
*/
public BatchPutInterceptor1(String key1) {
this.key1 = key1;
}
/** {@inheritDoc} */
@Nullable @Override public Object onBeforePut(Cache.Entry entry, Object newVal) {
if (entry.getKey().equals(key1))
return null;
return (Integer)newVal + 1;
}
}
/**
*
*/
private static class GetIncrementInterceptor extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onGet(Object key, Object val) {
return (Integer)val + 1;
}
}
/**
*
*/
private static class OneGetInterceptor extends InterceptorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Object onGet(Object key, Object val) {
return 1;
}
}
}