blob: dc1b5e90e69078b66040e67dd89d58190c65f741 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientTransaction;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.client.Person;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Checks entry processor invocation for thin client.
*/
@RunWith(Parameterized.class)
public class InvokeTest extends AbstractThinClientTest {
/** */
private static final int NODE_CNT = 3;
/** Client. */
private static IgniteClient client;
/** Client. */
private static ClientCache<Integer, Object> cache;
/** */
@Parameterized.Parameter
public boolean atomic;
/** */
@Parameterized.Parameters(name = "Atomic: {0}")
public static Collection<Object> params() {
return F.asList(true, false);
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGrids(NODE_CNT);
client = startClient(0);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cache = client.getOrCreateCache(new ClientCacheConfiguration()
.setName(DEFAULT_CACHE_NAME)
.setAtomicityMode(atomic ? CacheAtomicityMode.ATOMIC : CacheAtomicityMode.TRANSACTIONAL)
);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
client.destroyCache(DEFAULT_CACHE_NAME);
}
/**
* Test cache invoke operations, simple case.
*/
@Test
public void testInvokeSimpleCase() {
assertEquals(0, (int)cache.invoke(0, new IncrementProcessor()));
assertEquals(1, (int)cache.invoke(0, new IncrementProcessor()));
}
/**
* Test cache invokeAll operations, simple case.
*/
@Test
public void testInvokeAllSimpleCase() {
Map<Integer, EntryProcessorResult<Integer>> map = cache.invokeAll(new HashSet<>(
F.asList(0, 1)), new IncrementProcessor());
assertEquals(2, map.size());
assertEquals((Integer)0, map.get(0).get());
assertEquals((Integer)0, map.get(1).get());
map = cache.invokeAll(new HashSet<>(F.asList(1, 2)), new IncrementProcessor());
assertEquals(2, map.size());
assertEquals((Integer)1, map.get(1).get());
assertEquals((Integer)0, map.get(2).get());
}
/**
* Test async cache invoke/invokeAll.
*/
@Test
public void testAsync() throws Exception {
assertEquals(3, cache.invokeAsync(0, new TestEntryProcessor(), 1, 2, 3).get());
assertEquals(2, cache.get(0));
Map<Integer, EntryProcessorResult<Object>> map = cache.invokeAllAsync(new HashSet<>(
F.asList(0, 1)), new TestEntryProcessor(), 1, 2, 3).get();
assertEquals(2, map.size());
assertEquals(3, map.get(0).get());
assertEquals(3, map.get(1).get());
assertEquals(1, cache.get(0));
assertEquals(2, cache.get(1));
}
/**
* Test exception handling.
*/
@Test
public void testExceptionHandling() {
try {
cache.invoke(0, new FailingEntryProcessor());
fail();
}
catch (EntryProcessorException e) {
assertTrue("Failed".equals(e.getMessage()));
}
Map<Integer, EntryProcessorResult<Object>> res = cache.invokeAll(
new HashSet<>(F.asList(0, 1)), new FailingEntryProcessor());
assertEquals(2, res.size());
try {
res.get(0).get();
fail();
}
catch (EntryProcessorException e) {
assertTrue("Failed".equals(e.getMessage()));
}
try {
res.get(1).get();
fail();
}
catch (EntryProcessorException e) {
assertTrue("Failed".equals(e.getMessage()));
}
}
/**
* Test exception handling by async operations.
*/
@Test
public void testAsyncExceptionHandling() throws Exception {
IgniteClientFuture<?> fut = cache.invokeAsync(0, new FailingEntryProcessor());
try {
fut.get();
fail();
}
catch (ExecutionException e) {
assertTrue(X.hasCause(e, "Failed", EntryProcessorException.class));
}
Map<Integer, EntryProcessorResult<Object>> res = cache.invokeAllAsync(
new HashSet<>(F.asList(0, 1)), new FailingEntryProcessor()).get();
assertEquals(2, res.size());
try {
res.get(0).get();
fail();
}
catch (EntryProcessorException e) {
assertTrue("Failed".equals(e.getMessage()));
}
try {
res.get(1).get();
fail();
}
catch (EntryProcessorException e) {
assertTrue("Failed".equals(e.getMessage()));
}
}
/**
* Test withKeepBinary flag.
*/
@Test
public void testWithKeepBinary() {
Person person = new Person(0, "name");
ClientCache<Integer, Object> keepBinaryCache = cache.withKeepBinary();
Object res = keepBinaryCache.invoke(0, new BinaryObjectEntryProcessor(), person);
assertEquals(client.binary().toBinary(person), res);
try {
cache.invoke(0, new BinaryObjectEntryProcessor(), person);
fail();
}
catch (EntryProcessorException ignore) {
// Expected.
}
}
/**
* Test arguments and result serialization.
*/
@Test
public void testSerialization() {
checkSerialization(1, 2, 3);
Person p1 = new Person(1, "name1");
Person p2 = new Person(2, "name2");
Person p3 = new Person(3, "name3");
checkSerialization(p1, p2, p3);
checkSerialization(new Object[] {p1, p2}, new Object[] {p2, p3}, new Object[] {p3, p1});
checkSerialization(F.asList(p1, p2), F.asList(p2, p3), F.asList(p3, p1));
}
/** */
private void checkSerialization(Object valIfExists, Object valIfNotExists, Object retVal) {
// Remove keys.
cache.invoke(0, new TestEntryProcessor(), null, null, null);
cache.invokeAll(new HashSet<>(F.asList(0, 1)), new TestEntryProcessor(), null, null, null);
Object res = cache.invoke(0, new TestEntryProcessor(), valIfExists, valIfNotExists, retVal);
assertEqualsArraysAware(res, retVal);
assertEqualsArraysAware(valIfNotExists, cache.get(0));
cache.put(0, 0); // Reset value for existing key.
cache.invoke(0, new TestEntryProcessor(), valIfExists, valIfNotExists, retVal);
assertEqualsArraysAware(valIfExists, cache.get(0));
cache.put(0, 0); // Reset value for existing key.
Map<Integer, EntryProcessorResult<Object>> resMap = cache.invokeAll(new HashSet<>(F.asList(0, 1)),
new TestEntryProcessor(), valIfExists, valIfNotExists, retVal);
assertEquals(2, resMap.size());
assertEqualsArraysAware(retVal, resMap.get(0).get());
assertEqualsArraysAware(retVal, resMap.get(1).get());
assertEqualsArraysAware(valIfExists, cache.get(0));
assertEqualsArraysAware(valIfNotExists, cache.get(1));
}
/**
* Test that invoke/invokeAll is transactional.
*/
@Test
public void testExplicitTx() {
Assume.assumeFalse(atomic);
try (ClientTransaction tx = client.transactions().txStart()) {
cache.invoke(0, new TestEntryProcessor(), 1, 2, 3);
assertEquals(2, cache.get(0));
cache.invoke(0, new TestEntryProcessor(), 1, 2, 3);
assertEquals(1, cache.get(0));
cache.invokeAll(new HashSet<>(F.asList(0, 1)), new TestEntryProcessor(), 1, 2, 3);
assertEquals(F.asMap(0, 1, 1, 2), cache.getAll(new HashSet<>(F.asList(0, 1))));
tx.rollback();
}
assertFalse(cache.containsKey(0));
assertFalse(cache.containsKey(1));
try (ClientTransaction tx = client.transactions().txStart()) {
cache.invoke(0, new TestEntryProcessor(), 1, 2, 3);
cache.invokeAll(new HashSet<>(F.asList(0, 1)), new TestEntryProcessor(), 1, 2, 3);
tx.commit();
}
assertEquals(F.asMap(0, 1, 1, 2), cache.getAll(new HashSet<>(F.asList(0, 1))));
}
/** */
protected static class IncrementProcessor implements EntryProcessor<Integer, Object, Integer> {
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Object> e, Object... arguments) {
Integer val = (Integer)e.getValue();
e.setValue(val == null ? 0 : val + 1);
return (Integer)e.getValue();
}
}
/** */
protected static class TestEntryProcessor implements EntryProcessor<Integer, Object, Object> {
/** {@inheritDoc} */
@Override public Object process(MutableEntry<Integer, Object> e, Object... arguments) {
// arguments[0] - value if exists.
// arguments[1] - value if not exists.
// arguments[2] - returned value.
if (arguments == null || arguments.length < 3)
throw new EntryProcessorException("Unexpected arguments: " + Arrays.toString(arguments));
if (arguments[0] == null)
e.remove();
else
e.setValue(e.exists() ? arguments[0] : arguments[1]);
return arguments[2];
}
}
/** */
protected static class FailingEntryProcessor implements EntryProcessor<Integer, Object, Object> {
/** {@inheritDoc} */
@Override public Object process(MutableEntry<Integer, Object> e, Object... arguments) {
throw new EntryProcessorException("Failed");
}
}
/** */
protected static class BinaryObjectEntryProcessor implements EntryProcessor<Integer, Object, Object> {
/** {@inheritDoc} */
@Override public Object process(MutableEntry<Integer, Object> e, Object... arguments) {
if (arguments == null || arguments.length < 1 || !(arguments[0] instanceof BinaryObject))
throw new EntryProcessorException("Expected binary object argument");
return arguments[0];
}
}
}