blob: 29bba06908247d7556be68fcdaeeda971464b324 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
*/
public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractTest {
/** */
private Integer lastKey = 0;
/**
* @throws Exception If failed.
*/
@Test
public void testInvoke() throws Exception {
IgniteCache<Integer, Integer> cache = jcache();
invoke(cache, null);
if (atomicityMode() != ATOMIC) {
invoke(cache, PESSIMISTIC); // Tx or Mvcc tx.
if (atomicityMode() == TRANSACTIONAL)
invoke(cache, OPTIMISTIC);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testInternalInvokeNullable() throws Exception {
IgniteInternalCache<Integer, Integer> cache = grid(0).cachex(DEFAULT_CACHE_NAME);
EntryProcessor<Integer, Integer, Void> processor = new NullableProcessor();
for (final Integer key : keys()) {
log.info("Test invoke with a nullable result [key=" + key + ']');
EntryProcessorResult<Void> result = cache.invoke(key, processor);
EntryProcessorResult<Void> resultAsync = cache.invokeAsync(key, processor).get();
assertNotNull(result);
assertNotNull(resultAsync);
assertNull(result.get());
assertNull(resultAsync.get());
}
}
/**
* @param cache Cache.
* @param txMode Not null transaction concurrency mode if explicit transaction should be started.
* @throws Exception If failed.
*/
private void invoke(final IgniteCache<Integer, Integer> cache, @Nullable TransactionConcurrency txMode)
throws Exception {
IncrementProcessor incProcessor = new IncrementProcessor();
for (final Integer key : keys()) {
log.info("Test invoke [key=" + key + ", txMode=" + txMode + ']');
cache.remove(key);
Transaction tx = startTx(txMode);
Integer res = cache.invoke(key, incProcessor);
if (tx != null)
tx.commit();
assertEquals(-1, (int)res);
checkValue(key, 1);
tx = startTx(txMode);
res = cache.invoke(key, incProcessor);
if (tx != null)
tx.commit();
assertEquals(1, (int)res);
checkValue(key, 2);
tx = startTx(txMode);
res = cache.invoke(key, incProcessor);
if (tx != null)
tx.commit();
assertEquals(2, (int)res);
checkValue(key, 3);
tx = startTx(txMode);
res = cache.invoke(key, new ArgumentsSumProcessor(), 10, 20, 30);
if (tx != null)
tx.commit();
assertEquals(3, (int)res);
checkValue(key, 63);
tx = startTx(txMode);
String strRes = cache.invoke(key, new ToStringProcessor());
if (tx != null)
tx.commit();
assertEquals("63", strRes);
checkValue(key, 63);
tx = startTx(txMode);
TestValue testVal = cache.invoke(key, new UserClassValueProcessor());
if (tx != null)
tx.commit();
assertEquals("63", testVal.value());
checkValue(key, 63);
tx = startTx(txMode);
Collection<TestValue> testValCol = cache.invoke(key, new CollectionReturnProcessor());
if (tx != null)
tx.commit();
assertEquals(10, testValCol.size());
for (TestValue val : testValCol)
assertEquals("64", val.value());
checkValue(key, 63);
tx = startTx(txMode);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invoke(key, new ExceptionProcessor(63));
return null;
}
}, EntryProcessorException.class, "Test processor exception.");
if (tx != null)
tx.commit();
checkValue(key, 63);
IgniteFuture<Integer> fut = cache.invokeAsync(key, incProcessor);
assertNotNull(fut);
assertEquals(63, (int)fut.get());
checkValue(key, 64);
tx = startTx(txMode);
assertNull(cache.invoke(key, new RemoveProcessor(64)));
if (tx != null)
tx.commit();
checkValue(key, null);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAll() throws Exception {
IgniteCache<Integer, Integer> cache = jcache();
invokeAll(cache, null);
if (atomicityMode() != ATOMIC) {
invokeAll(cache, PESSIMISTIC);
if (atomicityMode() == TRANSACTIONAL)
invokeAll(cache, OPTIMISTIC);
}
}
/**
*
*/
private static class MyKey {
/** */
String key;
/** */
@AffinityKeyMapped
String affkey = "affkey";
/** */
public MyKey(String key) {
this.key = key;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof MyKey))
return false;
MyKey key1 = (MyKey)o;
return Objects.equals(key, key1.key) &&
Objects.equals(affkey, key1.affkey);
}
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(key, affkey);
}
/** {@inheritDoc} */
@Override public String toString() {
return "MyKey{" +
"key='" + key + '\'' +
'}';
}
}
/** */
static class MyClass1{}
/** */
static class MyClass2{}
/** */
static class MyClass3{}
/** */
Object[] results = new Object[] {
new MyClass1(),
new MyClass2(),
new MyClass3()
};
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAllAppliedOnceOnBinaryTypeRegistration() {
IgniteCache<MyKey, Integer> cache = jcache();
Affinity<Object> affinity = grid(0).affinity(cache.getName());
for (int i = 0; i < gridCount(); i++) {
if (!affinity.isPrimary(grid(i).localNode(), new MyKey(""))) {
cache = jcache(i);
break;
}
}
LinkedHashSet<MyKey> keys = new LinkedHashSet<>(Arrays.asList(
new MyKey("remove_0"), new MyKey("1"), new MyKey("2"),
new MyKey("remove_3"), new MyKey("remove_4"), new MyKey("register_type_0"),
new MyKey("6"), new MyKey("remove_7"), new MyKey("register_type_1"),
new MyKey("9"), new MyKey("remove_10"), new MyKey("11"), new MyKey("12"), new MyKey("register_type_2")
));
for (MyKey key : keys)
cache.put(key, 0);
cache.invokeAll(keys,
new CacheEntryProcessor<MyKey, Integer, Object>() {
@IgniteInstanceResource
Ignite ignite;
@Override public Object process(MutableEntry<MyKey, Integer> entry,
Object... objects) throws EntryProcessorException {
String key = entry.getKey().key;
if (key.startsWith("register_type")) {
BinaryObjectBuilder bo = ignite.binary().builder(key);
bo.build();
}
if (key.startsWith("remove")) {
entry.remove();
}
else {
Integer value = entry.getValue() == null ? 0 : entry.getValue();
entry.setValue(++value);
}
if (key.startsWith("register_type"))
return results[Integer.parseInt(key.substring(key.lastIndexOf("_") + 1))];
return null;
}
});
Map<MyKey, Integer> all = cache.getAll(keys);
for (Map.Entry<MyKey, Integer> entry : all.entrySet()) {
MyKey key = entry.getKey();
if (key.key.startsWith("remove")) {
assertNull(entry.getValue());
if (cacheStoreFactory() != null)
assertNull(storeMap.get(keys));
}
else {
int value = entry.getValue();
assertEquals("\"" + key + "' entry has wrong value, exp=1 actl=" + value, 1, value);
if (cacheStoreFactory() != null)
assertEquals("\"" + key + "' entry has wrong value in cache store, exp=1 actl=" + value,
1, (int)storeMap.get(key));
}
}
}
/**
* @param cache Cache.
* @param txMode Not null transaction concurrency mode if explicit transaction should be started.
* @throws Exception If failed.
*/
private void invokeAll(IgniteCache<Integer, Integer> cache, @Nullable TransactionConcurrency txMode) throws Exception {
invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)), txMode);
if (gridCount() > 1) {
invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
Set<Integer> keys = new HashSet<>();
keys.addAll(primaryKeys(jcache(0), 3, 0));
keys.addAll(primaryKeys(jcache(1), 3, 0));
keys.addAll(primaryKeys(jcache(2), 3, 0));
invokeAll(cache, keys, txMode);
}
Set<Integer> keys = new HashSet<>();
for (int i = 0; i < 1000; i++)
keys.add(i);
invokeAll(cache, keys, txMode);
}
/**
* @param cache Cache.
* @param keys Keys.
* @param txMode Not null transaction concurrency mode if explicit transaction should be started.
* @throws Exception If failed.
*/
private void invokeAll(IgniteCache<Integer, Integer> cache,
Set<Integer> keys,
@Nullable TransactionConcurrency txMode) throws Exception {
cache.removeAll(keys);
log.info("Test invokeAll [keys=" + keys + ", txMode=" + txMode + ']');
IncrementProcessor incProcessor = new IncrementProcessor();
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
if (tx != null)
tx.commit();
Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys)
exp.put(key, -1);
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 1);
}
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<TestValue>> resMap = cache.invokeAll(keys, new UserClassValueProcessor());
if (tx != null)
tx.commit();
Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys)
exp.put(key, new TestValue("1"));
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 1);
}
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<Collection<TestValue>>> resMap =
cache.invokeAll(keys, new CollectionReturnProcessor());
if (tx != null)
tx.commit();
Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys) {
List<TestValue> expCol = new ArrayList<>();
for (int i = 0; i < 10; i++)
expCol.add(new TestValue("2"));
exp.put(key, expCol);
}
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 1);
}
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
if (tx != null)
tx.commit();
Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys)
exp.put(key, 1);
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 2);
}
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<Integer>> resMap =
cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30);
if (tx != null)
tx.commit();
Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys)
exp.put(key, 3);
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 62);
}
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new ExceptionProcessor(null));
if (tx != null)
tx.commit();
for (Integer key : keys) {
final EntryProcessorResult<Integer> res = resMap.get(key);
assertNotNull("No result for " + key);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
res.get();
return null;
}
}, EntryProcessorException.class, "Test processor exception.");
}
for (Integer key : keys)
checkValue(key, 62);
}
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
for (Integer key : keys) {
switch (key % 4) {
case 0: invokeMap.put(key, new IncrementProcessor()); break;
case 1: invokeMap.put(key, new RemoveProcessor(62)); break;
case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break;
case 3: invokeMap.put(key, new ExceptionProcessor(62)); break;
default:
fail();
}
}
Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(invokeMap, 10, 20, 30);
if (tx != null)
tx.commit();
for (Integer key : keys) {
final EntryProcessorResult<Integer> res = resMap.get(key);
switch (key % 4) {
case 0: {
assertNotNull("No result for " + key, res);
assertEquals(62, (int)res.get());
checkValue(key, 63);
break;
}
case 1: {
assertNull(res);
checkValue(key, null);
break;
}
case 2: {
assertNotNull("No result for " + key, res);
assertEquals(3, (int)res.get());
checkValue(key, 122);
break;
}
case 3: {
assertNotNull("No result for " + key, res);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
res.get();
return null;
}
}, EntryProcessorException.class, "Test processor exception.");
checkValue(key, 62);
break;
}
}
}
}
cache.invokeAll(keys, new IncrementProcessor());
{
Transaction tx = startTx(txMode);
Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new RemoveProcessor(null));
if (tx != null)
tx.commit();
assertEquals("Unexpected results: " + resMap, 0, resMap.size());
for (Integer key : keys)
checkValue(key, null);
}
IgniteFuture<Map<Integer, EntryProcessorResult<Integer>>> fut = cache.invokeAllAsync(keys, new IncrementProcessor());
Map<Integer, EntryProcessorResult<Integer>> resMap = fut.get();
Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys)
exp.put(key, -1);
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 1);
Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
for (Integer key : keys)
invokeMap.put(key, incProcessor);
fut = cache.invokeAllAsync(invokeMap);
resMap = fut.get();
for (Integer key : keys)
exp.put(key, 1);
checkResult(resMap, exp);
for (Integer key : keys)
checkValue(key, 2);
}
/**
* @param resMap Result map.
* @param exp Expected results.
*/
private void checkResult(Map resMap, Map<Object, Object> exp) {
assertNotNull(resMap);
assertEquals(exp.size(), resMap.size());
for (Map.Entry<Object, Object> expVal : exp.entrySet()) {
EntryProcessorResult<?> res = (EntryProcessorResult)resMap.get(expVal.getKey());
assertNotNull("No result for " + expVal.getKey(), res);
assertEquals("Unexpected result for " + expVal.getKey(), res.get(), expVal.getValue());
}
}
/**
* @param key Key.
* @param expVal Expected value.
*/
protected void checkValue(Object key, @Nullable Object expVal) {
if (expVal != null) {
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Object, Object> cache = jcache(i);
Object val = cache.localPeek(key);
if (val == null)
assertFalse(ignite(0).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(ignite(i).cluster().localNode(), key));
else
assertEquals("Unexpected value for grid " + i, expVal, val);
}
}
else {
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Object, Object> cache = jcache(i);
assertNull("Unexpected non null value for grid " + i, cache.localPeek(key, CachePeekMode.ONHEAP));
}
}
}
/**
* @return Test keys.
* @throws Exception If failed.
*/
protected Collection<Integer> keys() throws Exception {
IgniteCache<Integer, Object> cache = jcache(0);
ArrayList<Integer> keys = new ArrayList<>();
keys.add(primaryKeys(cache, 1, lastKey).get(0));
if (gridCount() > 1) {
keys.add(backupKeys(cache, 1, lastKey).get(0));
if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() != REPLICATED)
keys.add(nearKeys(cache, 1, lastKey).get(0));
}
lastKey = Collections.max(keys) + 1;
return keys;
}
/**
* @param txMode Transaction concurrency mode.
* @return Transaction.
*/
@Nullable private Transaction startTx(@Nullable TransactionConcurrency txMode) {
return txMode == null ? null : ignite(0).transactions().txStart(txMode, REPEATABLE_READ);
}
/**
*
*/
private static class ArgumentsSumProcessor implements EntryProcessor<Integer, Integer, Integer> {
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e, Object... args)
throws EntryProcessorException {
assertEquals(3, args.length);
assertEquals(10, args[0]);
assertEquals(20, args[1]);
assertEquals(30, args[2]);
assertTrue(e.exists());
Integer res = e.getValue();
for (Object arg : args)
res += (Integer)arg;
e.setValue(res);
return args.length;
}
}
/**
*
*/
protected static class ToStringProcessor implements EntryProcessor<Integer, Integer, String> {
/** {@inheritDoc} */
@Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
throws EntryProcessorException {
return String.valueOf(e.getValue());
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ToStringProcessor.class, this);
}
}
/**
*
*/
protected static class UserClassValueProcessor implements EntryProcessor<Integer, Integer, TestValue> {
/** {@inheritDoc} */
@Override public TestValue process(MutableEntry<Integer, Integer> e, Object... arguments)
throws EntryProcessorException {
return new TestValue(String.valueOf(e.getValue()));
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(UserClassValueProcessor.class, this);
}
}
/**
*
*/
protected static class CollectionReturnProcessor implements
EntryProcessor<Integer, Integer, Collection<TestValue>> {
/** {@inheritDoc} */
@Override public Collection<TestValue> process(MutableEntry<Integer, Integer> e, Object... arguments)
throws EntryProcessorException {
List<TestValue> vals = new ArrayList<>();
for (int i = 0; i < 10; i++)
vals.add(new TestValue(String.valueOf(e.getValue() + 1)));
return vals;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CollectionReturnProcessor.class, this);
}
}
/**
*
*/
protected static class IncrementProcessor implements EntryProcessor<Integer, Integer, Integer> {
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e,
Object... arguments) throws EntryProcessorException {
Ignite ignite = e.unwrap(Ignite.class);
assertNotNull(ignite);
if (e.exists()) {
Integer val = e.getValue();
assertNotNull(val);
e.setValue(val + 1);
assertTrue(e.exists());
assertEquals(val + 1, (int)e.getValue());
return val;
}
else {
e.setValue(1);
return -1;
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IncrementProcessor.class, this);
}
}
/**
*
*/
private static class RemoveProcessor implements EntryProcessor<Integer, Integer, Integer> {
/** */
private Integer expVal;
/**
* @param expVal Expected value.
*/
RemoveProcessor(@Nullable Integer expVal) {
this.expVal = expVal;
}
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e,
Object... arguments) throws EntryProcessorException {
assertTrue(e.exists());
if (expVal != null)
assertEquals(expVal, e.getValue());
e.remove();
assertFalse(e.exists());
return null;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RemoveProcessor.class, this);
}
}
/**
*
*/
private static class ExceptionProcessor implements EntryProcessor<Integer, Integer, Integer> {
/** */
private Integer expVal;
/**
* @param expVal Expected value.
*/
ExceptionProcessor(@Nullable Integer expVal) {
this.expVal = expVal;
}
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e,
Object... arguments) throws EntryProcessorException {
assertTrue(e.exists());
if (expVal != null)
assertEquals(expVal, e.getValue());
throw new EntryProcessorException("Test processor exception.");
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ExceptionProcessor.class, this);
}
}
/**
* EntryProcessor which always returns {@code null}.
*/
private static class NullableProcessor implements EntryProcessor<Integer, Integer, Void> {
/** {@inheritDoc} */
@Override public Void process(MutableEntry<Integer, Integer> e,
Object... arguments) throws EntryProcessorException {
return null;
}
}
/**
*
*/
static class TestValue {
/** */
private String val;
/**
* @param val Value.
*/
public TestValue(String val) {
this.val = val;
}
/**
* @return Value.
*/
public String value() {
return val;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestValue testVal = (TestValue)o;
return val.equals(testVal.val);
}
/** {@inheritDoc} */
@Override public int hashCode() {
return val.hashCode();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestValue.class, this);
}
}
}