blob: 0c186d9133ac06413c983038a8d454461ca7f20a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest.DataMode.PLANE_OBJECT;
import static org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest.DataMode.SERIALIZABLE;
/**
*
*/
@SuppressWarnings("unchecked")
public class WithKeepBinaryCacheFullApiTest extends IgniteCacheConfigVariationsAbstractTest {
/** */
protected static volatile boolean interceptorBinaryObjExp = true;
/** */
public static final int CNT = 10;
/** */
public static final CacheEntryProcessor NOOP_ENTRY_PROC = new CacheEntryProcessor() {
@Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
return null;
}
};
/** */
public static final CacheEntryProcessor INC_ENTRY_PROC_USER_OBJ = new CacheEntryProcessor() {
@Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
assertTrue(entry.getKey() instanceof BinaryObject);
Object val = entry.getValue();
int valId = 0;
if (val != null) {
assertTrue(val instanceof BinaryObject);
valId = valueOf(((BinaryObject)val).deserialize()) + 1;
}
Object newVal = value(valId, (DataMode)arguments[0]);
assertFalse(newVal instanceof BinaryObject);
entry.setValue(newVal);
return val == null ? null : ((BinaryObject)val).deserialize();
}
};
/** */
public static final CacheEntryProcessor INC_ENTRY_PROC_BINARY_OBJ = new CacheEntryProcessor() {
@Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
assertTrue(entry.getKey() instanceof BinaryObject);
Object val = entry.getValue();
int valId = 0;
if (val != null) {
assertTrue(val instanceof BinaryObject);
valId = valueOf(((BinaryObject)val).deserialize()) + 1;
}
Object newVal = value(valId, (DataMode)arguments[0]);
Object newBinaryVal = ((Ignite)entry.unwrap(Ignite.class)).binary().toBinary(newVal);
entry.setValue(newBinaryVal);
return val;
}
};
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration() {
CacheConfiguration cc = super.cacheConfiguration();
cc.setStoreKeepBinary(true);
return cc;
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Test
public void testRemovePutGet() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
for (Object key : keys)
cache.remove(key);
}
});
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
for (Object key : keys) {
assertNull(cache.get(key));
assertNull(cache.getEntry(key));
}
}
});
for (final Object key : keys) {
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Object val = value(valueOf(key));
cache.put(key, val);
BinaryObject retVal = (BinaryObject)cache.get(key);
assertEquals(val, retVal.deserialize());
CacheEntry<BinaryObject, BinaryObject> entry = cache.getEntry(key);
assertTrue(entry.getKey() instanceof BinaryObject);
assertEquals(val, entry.getValue().deserialize());
assertTrue(cache.remove(key));
}
});
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Test
public void testRemovePutGetAsync() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
for (Object key : keys)
cache.removeAsync(key).get();
}
});
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
for (Object key : keys) {
assertNull(cache.getAsync(key).get());
assertNull(cache.getEntryAsync(key).get());
}
}
});
for (final Object key : keys) {
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Object val = value(valueOf(key));
cache.putAsync(key, val).get();
BinaryObject retVal = (BinaryObject)cache.getAsync(key).get();
assertEquals(val, retVal.deserialize());
CacheEntry<BinaryObject, BinaryObject> e =
(CacheEntry<BinaryObject, BinaryObject>)cache.getEntryAsync(key).get();
assertEquals(key, deserializeBinary(e.getKey()));
assertEquals(val, e.getValue().deserialize());
}
});
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Test
public void testPutAllGetAll() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
for (Object val : cache.getAll(keys).values())
assertNull(val);
}
});
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Collection<CacheEntry> entries = cache.<CacheEntry>getEntries(keys);
for (CacheEntry e : entries)
assertNull(e.getValue());
}
});
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Map keyValMap = new LinkedHashMap();
for (Object key : keys) {
Object val = value(valueOf(key));
keyValMap.put(key, val);
}
cache.putAll(keyValMap);
Set<Map.Entry<BinaryObject, BinaryObject>> set = cache.getAll(keys).entrySet();
for (Map.Entry<BinaryObject, BinaryObject> e : set) {
Object expVal = value(valueOf(e.getKey().deserialize()));
assertEquals(expVal, e.getValue().deserialize());
}
Collection<CacheEntry<BinaryObject, BinaryObject>> entries = cache.getEntries(keys);
for (CacheEntry<BinaryObject, BinaryObject> e : entries) {
assertTrue(e.getKey() instanceof BinaryObject);
Object expVal = value(valueOf(e.getKey().deserialize()));
assertEquals(expVal, e.getValue().deserialize());
}
cache.removeAll(keys);
}
});
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Test
public void testPutAllGetAllAsync() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Map res = (Map)cache.getAllAsync(keys).get();
for (Object val : res.values())
assertNull(val);
}
});
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Collection<CacheEntry> entries =
(Collection<CacheEntry>)cache.<CacheEntry>getEntriesAsync(keys).get();
for (CacheEntry e : entries)
assertNull(e.getValue());
}
});
runInAllTxModes(new TestRunnable() {
@Override public void run() throws Exception {
Map keyValMap = new LinkedHashMap();
for (Object key : keys) {
Object val = value(valueOf(key));
keyValMap.put(key, val);
}
cache.putAllAsync(keyValMap).get();
Set<Map.Entry<BinaryObject, BinaryObject>> set =
((Map)cache.getAllAsync(keys).get()).entrySet();
for (Map.Entry<BinaryObject, BinaryObject> e : set) {
Object expVal = value(valueOf(e.getKey().deserialize()));
assertEquals(expVal, e.getValue().deserialize());
}
Collection<CacheEntry<BinaryObject, BinaryObject>> entries =
(Collection<CacheEntry<BinaryObject, BinaryObject>>)cache.getEntriesAsync(keys).get();
for (CacheEntry<BinaryObject, BinaryObject> e : entries) {
assertTrue(e.getKey() instanceof BinaryObject);
Object expVal = value(valueOf(e.getKey().deserialize()));
assertEquals(expVal, e.getValue().deserialize());
}
cache.removeAllAsync(keys).get();
}
});
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Test
public void testInvoke() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
for (final Object key : keys) {
Object res = cache.invoke(key, NOOP_ENTRY_PROC);
assertNull(res);
assertNull(cache.get(key));
}
for (final Object key : keys) {
Object res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.get(key)));
res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
assertEquals(value(0), deserializeBinary(res));
assertEquals(value(1), deserializeBinary(cache.get(key)));
assertTrue(cache.remove(key));
}
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
for (final Object key : keys) {
Object res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.get(key)));
res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
// TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
// assertEquals(value(0), res);
assertEquals(value(1), deserializeBinary(cache.get(key)));
assertTrue(cache.remove(key));
}
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeTx() throws Exception {
if (!txShouldBeUsed())
return;
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
info(">>>>> Executing test using explicit txs [concurrency=" + conc + ", isolation=" + isolation + "]");
checkInvokeTx(conc, isolation);
jcache().removeAll();
}
}
}
/**
* @param conc Concurrency
* @param isolation Isolation.
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
public void checkInvokeTx(final TransactionConcurrency conc, final TransactionIsolation isolation) throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
for (final Object key : keys) {
Object res = cache.invoke(key, NOOP_ENTRY_PROC);
assertNull(res);
assertNull(cache.get(key));
}
tx.commit();
}
for (final Object key : keys) {
Object res;
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
tx.commit();
}
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.get(key)));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
tx.commit();
}
assertEquals(value(0), deserializeBinary(res));
assertEquals(value(1), deserializeBinary(cache.get(key)));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
assertTrue(cache.remove(key));
tx.commit();
}
}
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
for (final Object key : keys) {
Object res;
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
tx.commit();
}
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.get(key)));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
tx.commit();
}
// TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
// assertEquals(value(0), res);
assertEquals(value(1), deserializeBinary(cache.get(key)));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
assertTrue(cache.remove(key));
tx.commit();
}
}
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Test
public void testInvokeAsync() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
for (final Object key : keys) {
Object res = cache.invokeAsync(key, NOOP_ENTRY_PROC).get();
assertNull(res);
assertNull(cache.getAsync(key).get());
}
for (final Object key : keys) {
Object res = cache.invokeAsync(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.getAsync(key).get()));
res = cache.invokeAsync(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
assertEquals(value(0), deserializeBinary(res));
assertEquals(value(1), deserializeBinary(cache.getAsync(key).get()));
assertTrue((Boolean)cache.removeAsync(key).get());
}
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
for (final Object key : keys) {
Object res = cache.invokeAsync(key, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.getAsync(key).get()));
res = cache.invokeAsync(key, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
// TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
// assertEquals(value(0), res);
assertEquals(value(1), deserializeBinary(cache.getAsync(key).get()));
assertTrue((Boolean)cache.removeAsync(key).get());
}
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAsyncTx() throws Exception {
if (!txShouldBeUsed())
return;
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkInvokeAsyncTx(conc, isolation);
jcache().removeAll();
}
}
}
/**
* @param conc Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
public void checkInvokeAsyncTx(final TransactionConcurrency conc, final TransactionIsolation isolation) throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
for (final Object key : keys) {
Object res = cache.invokeAsync(key, NOOP_ENTRY_PROC).get();
assertNull(res);
assertNull(cache.getAsync(key).get());
}
tx.commit();
}
for (final Object key : keys) {
Object res;
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invokeAsync(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
tx.commit();
}
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.getAsync(key).get()));
IgniteFuture f;
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
f = cache.invokeAsync(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
tx.commit();
}
res = f.get();
assertEquals(value(0), deserializeBinary(res));
assertEquals(value(1), deserializeBinary(cache.getAsync(key).get()));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
assertTrue((Boolean)cache.removeAsync(key).get());
tx.commit();
}
}
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
for (final Object key : keys) {
Object res;
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invokeAsync(key, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
tx.commit();
}
assertNull(res);
assertEquals(value(0), deserializeBinary(cache.getAsync(key).get()));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
res = cache.invokeAsync(key, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
tx.commit();
}
// TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
// assertEquals(value(0), res);
assertEquals(value(1), deserializeBinary(cache.getAsync(key).get()));
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
assertTrue((Boolean)cache.removeAsync(key).get());
tx.commit();
}
}
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Ignore("https://issues.apache.org/jira/browse/IGNITE-11884")
@Test
public void testInvokeAll() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
Map<Object, EntryProcessorResult<Object>> resMap = cache.invokeAll(keys, NOOP_ENTRY_PROC);
for (Map.Entry<Object, EntryProcessorResult<Object>> e : resMap.entrySet()) {
assertTrue("Key:" + e.getKey(), e.getKey() instanceof BinaryObject);
assertNull(e.getValue().get());
}
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
checkInvokeAllResult(cache, resMap, null, value(0), true);
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
checkInvokeAllResult(cache, resMap, value(0), value(1), true);
cache.removeAll(keys);
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode);
checkInvokeAllResult(cache, resMap, null, value(0), false);
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode);
checkInvokeAllResult(cache, resMap, value(0), value(1), false);
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAllTx() throws Exception {
if (!txShouldBeUsed())
return;
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkInvokeAllTx(conc, isolation);
jcache().removeAll();
}
}
}
/**
* @param conc Concurrency.
* @param isol Isolation.
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
private void checkInvokeAllTx(final TransactionConcurrency conc, final TransactionIsolation isol) throws Exception {
if (!txShouldBeUsed())
return;
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
Map<Object, EntryProcessorResult<Object>> resMap = cache.invokeAll(keys, NOOP_ENTRY_PROC);
for (Map.Entry<Object, EntryProcessorResult<Object>> e : resMap.entrySet()) {
assertTrue("Key:" + e.getKey(), e.getKey() instanceof BinaryObject);
assertNull(e.getValue().get());
}
tx.commit();
}
Map<Object, EntryProcessorResult<Object>> resMap;
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
tx.commit();
}
checkInvokeAllResult(cache, resMap, null, value(0), true);
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
tx.commit();
}
checkInvokeAllResult(cache, resMap, value(0), value(1), true);
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
cache.removeAll(keys);
tx.commit();
}
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode);
tx.commit();
}
checkInvokeAllResult(cache, resMap, null, value(0), false);
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode);
tx.commit();
}
checkInvokeAllResult(cache, resMap, value(0), value(1), false);
try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) {
cache.removeAll(keys);
tx.commit();
}
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @param cache Cache.
* @param resMap Result map.
* @param expRes Expected result.
* @param cacheVal Expected cache value for key.
* @param deserializeRes Deseriallize result flag.
*/
private void checkInvokeAllResult(IgniteCache cache, Map<Object, EntryProcessorResult<Object>> resMap,
Object expRes, Object cacheVal, boolean deserializeRes) {
for (Map.Entry<Object, EntryProcessorResult<Object>> e : resMap.entrySet()) {
info("Key: " + e.getKey());
assertTrue("Wrong key type, binary object expected: " + e.getKey(), e.getKey() instanceof BinaryObject);
Object res = e.getValue().get();
// TODO IGNITE-2953: delete the following if when the issue wiil be fixed.
if (deserializeRes)
assertEquals(expRes, deserializeRes ? deserializeBinary(res) : res);
if (cache.get(e.getKey()) == null)
cache.get(e.getKey());
assertEquals(cacheVal, deserializeBinary(cache.get(e.getKey())));
}
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("serial")
@Ignore("https://issues.apache.org/jira/browse/IGNITE-11884")
@Test
public void testInvokeAllAsync() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
Map<Object, EntryProcessorResult<Object>> resMap =
(Map<Object, EntryProcessorResult<Object>>)cache.invokeAllAsync(keys, NOOP_ENTRY_PROC).get();
for (Map.Entry<Object, EntryProcessorResult<Object>> e : resMap.entrySet()) {
assertTrue("Wrong key type, binary object expected: " + e.getKey(), e.getKey() instanceof BinaryObject);
assertNull(e.getValue().get());
}
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
checkInvokeAllAsyncResult(cache, resMap, null, value(0), true);
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), true);
cache.removeAllAsync(keys).get();
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
checkInvokeAllAsyncResult(cache, resMap, null, value(0), false);
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), false);
cache.removeAllAsync(keys).get();
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeAllAsyncTx() throws Exception {
if (!txShouldBeUsed())
return;
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkInvokeAllAsycnTx(conc, isolation);
jcache().removeAll();
}
}
}
/**
*
* @param conc Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
private void checkInvokeAllAsycnTx(final TransactionConcurrency conc, final TransactionIsolation isolation) throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
final IgniteCache cache = jcache().withKeepBinary();
final Set keys = new LinkedHashSet();
for (int i = 0; i < CNT; i++)
keys.add(key(i));
Map<Object, EntryProcessorResult<Object>> resMap;
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, NOOP_ENTRY_PROC).get();
tx.commit();
}
for (Map.Entry<Object, EntryProcessorResult<Object>> e : resMap.entrySet()) {
assertTrue("Key:" + e.getKey(), e.getKey() instanceof BinaryObject);
assertNull(e.getValue().get());
}
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
tx.commit();
}
checkInvokeAllAsyncResult(cache, resMap, null, value(0), true);
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode).get();
tx.commit();
}
checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), true);
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
cache.removeAllAsync(keys).get();
tx.commit();
}
// TODO IGNITE-2973: should be always false.
interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
try {
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
tx.commit();
}
checkInvokeAllAsyncResult(cache, resMap, null, value(0), false);
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
resMap = (Map<Object, EntryProcessorResult<Object>>)
cache.invokeAllAsync(keys, INC_ENTRY_PROC_USER_OBJ, dataMode).get();
tx.commit();
}
checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), false);
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
cache.removeAllAsync(keys).get();
tx.commit();
}
}
finally {
interceptorBinaryObjExp = true;
}
}
}, PLANE_OBJECT, SERIALIZABLE);
}
/**
* @param cache Cache.
* @param resMap Result map.
* @param expRes Expected result.
* @param cacheVal Expected cache value for key.
* @param deserializeRes Deseriallize result flag.
*/
private void checkInvokeAllAsyncResult(IgniteCache cache, Map<Object, EntryProcessorResult<Object>> resMap,
Object expRes, Object cacheVal, boolean deserializeRes) {
for (Map.Entry<Object, EntryProcessorResult<Object>> e : resMap.entrySet()) {
info("Key: " + e.getKey());
assertTrue("Wrong key type, binary object expected: " + e.getKey(), e.getKey() instanceof BinaryObject);
Object res = e.getValue().get();
// TODO IGNITE-2953: delete the following if when the issue wiil be fixed.
if (deserializeRes)
assertEquals(expRes, deserializeRes ? deserializeBinary(res) : res);
assertEquals(cacheVal, deserializeBinary(cache.getAsync(e.getKey()).get()));
}
}
/**
* @param val Value
* @return User object.
*/
private static Object deserializeBinary(Object val) {
assertTrue("Val: " + val, val instanceof BinaryObject);
return ((BinaryObject)val).deserialize();
}
/**
* @param task Task.
* @throws Exception If failed.
*/
protected void runInAllTxModes(TestRunnable task) throws Exception {
info("Executing implicite tx");
task.run();
if (txShouldBeUsed()) {
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
info("Executing explicite tx [isolation" + isolation + ", concurrency=" + conc + "]");
task.run();
tx.commit();
}
}
}
}
}
}