blob: 4a67210ca3e56397e34e00084e7c29e489a0aa94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
import org.junit.Ignore;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
*/
public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/** */
private static final boolean FAST = false;
/** */
private static Map<Integer, Integer> storeMap = new ConcurrentHashMap<>();
/** */
private static final int SRVS = 4;
/** */
private static final int CLIENTS = 3;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setPeerClassLoadingEnabled(false);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGridsMultiThreaded(SRVS);
startClientGridsMultiThreaded(SRVS, CLIENTS);
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 5 * 60_000;
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxStreamerLoad() throws Exception {
txStreamerLoad(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxStreamerLoadAllowOverwrite() throws Exception {
txStreamerLoad(true);
}
/**
* @param allowOverwrite Streamer flag.
* @throws Exception If failed.
*/
private void txStreamerLoad(boolean allowOverwrite) throws Exception {
Ignite ignite0 = ignite(0);
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
if (ccfg.getCacheStoreFactory() == null)
continue;
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
awaitPartitionMapExchange();
List<Integer> keys = testKeys(cache);
for (Integer key : keys)
txStreamerLoad(ignite0, key, cache.getName(), allowOverwrite);
txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite);
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @param ignite Node.
* @param key Key.
* @param cacheName Cache name.
* @param allowOverwrite Streamer flag.
* @throws Exception If failed.
*/
private void txStreamerLoad(Ignite ignite,
Integer key,
String cacheName,
boolean allowOverwrite) throws Exception {
IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
log.info("Test key: " + key);
Integer loadVal = -1;
IgniteTransactions txs = ignite.transactions();
try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cache.getName())) {
streamer.allowOverwrite(allowOverwrite);
streamer.addData(key, loadVal);
}
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertEquals(loadVal, val);
tx.commit();
}
checkValue(key, loadVal, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertEquals(loadVal, val);
cache.put(key, 0);
tx.commit();
}
checkValue(key, 0, cache.getName());
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxLoadFromStore() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
if (ccfg.getCacheStoreFactory() == null)
continue;
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer storeVal = -1;
storeMap.put(key, storeVal);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertEquals(storeVal, val);
tx.commit();
}
checkValue(key, storeVal, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertNull(val);
tx.commit();
}
checkValue(key, null, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxCommitReadOnly1() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertNull(val);
tx.commit();
}
checkValue(key, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertNull(val);
tx.rollback();
}
checkValue(key, null, cache.getName());
cache.put(key, 1);
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertNull(val);
tx.commit();
}
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxCommitReadOnly2() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (final Integer key : keys) {
log.info("Test key: " + key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertNull(val);
txAsync(cache, OPTIMISTIC, SERIALIZABLE,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.get(key);
return null;
}
}
);
tx.commit();
}
checkValue(key, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
assertNull(val);
txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.get(key);
return null;
}
}
);
tx.commit();
}
checkValue(key, null, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxCommit() throws Exception {
Ignite ignite0 = ignite(0);
Ignite ignite1 = ignite(1);
final IgniteTransactions txs0 = ignite0.transactions();
final IgniteTransactions txs1 = ignite1.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
IgniteCache<Integer, Integer> cache1 = ignite1.cache(ccfg.getName());
List<Integer> keys = testKeys(cache0);
final int ITERATIONS_COUNT = SF.applyLB(100, 5);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer expVal = null;
for (int i = 0; i < ITERATIONS_COUNT; i++) {
try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache0.get(key);
assertEquals(expVal, val);
cache0.put(key, i);
tx.commit();
expVal = i;
}
try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache1.get(key);
assertEquals(expVal, val);
cache1.put(key, val);
tx.commit();
}
try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache0.get(key);
assertEquals(expVal, val);
cache0.put(key, val);
tx.commit();
}
}
checkValue(key, expVal, cache0.getName());
cache0.remove(key);
try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache0.get(key);
assertNull(val);
cache0.put(key, expVal + 1);
tx.commit();
}
checkValue(key, expVal + 1, cache0.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxRollback() throws Exception {
Ignite ignite0 = ignite(0);
Ignite ignite1 = ignite(1);
final IgniteTransactions txs0 = ignite0.transactions();
final IgniteTransactions txs1 = ignite1.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
IgniteCache<Integer, Integer> cache1 = ignite1.cache(ccfg.getName());
List<Integer> keys = testKeys(cache0);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer expVal = null;
for (int i = 0; i < SF.applyLB(100, 10); i++) {
try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache0.get(key);
assertEquals(expVal, val);
cache0.put(key, i);
tx.rollback();
}
try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache0.get(key);
assertEquals(expVal, val);
cache0.put(key, i);
tx.commit();
expVal = i;
}
try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache1.get(key);
assertEquals(expVal, val);
cache1.put(key, val);
tx.commit();
}
}
checkValue(key, expVal, cache0.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxCommitReadOnlyGetAll() throws Exception {
testTxCommitReadOnlyGetAll(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxCommitReadOnlyGetEntries() throws Exception {
testTxCommitReadOnlyGetAll(true);
}
/**
* @throws Exception If failed.
*/
private void testTxCommitReadOnlyGetAll(boolean needVer) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
Set<Integer> keys = new HashSet<>();
for (int i = 0; i < 100; i++)
keys.add(i);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (needVer) {
Collection<CacheEntry<Integer, Integer>> c = cache.getEntries(keys);
assertTrue(c.isEmpty());
}
else {
Map<Integer, Integer> map = cache.getAll(keys);
assertTrue(map.isEmpty());
}
tx.commit();
}
for (Integer key : keys)
checkValue(key, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (needVer) {
Collection<CacheEntry<Integer, Integer>> c = cache.getEntries(keys);
assertTrue(c.isEmpty());
}
else {
Map<Integer, Integer> map = cache.getAll(keys);
assertTrue(map.isEmpty());
}
tx.rollback();
}
for (Integer key : keys)
checkValue(key, null, cache.getName());
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxCommitReadWriteTwoNodes() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
Integer key0 = primaryKey(ignite(0).cache(DEFAULT_CACHE_NAME));
Integer key1 = primaryKey(ignite(1).cache(DEFAULT_CACHE_NAME));
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key0, key0);
cache.get(key1);
tx.commit();
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictRead1() throws Exception {
txConflictRead(true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictRead2() throws Exception {
txConflictRead(false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadEntry1() throws Exception {
txConflictRead(true, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadEntry2() throws Exception {
txConflictRead(false, true);
}
/**
* @param noVal If {@code true} there is no cache value when read in tx.
* @param needVer If {@code true} then gets entry, otherwise just value.
* @throws Exception If failed.
*/
private void txConflictRead(boolean noVal, boolean needVer) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer expVal = null;
if (!noVal) {
expVal = -1;
cache.put(key, expVal);
}
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (needVer) {
CacheEntry<Integer, Integer> val = cache.getEntry(key);
assertEquals(expVal, val == null ? null : val.getValue());
}
else {
Integer val = cache.get(key);
assertEquals(expVal, val);
}
updateKey(cache, key, 1);
tx.commit();
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (needVer) {
CacheEntry<Integer, Integer> val = cache.getEntry(key);
assertEquals((Integer)1, val.getValue());
}
else {
Object val = cache.get(key);
assertEquals(1, val);
}
tx.commit();
}
checkValue(key, 1, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadWrite1() throws Exception {
txConflictReadWrite(true, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadWrite2() throws Exception {
txConflictReadWrite(false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadRemove1() throws Exception {
txConflictReadWrite(true, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadRemove2() throws Exception {
txConflictReadWrite(false, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadEntryWrite1() throws Exception {
txConflictReadWrite(true, false, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadEntryWrite2() throws Exception {
txConflictReadWrite(false, false, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadEntryRemove1() throws Exception {
txConflictReadWrite(true, true, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadEntryRemove2() throws Exception {
txConflictReadWrite(false, true, true);
}
/**
* @param noVal If {@code true} there is no cache value when read in tx.
* @param rmv If {@code true} tests remove, otherwise put.
* @throws Exception If failed.
*/
private void txConflictReadWrite(boolean noVal, boolean rmv, boolean needVer) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer expVal = null;
if (!noVal) {
expVal = -1;
cache.put(key, expVal);
}
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (needVer) {
CacheEntry<Integer, Integer> val = cache.getEntry(key);
assertEquals(expVal, val == null ? null : val.getValue());
}
else {
Integer val = cache.get(key);
assertEquals(expVal, val);
}
updateKey(cache, key, 1);
if (rmv)
cache.remove(key);
else
cache.put(key, 2);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (needVer) {
CacheEntry<Integer, Integer> val = cache.getEntry(key);
assertEquals(1, (Object)val.getValue());
}
else {
Integer val = cache.get(key);
assertEquals(1, (Object)val);
}
if (rmv)
cache.remove(key);
else
cache.put(key, 2);
tx.commit();
}
checkValue(key, rmv ? null : 2, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReadWrite3() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> readKeys = new ArrayList<>();
List<Integer> writeKeys = new ArrayList<>();
readKeys.add(primaryKey(cache));
writeKeys.add(primaryKeys(cache, 1, 1000_0000).get(0));
if (ccfg.getBackups() > 0) {
readKeys.add(backupKey(cache));
writeKeys.add(backupKeys(cache, 1, 1000_0000).get(0));
}
if (ccfg.getCacheMode() == PARTITIONED) {
readKeys.add(nearKey(cache));
writeKeys.add(nearKeys(cache, 1, 1000_0000).get(0));
}
try {
for (Integer readKey : readKeys) {
for (Integer writeKey : writeKeys) {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.get(readKey);
cache.put(writeKey, writeKey);
updateKey(cache, readKey, 0);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException ignored) {
// Expected exception.
}
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.get(readKey);
cache.put(writeKey, writeKey);
tx.commit();
}
}
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictGetAndPut1() throws Exception {
txConflictGetAndPut(true, false);
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictGetAndPut2() throws Exception {
txConflictGetAndPut(false, false);
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictGetAndRemove1() throws Exception {
txConflictGetAndPut(true, true);
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictGetAndRemove2() throws Exception {
txConflictGetAndPut(false, true);
}
/**
* @param noVal If {@code true} there is no cache value when read in tx.
* @param rmv If {@code true} tests remove, otherwise put.
* @throws Exception If failed.
*/
private void txConflictGetAndPut(boolean noVal, boolean rmv) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer expVal = null;
if (!noVal) {
expVal = -1;
cache.put(key, expVal);
}
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
assertEquals(expVal, val);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
assertEquals(1, val);
tx.commit();
}
checkValue(key, rmv ? null : 2, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictInvoke1() throws Exception {
txConflictInvoke(true, false);
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictInvoke2() throws Exception {
txConflictInvoke(false, false);
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictInvoke3() throws Exception {
txConflictInvoke(true, true);
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictInvoke4() throws Exception {
txConflictInvoke(false, true);
}
/**
* @param noVal If {@code true} there is no cache value when read in tx.
* @param rmv If {@code true} invoke does remove value, otherwise put.
* @throws Exception If failed.
*/
private void txConflictInvoke(boolean noVal, boolean rmv) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
Integer expVal = null;
if (!noVal) {
expVal = -1;
cache.put(key, expVal);
}
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
assertEquals(expVal, val);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
assertEquals(1, val);
tx.commit();
}
checkValue(key, rmv ? null : 2, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed
*/
@Test
public void testTxConflictInvokeAll() throws Exception {
Ignite ignite0 = ignite(0);
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
final Integer key1 = primaryKey(ignite(0).cache(cache0.getName()));
final Integer key2 = primaryKey(ignite(1).cache(cache0.getName()));
Map<Integer, Integer> vals = new HashMap<>();
int newVal = 0;
for (Ignite ignite : G.allGrids()) {
log.info("Test node: " + ignite.name());
IgniteTransactions txs = ignite.transactions();
IgniteCache<Integer, Integer> cache = ignite.cache(cache0.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Map<Integer, EntryProcessorResult<Integer>> res =
cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal));
if (!vals.isEmpty()) {
EntryProcessorResult<Integer> res1 = res.get(key1);
assertNotNull(res1);
assertEquals(vals.get(key1), res1.get());
EntryProcessorResult<Integer> res2 = res.get(key2);
assertNotNull(res2);
assertEquals(vals.get(key2), res2.get());
}
else
assertTrue(res.isEmpty());
tx.commit();
}
checkValue(key1, newVal, cache.getName());
checkValue(key2, newVal, cache.getName());
vals.put(key1, newVal);
vals.put(key2, newVal);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Map<Integer, EntryProcessorResult<Integer>> res =
cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal + 1));
EntryProcessorResult<Integer> res1 = res.get(key1);
assertNotNull(res1);
assertEquals(vals.get(key1), res1.get());
EntryProcessorResult<Integer> res2 = res.get(key2);
assertNotNull(res2);
assertEquals(vals.get(key2), res2.get());
updateKey(cache0, key1, -1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key1, -1, cache.getName());
checkValue(key2, newVal, cache.getName());
vals.put(key1, -1);
vals.put(key2, newVal);
newVal++;
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictPutIfAbsent() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean put = cache.putIfAbsent(key, 2);
assertTrue(put);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean put = cache.putIfAbsent(key, 2);
assertFalse(put);
tx.commit();
}
checkValue(key, 1, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean put = cache.putIfAbsent(key, 2);
assertTrue(put);
tx.commit();
}
checkValue(key, 2, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean put = cache.putIfAbsent(key, 2);
assertFalse(put);
updateKey(cache, key, 3);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 3, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictGetAndPutIfAbsent() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndPutIfAbsent(key, 2);
assertNull(old);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndPutIfAbsent(key, 2);
assertEquals(1, old);
tx.commit();
}
checkValue(key, 1, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndPutIfAbsent(key, 2);
assertNull(old);
tx.commit();
}
checkValue(key, 2, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndPutIfAbsent(key, 4);
assertEquals(2, old);
updateKey(cache, key, 3);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 3, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictReplace() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (final Integer key : keys) {
log.info("Test key: " + key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2);
assertFalse(replace);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2);
assertTrue(replace);
tx.commit();
}
checkValue(key, 2, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2);
assertFalse(replace);
tx.commit();
}
checkValue(key, null, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2);
assertFalse(replace);
updateKey(cache, key, 3);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 3, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2);
assertTrue(replace);
txAsync(cache, OPTIMISTIC, SERIALIZABLE,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.remove(key);
return null;
}
}
);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, null, cache.getName());
cache.put(key, 1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2);
assertTrue(replace);
tx.commit();
}
checkValue(key, 2, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictGetAndReplace() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (final Integer key : keys) {
log.info("Test key: " + key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndReplace(key, 2);
assertNull(old);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndReplace(key, 2);
assertEquals(1, old);
tx.commit();
}
checkValue(key, 2, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndReplace(key, 2);
assertNull(old);
tx.commit();
}
checkValue(key, null, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndReplace(key, 2);
assertNull(old);
updateKey(cache, key, 3);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 3, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndReplace(key, 2);
assertEquals(3, old);
txAsync(cache, OPTIMISTIC, SERIALIZABLE,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.remove(key);
return null;
}
}
);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, null, cache.getName());
cache.put(key, 1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object old = cache.getAndReplace(key, 2);
assertEquals(1, old);
tx.commit();
}
checkValue(key, 2, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictRemoveWithOldValue() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (final Integer key : keys) {
log.info("Test key: " + key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 2);
assertFalse(rmv);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 1);
assertTrue(rmv);
tx.commit();
}
checkValue(key, null, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 2);
assertFalse(rmv);
tx.commit();
}
checkValue(key, null, cache.getName());
cache.put(key, 2);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 2);
assertTrue(rmv);
updateKey(cache, key, 3);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 3, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 3);
assertTrue(rmv);
txAsync(cache, OPTIMISTIC, SERIALIZABLE,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.remove(key);
return null;
}
}
);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, null, cache.getName());
cache.put(key, 1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 2);
assertFalse(rmv);
tx.commit();
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean rmv = cache.remove(key, 1);
assertTrue(rmv);
tx.commit();
}
checkValue(key, null, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictCasReplace() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (final Integer key : keys) {
log.info("Test key: " + key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 1, 2);
assertFalse(replace);
updateKey(cache, key, 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 1, 2);
assertTrue(replace);
tx.commit();
}
checkValue(key, 2, cache.getName());
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 1, 2);
assertFalse(replace);
tx.commit();
}
checkValue(key, null, cache.getName());
cache.put(key, 2);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2, 1);
assertTrue(replace);
updateKey(cache, key, 3);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, 3, cache.getName());
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 3, 4);
assertTrue(replace);
txAsync(cache, OPTIMISTIC, SERIALIZABLE,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.remove(key);
return null;
}
}
);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, null, cache.getName());
cache.put(key, 1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 2, 3);
assertFalse(replace);
tx.commit();
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean replace = cache.replace(key, 1, 3);
assertTrue(replace);
tx.commit();
}
checkValue(key, 3, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictRemoveReturnBoolean1() throws Exception {
txConflictRemoveReturnBoolean(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxConflictRemoveReturnBoolean2() throws Exception {
txConflictRemoveReturnBoolean(true);
}
/**
* @param noVal If {@code true} there is no cache value when do update in tx.
* @throws Exception If failed.
*/
private void txConflictRemoveReturnBoolean(boolean noVal) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (final Integer key : keys) {
log.info("Test key: " + key);
if (!noVal)
cache.put(key, -1);
if (noVal) {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.remove(key);
assertFalse(res);
updateKey(cache, key, -1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, -1, cache.getName());
}
else {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.remove(key);
assertTrue(res);
txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.remove(key);
return null;
}
}
);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key, null, cache.getName());
cache.put(key, -1);
}
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.remove(key);
assertTrue(res);
updateKey(cache, key, 2);
tx.commit();
}
checkValue(key, null, cache.getName());
// Check no conflict for removeAll with single key.
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.removeAll(Collections.singleton(key));
txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.remove(key);
return null;
}
}
);
tx.commit();
}
checkValue(key, null, cache.getName());
cache.put(key, 2);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.remove(key);
assertTrue(res);
tx.commit();
}
checkValue(key, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.remove(key);
assertFalse(res);
tx.commit();
}
checkValue(key, null, cache.getName());
try {
cache.put(key, 1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val = cache.get(key);
assertEquals(1, val);
boolean res = cache.remove(key);
assertTrue(res);
updateKey(cache, key, 2);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictPut1() throws Exception {
txNoConflictUpdate(true, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictPut2() throws Exception {
txNoConflictUpdate(false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictPut3() throws Exception {
txNoConflictUpdate(false, false, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictRemove1() throws Exception {
txNoConflictUpdate(true, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictRemove2() throws Exception {
txNoConflictUpdate(false, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictRemove3() throws Exception {
txNoConflictUpdate(false, true, true);
}
/**
* @throws Exception If failed.
* @param noVal If {@code true} there is no cache value when do update in tx.
* @param rmv If {@code true} tests remove, otherwise put.
* @param getAfterUpdate If {@code true} tries to get value in tx after update.
*/
private void txNoConflictUpdate(boolean noVal, boolean rmv, boolean getAfterUpdate) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
if (!noVal)
cache.put(key, -1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (rmv)
cache.remove(key);
else
cache.put(key, 2);
if (getAfterUpdate) {
Object val = cache.get(key);
if (rmv)
assertNull(val);
else
assertEquals(2, val);
}
if (!rmv)
updateKey(cache, key, 1);
tx.commit();
}
checkValue(key, rmv ? null : 2, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key, 3);
tx.commit();
}
checkValue(key, 3, cache.getName());
}
Map<Integer, Integer> map = new HashMap<>();
for (int i = 0; i < 100; i++)
map.put(i, i);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (rmv)
cache.removeAll(map.keySet());
else
cache.putAll(map);
if (getAfterUpdate) {
Map<Integer, Integer> res = cache.getAll(map.keySet());
if (rmv) {
for (Integer key : map.keySet())
assertNull(res.get(key));
}
else {
for (Integer key : map.keySet())
assertEquals(map.get(key), res.get(key));
}
}
txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
Map<Integer, Integer> map = new HashMap<>();
for (int i = 0; i < 100; i++)
map.put(i, -1);
cache.putAll(map);
return null;
}
}
);
tx.commit();
}
for (int i = 0; i < 100; i++)
checkValue(i, rmv ? null : i, cache.getName());
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictContainsKey1() throws Exception {
txNoConflictContainsKey(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxNoConflictContainsKey2() throws Exception {
txNoConflictContainsKey(true);
}
/**
* @param noVal If {@code true} there is no cache value when do update in tx.
* @throws Exception If failed.
*/
private void txNoConflictContainsKey(boolean noVal) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
if (!noVal)
cache.put(key, -1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.containsKey(key);
assertEquals(!noVal, res);
updateKey(cache, key, 1);
tx.commit();
}
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.containsKey(key);
assertTrue(res);
updateKey(cache, key, 2);
tx.commit();
}
checkValue(key, 2, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.containsKey(key);
assertTrue(res);
tx.commit();
}
cache.remove(key);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
boolean res = cache.containsKey(key);
assertFalse(res);
updateKey(cache, key, 3);
tx.commit();
}
checkValue(key, 3, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxRollbackIfLocked1() throws Exception {
Ignite ignite0 = ignite(0);
IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
List<Integer> keys = testKeys(cache);
for (Integer key : keys) {
log.info("Test key: " + key);
CountDownLatch latch = new CountDownLatch(1);
IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key, 2);
log.info("Commit");
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
latch.countDown();
fut.get();
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key, 2);
tx.commit();
}
checkValue(key, 2, cache.getName());
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxRollbackIfLocked2() throws Exception {
rollbackIfLockedPartialLock(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxRollbackIfLocked3() throws Exception {
rollbackIfLockedPartialLock(true);
}
/**
* @param locKey If {@code true} gets lock for local key.
* @throws Exception If failed.
*/
private void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
final Integer key1 = primaryKey(ignite(1).cache(cache.getName()));
final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName()));
CountDownLatch latch = new CountDownLatch(1);
IgniteInternalFuture<?> fut = lockKey(latch, cache, key1);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key1, 2);
cache.put(key2, 2);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
latch.countDown();
fut.get();
checkValue(key1, 1, cache.getName());
checkValue(key2, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key1, 2);
cache.put(key2, 2);
tx.commit();
}
checkValue(key1, 2, cache.getName());
checkValue(key2, 2, cache.getName());
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoReadLockConflict() throws Exception {
checkNoReadLockConflict(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoReadLockConflictGetEntry() throws Exception {
checkNoReadLockConflict(true);
}
/**
* @param entry If {@code true} then uses 'getEntry' to read value, otherwise uses 'get'.
* @throws Exception If failed.
*/
private void checkNoReadLockConflict(final boolean entry) throws Exception {
Ignite ignite0 = ignite(0);
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
final AtomicInteger putKey = new AtomicInteger(1_000_000);
ignite0.createCache(ccfg);
CacheConfiguration<Integer, Integer> readCacheCcfg = new CacheConfiguration<>(ccfg);
readCacheCcfg.setName(ccfg.getName() + "-read");
ignite0.createCache(readCacheCcfg);
try {
checkNoReadLockConflict(ignite(0), ccfg.getName(), ccfg.getName(), entry, putKey);
checkNoReadLockConflict(ignite(1), ccfg.getName(), ccfg.getName(), entry, putKey);
checkNoReadLockConflict(ignite(SRVS), ccfg.getName(), ccfg.getName(), entry, putKey);
checkNoReadLockConflict(ignite(0), readCacheCcfg.getName(), ccfg.getName(), entry, putKey);
checkNoReadLockConflict(ignite(1), readCacheCcfg.getName(), ccfg.getName(), entry, putKey);
checkNoReadLockConflict(ignite(SRVS), readCacheCcfg.getName(), ccfg.getName(), entry, putKey);
}
finally {
destroyCache(ccfg.getName());
destroyCache(readCacheCcfg.getName());
}
}
}
/**
* @param ignite Node.
* @param readCacheName Cache name for get.
* @param writeCacheName Cache name for put.
* @param entry If {@code true} then uses 'getEntry' to read value, otherwise uses 'get'.
* @param putKey Write key counter.
* @throws Exception If failed.
*/
private void checkNoReadLockConflict(final Ignite ignite,
String readCacheName,
String writeCacheName,
final boolean entry,
final AtomicInteger putKey) throws Exception {
final int THREADS = 64;
final IgniteCache<Integer, Integer> readCache = ignite.cache(readCacheName);
final IgniteCache<Integer, Integer> writeCache = ignite.cache(writeCacheName);
List<Integer> readKeys = testKeys(readCache);
for (final Integer readKey : readKeys) {
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
readCache.put(readKey, Integer.MIN_VALUE);
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
if (entry)
readCache.get(readKey);
else
readCache.getEntry(readKey);
barrier.await();
writeCache.put(putKey.incrementAndGet(), 0);
tx.commit();
}
return null;
}
}, THREADS, "test-thread");
assertEquals((Integer)Integer.MIN_VALUE, readCache.get(readKey));
readCache.put(readKey, readKey);
assertEquals(readKey, readCache.get(readKey));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoReadLockConflictMultiNode() throws Exception {
Ignite ignite0 = ignite(0);
for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
final AtomicInteger putKey = new AtomicInteger(1_000_000);
ignite0.createCache(ccfg);
try {
final int THREADS = 64;
IgniteCache<Integer, Integer> cache0 = ignite0.cache(ccfg.getName());
List<Integer> readKeys = testKeys(cache0);
for (final Integer readKey : readKeys) {
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
cache0.put(readKey, Integer.MIN_VALUE);
final AtomicInteger idx = new AtomicInteger();
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
Ignite ignite = ignite(idx.incrementAndGet() % (CLIENTS + SRVS));
IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.get(readKey);
barrier.await();
cache.put(putKey.incrementAndGet(), 0);
tx.commit();
}
return null;
}
}, THREADS, "test-thread");
assertEquals((Integer)Integer.MIN_VALUE, cache0.get(readKey));
cache0.put(readKey, readKey);
assertEquals(readKey, cache0.get(readKey));
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("UnnecessaryLocalVariable")
@Test
public void testReadLockPessimisticTxConflict() throws Exception {
Ignite ignite0 = ignite(0);
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
ignite0.createCache(ccfg);
try {
Ignite ignite = ignite0;
IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
Integer writeKey = Integer.MAX_VALUE;
List<Integer> readKeys = testKeys(cache);
for (Integer readKey : readKeys) {
CountDownLatch latch = new CountDownLatch(1);
IgniteInternalFuture<?> fut = lockKey(latch, cache, readKey);
try {
// No conflict for write, conflict with pessimistic tx for read.
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(writeKey, writeKey);
cache.get(readKey);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
finally {
latch.countDown();
}
fut.get();
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("UnnecessaryLocalVariable")
@Test
public void testReadWriteTxConflict() throws Exception {
Ignite ignite0 = ignite(0);
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
ignite0.createCache(ccfg);
try {
Ignite ignite = ignite0;
IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
Integer writeKey = Integer.MAX_VALUE;
List<Integer> readKeys = testKeys(cache);
for (Integer readKey : readKeys) {
try {
// No conflict for read, conflict for write.
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.getAndPut(writeKey, writeKey);
cache.get(readKey);
updateKey(cache, writeKey, writeKey + readKey);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
assertEquals((Integer)(writeKey + readKey), cache.get(writeKey));
assertNull(cache.get(readKey));
cache.put(readKey, readKey);
assertEquals(readKey, cache.get(readKey));
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
@Test
public void testReadWriteTransactionsNoDeadlock() throws Exception {
checkReadWriteTransactionsNoDeadlock(false);
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
@Test
public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception {
checkReadWriteTransactionsNoDeadlock(true);
}
/**
* @param multiNode Multi-node test flag.
* @throws Exception If failed.
*/
private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-9226");
final Ignite ignite0 = ignite(0);
for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
ignite0.createCache(ccfg);
try {
final long stopTime = U.currentTimeMillis() + 10_000;
final AtomicInteger idx = new AtomicInteger();
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
Ignite ignite = multiNode ? ignite(idx.incrementAndGet() % (SRVS + CLIENTS)) : ignite0;
IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (U.currentTimeMillis() < stopTime) {
try {
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
for (int i = 0; i < 10; i++) {
Integer key = rnd.nextInt(30);
if (rnd.nextBoolean())
cache.get(key);
else
cache.put(key, key);
}
tx.commit();
}
}
catch (TransactionOptimisticException ignore) {
// No-op.
}
}
return null;
}
}, 32, "test-thread");
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testReadWriteAccountTx() throws Exception {
final CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED,
FULL_SYNC,
1,
false,
false);
ignite(0).createCache(ccfg);
try {
final int ACCOUNTS = SF.applyLB(50, 5);
final int VAL_PER_ACCOUNT = SF.applyLB(1000, 10);
IgniteCache<Integer, Account> cache0 = ignite(0).cache(ccfg.getName());
final Set<Integer> keys = new HashSet<>();
for (int i = 0; i < ACCOUNTS; i++) {
cache0.put(i, new Account(VAL_PER_ACCOUNT));
keys.add(i);
}
final List<Ignite> clients = clients();
final AtomicBoolean stop = new AtomicBoolean();
final AtomicInteger idx = new AtomicInteger();
IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
int threadIdx = idx.getAndIncrement();
int nodeIdx = threadIdx % (SRVS + CLIENTS);
Ignite node = ignite(nodeIdx);
IgniteCache<Integer, Account> cache = node.cache(ccfg.getName());
IgniteTransactions txs = node.transactions();
Integer putKey = ACCOUNTS + threadIdx;
while (!stop.get()) {
int sum;
while (true) {
sum = 0;
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Map<Integer, Account> data = cache.getAll(keys);
for (int i = 0; i < ACCOUNTS; i++) {
Account account = data.get(i);
assertNotNull(account);
sum += account.value();
}
if (ThreadLocalRandom.current().nextBoolean())
cache.put(putKey, new Account(sum));
tx.commit();
}
catch (TransactionOptimisticException ignored) {
continue;
}
break;
}
assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
}
return null;
}
catch (Throwable e) {
stop.set(true);
log.error("Unexpected error: " + e);
throw e;
}
}
}, (SRVS + CLIENTS) * 2, "update-thread");
IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
int nodeIdx = idx.getAndIncrement() % clients.size();
Ignite node = clients.get(nodeIdx);
IgniteCache<Integer, Account> cache = node.cache(ccfg.getName());
IgniteTransactions txs = node.transactions();
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
int id1 = rnd.nextInt(ACCOUNTS);
int id2 = rnd.nextInt(ACCOUNTS);
while (id2 == id1)
id2 = rnd.nextInt(ACCOUNTS);
while (true) {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Account a1 = cache.get(id1);
Account a2 = cache.get(id2);
assertNotNull(a1);
assertNotNull(a2);
if (a1.value() > 0) {
a1 = new Account(a1.value() - 1);
a2 = new Account(a2.value() + 1);
}
cache.put(id1, a1);
cache.put(id2, a2);
tx.commit();
}
catch (TransactionOptimisticException ignored) {
continue;
}
break;
}
}
return null;
}
catch (Throwable e) {
stop.set(true);
log.error("Unexpected error: " + e);
throw e;
}
}
}, 2, "update-thread");
try {
U.sleep(SF.applyLB(15_000, 2_000));
}
finally {
stop.set(true);
}
readFut.get();
updateFut.get();
int sum = 0;
for (int i = 0; i < ACCOUNTS; i++) {
Account a = cache0.get(i);
assertNotNull(a);
assertTrue(a.value() >= 0);
log.info("Account: " + a.value());
sum += a.value();
}
assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
}
finally {
ignite(0).destroyCache(ccfg.getName());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNearCacheReaderUpdate() throws Exception {
Ignite ignite0 = ignite(0);
IgniteCache<Integer, Integer> cache0 =
ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
final String cacheName = cache0.getName();
try {
Ignite client1 = ignite(SRVS);
Ignite client2 = ignite(SRVS + 1);
IgniteCache<Integer, Integer> cache1 = client1.createNearCache(cacheName,
new NearCacheConfiguration<Integer, Integer>());
IgniteCache<Integer, Integer> cache2 = client2.createNearCache(cacheName,
new NearCacheConfiguration<Integer, Integer>());
Integer key = primaryKey(ignite(0).cache(cacheName));
try (Transaction tx = client1.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
assertNull(cache1.get(key));
cache1.put(key, 1);
tx.commit();
}
try (Transaction tx = client2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
assertEquals(1, (Object)cache2.get(key));
cache2.put(key, 2);
tx.commit();
}
try (Transaction tx = client1.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
assertEquals(2, (Object)cache1.get(key));
cache1.put(key, 3);
tx.commit();
}
}
finally {
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRollbackNearCache1() throws Exception {
rollbackNearCacheWrite(true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRollbackNearCache2() throws Exception {
rollbackNearCacheWrite(false);
}
/**
* @param near If {@code true} locks entry using the same near cache.
* @throws Exception If failed.
*/
private void rollbackNearCacheWrite(boolean near) throws Exception {
Ignite ignite0 = ignite(0);
IgniteCache<Integer, Integer> cache0 =
ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
final String cacheName = cache0.getName();
try {
Ignite ignite = ignite(SRVS);
IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
new NearCacheConfiguration<Integer, Integer>());
IgniteTransactions txs = ignite.transactions();
Integer key1 = primaryKey(ignite(0).cache(cacheName));
Integer key2 = primaryKey(ignite(1).cache(cacheName));
Integer key3 = primaryKey(ignite(2).cache(cacheName));
CountDownLatch latch = new CountDownLatch(1);
IgniteInternalFuture<?> fut = null;
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key1, key1);
cache.put(key2, key2);
cache.put(key3, key3);
fut = lockKey(latch, near ? cache : cache0, key2);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
latch.countDown();
assert fut != null;
fut.get();
checkValue(key1, null, cacheName);
checkValue(key2, 1, cacheName);
checkValue(key3, null, cacheName);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key1, key1);
cache.put(key2, key2);
cache.put(key3, key3);
tx.commit();
}
checkValue(key1, key1, cacheName);
checkValue(key2, key2, cacheName);
checkValue(key3, key3, cacheName);
}
finally {
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRollbackNearCache3() throws Exception {
rollbackNearCacheRead(true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRollbackNearCache4() throws Exception {
rollbackNearCacheRead(false);
}
/**
* @param near If {@code true} updates entry using the same near cache.
* @throws Exception If failed.
*/
private void rollbackNearCacheRead(boolean near) throws Exception {
Ignite ignite0 = ignite(0);
IgniteCache<Integer, Integer> cache0 =
ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
final String cacheName = cache0.getName();
try {
Ignite ignite = ignite(SRVS);
IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
new NearCacheConfiguration<Integer, Integer>());
IgniteTransactions txs = ignite.transactions();
Integer key1 = primaryKey(ignite(0).cache(cacheName));
Integer key2 = primaryKey(ignite(1).cache(cacheName));
Integer key3 = primaryKey(ignite(2).cache(cacheName));
cache0.put(key1, -1);
cache0.put(key2, -1);
cache0.put(key3, -1);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.get(key1);
cache.get(key2);
cache.get(key3);
updateKey(near ? cache : cache0, key2, -2);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key1, -1, cacheName);
checkValue(key2, -2, cacheName);
checkValue(key3, -1, cacheName);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key1, key1);
cache.put(key2, key2);
cache.put(key3, key3);
tx.commit();
}
checkValue(key1, key1, cacheName);
checkValue(key2, key2, cacheName);
checkValue(key3, key3, cacheName);
}
finally {
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testCrossCacheTx() throws Exception {
Ignite ignite0 = ignite(0);
final String CACHE1 = "cache1";
final String CACHE2 = "cache2";
try {
CacheConfiguration<Integer, Integer> ccfg1 =
cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
ccfg1.setName(CACHE1);
ignite0.createCache(ccfg1);
CacheConfiguration<Integer, Integer> ccfg2 =
cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
ccfg2.setName(CACHE2);
ignite0.createCache(ccfg2);
Integer newVal = 0;
List<Integer> keys = testKeys(ignite0.<Integer, Integer>cache(CACHE1));
for (Ignite ignite : G.allGrids()) {
log.info("Test node: " + ignite.name());
IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1);
IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2);
IgniteTransactions txs = ignite.transactions();
for (Integer key : keys) {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key, newVal);
cache2.put(key, newVal);
tx.commit();
}
checkValue(key, newVal, CACHE1);
checkValue(key, newVal, CACHE2);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val1 = cache1.get(key);
Object val2 = cache2.get(key);
assertEquals(newVal, val1);
assertEquals(newVal, val2);
tx.commit();
}
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key, newVal + 1);
cache2.put(key, newVal + 1);
tx.rollback();
}
checkValue(key, newVal, CACHE1);
checkValue(key, newVal, CACHE2);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val1 = cache1.get(key);
Object val2 = cache2.get(key);
assertEquals(newVal, val1);
assertEquals(newVal, val2);
cache1.put(key, newVal + 1);
cache2.put(key, newVal + 1);
tx.commit();
}
newVal++;
checkValue(key, newVal, CACHE1);
checkValue(key, newVal, CACHE2);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key, newVal);
cache2.put(-key, newVal);
tx.commit();
}
checkValue(key, newVal, CACHE1);
checkValue(-key, null, CACHE1);
checkValue(key, newVal, CACHE2);
checkValue(-key, newVal, CACHE2);
}
newVal++;
Integer key1 = primaryKey(ignite(0).cache(CACHE1));
Integer key2 = primaryKey(ignite(1).cache(CACHE1));
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key1, newVal);
cache1.put(key2, newVal);
cache2.put(key1, newVal);
cache2.put(key2, newVal);
tx.commit();
}
checkValue(key1, newVal, CACHE1);
checkValue(key2, newVal, CACHE1);
checkValue(key1, newVal, CACHE2);
checkValue(key2, newVal, CACHE2);
CountDownLatch latch = new CountDownLatch(1);
IgniteInternalFuture<?> fut = lockKey(latch, cache1, key1);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key1, newVal + 1);
cache2.put(key1, newVal + 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
latch.countDown();
fut.get();
checkValue(key1, 1, CACHE1);
checkValue(key1, newVal, CACHE2);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key1, newVal + 1);
cache2.put(key1, newVal + 1);
tx.commit();
}
newVal++;
cache1.put(key2, newVal);
cache2.put(key2, newVal);
checkValue(key1, newVal, CACHE1);
checkValue(key1, newVal, CACHE2);
latch = new CountDownLatch(1);
fut = lockKey(latch, cache1, key1);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache1.put(key1, newVal + 1);
cache2.put(key2, newVal + 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
latch.countDown();
fut.get();
checkValue(key1, 1, CACHE1);
checkValue(key2, newVal, CACHE2);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val1 = cache1.get(key1);
Object val2 = cache2.get(key2);
assertEquals(1, val1);
assertEquals(newVal, val2);
updateKey(cache2, key2, 1);
cache1.put(key1, newVal + 1);
cache2.put(key2, newVal + 1);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
checkValue(key1, 1, CACHE1);
checkValue(key2, 1, CACHE2);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val1 = cache1.get(key1);
Object val2 = cache2.get(key2);
assertEquals(1, val1);
assertEquals(1, val2);
cache1.put(key1, newVal + 1);
cache2.put(key2, newVal + 1);
tx.commit();
}
newVal++;
checkValue(key1, newVal, CACHE1);
checkValue(key2, newVal, CACHE2);
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val1 = cache1.get(key1);
Object val2 = cache2.get(key2);
assertEquals(newVal, val1);
assertEquals(newVal, val2);
updateKey(cache2, key2, newVal);
tx.commit();
}
fail();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
}
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Object val1 = cache1.get(key1);
Object val2 = cache2.get(key2);
assertEquals(newVal, val1);
assertEquals(newVal, val2);
tx.commit();
}
}
}
finally {
destroyCache(CACHE1);
destroyCache(CACHE2);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRandomOperations() throws Exception {
Ignite ignite0 = ignite(0);
long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000;
for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
logCacheInfo(ccfg);
try {
IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (Ignite ignite : G.allGrids()) {
log.info("Test node: " + ignite.name());
IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
IgniteTransactions txs = ignite.transactions();
final int KEYS = SF.apply(100);
for (int i = 0; i < SF.apply(1000); i++) {
Integer key1 = rnd.nextInt(KEYS);
Integer key2;
if (rnd.nextBoolean()) {
key2 = rnd.nextInt(KEYS);
while (key2.equals(key1))
key2 = rnd.nextInt(KEYS);
}
else
key2 = key1 + 1;
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
randomOperation(rnd, cache, key1);
randomOperation(rnd, cache, key2);
tx.commit();
}
if (i % 100 == 0 && U.currentTimeMillis() > stopTime)
break;
}
for (int key = 0; key < KEYS; key++) {
Integer val = cache0.get(key);
for (int node = 1; node < SRVS + CLIENTS; node++)
assertEquals(val, ignite(node).cache(cache.getName()).get(key));
}
if (U.currentTimeMillis() > stopTime)
break;
}
}
finally {
destroyCache(ccfg.getName());
}
}
}
/**
* @param rnd Random.
* @param cache Cache.
* @param key Key.
*/
private void randomOperation(ThreadLocalRandom rnd, IgniteCache<Integer, Integer> cache, Integer key) {
switch (rnd.nextInt(4)) {
case 0:
cache.put(key, rnd.nextInt());
break;
case 1:
cache.remove(key);
break;
case 2:
cache.invoke(key, new SetValueProcessor(rnd.nextBoolean() ? 1 : null));
break;
case 3:
cache.get(key);
break;
default:
assert false;
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testIncrementTxRestart() throws Exception {
incrementTx(false, false, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIncrementTx1() throws Exception {
incrementTx(false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIncrementTx2() throws Exception {
incrementTx(false, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIncrementTxNearCache1() throws Exception {
incrementTx(true, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIncrementTxNearCache2() throws Exception {
incrementTx(true, true, false);
}
/**
* @param nearCache If {@code true} near cache is enabled.
* @param store If {@code true} cache store is enabled.
* @param restart If {@code true} restarts one node.
* @throws Exception If failed.
*/
private void incrementTx(boolean nearCache, boolean store, final boolean restart) throws Exception {
final Ignite srv = ignite(1);
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false);
final List<Ignite> clients = clients();
final String cacheName = srv.createCache(ccfg).getName();
final AtomicBoolean stop = new AtomicBoolean();
try {
final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
for (Ignite client : clients) {
if (nearCache)
caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()));
else
caches.add(client.<Integer, Integer>cache(cacheName));
}
IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null) : null;
final long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000;
for (int i = 0; i < SF.apply(30); i++) {
final AtomicInteger cntr = new AtomicInteger();
final Integer key = i;
final AtomicInteger threadIdx = new AtomicInteger();
final int THREADS = SF.applyLB(10, 2);
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int idx = threadIdx.getAndIncrement() % caches.size();
IgniteCache<Integer, Integer> cache = caches.get(idx);
Ignite ignite = cache.unwrap(Ignite.class);
IgniteTransactions txs = ignite.transactions();
log.info("Started update thread: " + ignite.name());
barrier.await();
for (int i = 0; i < SF.apply(1000); i++) {
if (i % 100 == 0 && U.currentTimeMillis() > stopTime)
break;
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
cache.put(key, val == null ? 1 : val + 1);
tx.commit();
}
cntr.incrementAndGet();
}
catch (TransactionOptimisticException ignore) {
// Retry.
}
catch (IgniteException | CacheException e) {
assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']',
restart && X.hasCause(e, ClusterTopologyCheckedException.class));
}
}
return null;
}
}, THREADS, "update-thread").get();
log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
assertTrue(cntr.get() > 0);
checkValue(key, cntr.get(), cacheName, restart);
if (U.currentTimeMillis() > stopTime)
break;
}
stop.set(true);
if (restartFut != null)
restartFut.get();
}
finally {
stop.set(true);
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testIncrementTxMultipleNodeRestart() throws Exception {
incrementTxMultiple(false, false, true);
}
/**
* @param nearCache If {@code true} near cache is enabled.
* @param store If {@code true} cache store is enabled.
* @param restart If {@code true} restarts one node.
* @throws Exception If failed.
*/
private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart) throws Exception {
final Ignite srv = ignite(1);
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false);
final List<Ignite> clients = clients();
final String cacheName = srv.createCache(ccfg).getName();
final AtomicBoolean stop = new AtomicBoolean();
try {
final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
for (Ignite client : clients) {
if (nearCache)
caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()));
else
caches.add(client.<Integer, Integer>cache(cacheName));
}
IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null) : null;
for (int i = 0; i < 20; i += 2) {
final AtomicInteger cntr = new AtomicInteger();
final Integer key1 = i;
final Integer key2 = i + 1;
final AtomicInteger threadIdx = new AtomicInteger();
final int THREADS = 10;
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
final ConcurrentSkipListSet<Integer> vals1 = new ConcurrentSkipListSet<>();
final ConcurrentSkipListSet<Integer> vals2 = new ConcurrentSkipListSet<>();
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int idx = threadIdx.getAndIncrement() % caches.size();
IgniteCache<Integer, Integer> cache = caches.get(idx);
Ignite ignite = cache.unwrap(Ignite.class);
IgniteTransactions txs = ignite.transactions();
log.info("Started update thread: " + ignite.name());
barrier.await();
final int ITERATIONS_COUNT = SF.applyLB(1000, 50);
for (int i = 0; i < ITERATIONS_COUNT; i++) {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val1 = cache.get(key1);
Integer val2 = cache.get(key2);
Integer newVal1 = val1 == null ? 1 : val1 + 1;
Integer newVal2 = val2 == null ? 1 : val2 + 1;
cache.put(key1, newVal1);
cache.put(key2, newVal2);
tx.commit();
assertTrue(vals1.add(newVal1));
assertTrue(vals2.add(newVal2));
}
cntr.incrementAndGet();
}
catch (TransactionOptimisticException ignore) {
// Retry.
}
catch (IgniteException | CacheException e) {
assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']',
restart && X.hasCause(e, ClusterTopologyCheckedException.class));
}
}
return null;
}
}, THREADS, "update-thread").get();
log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
assertTrue(cntr.get() > 0);
checkValue(key1, cntr.get(), cacheName, restart);
checkValue(key2, cntr.get(), cacheName, restart);
}
stop.set(true);
if (restartFut != null)
restartFut.get();
}
finally {
stop.set(true);
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetRemoveTx() throws Exception {
getRemoveTx(false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetRemoveTxNearCache1() throws Exception {
getRemoveTx(true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetRemoveTxNearCache2() throws Exception {
getRemoveTx(true, true);
}
/**
* @param nearCache If {@code true} near cache is enabled.
* @param store If {@code true} cache store is enabled.
* @throws Exception If failed.
*/
private void getRemoveTx(boolean nearCache, boolean store) throws Exception {
long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000;
final Ignite ignite0 = ignite(0);
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, store, false);
final List<Ignite> clients = clients();
final String cacheName = ignite0.createCache(ccfg).getName();
try {
final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
for (Ignite client : clients) {
if (nearCache)
caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()));
else
caches.add(client.<Integer, Integer>cache(cacheName));
}
for (int i = 0; i < SF.apply(100); i++) {
if (U.currentTimeMillis() > stopTime)
break;
final AtomicInteger cntr = new AtomicInteger();
final Integer key = i;
final AtomicInteger threadIdx = new AtomicInteger();
final int THREADS = SF.applyLB(10, 2);
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
final IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int thread = threadIdx.getAndIncrement();
int idx = thread % caches.size();
IgniteCache<Integer, Integer> cache = caches.get(idx);
Ignite ignite = cache.unwrap(Ignite.class);
IgniteTransactions txs = ignite.transactions();
log.info("Started update thread: " + ignite.name());
Thread.currentThread().setName("update-thread-" + ignite.name() + "-" + thread);
barrier.await();
for (int i = 0; i < SF.apply(50); i++) {
while (true) {
try {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
boolean rmv = rnd.nextInt(3) == 0;
Integer val;
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
val = cache.get(key);
if (rmv)
cache.remove(key);
else
cache.put(key, val == null ? 1 : val + 1);
tx.commit();
if (rmv) {
if (val != null)
cntr.getAndUpdate(x -> x - val);
}
else
cntr.incrementAndGet();
}
break;
}
catch (TransactionOptimisticException ignore) {
// Retry.
}
}
}
return null;
}
}, THREADS, "update-thread");
updateFut.get();
Integer val = cntr.get();
log.info("Iteration [iter=" + i + ", val=" + val + ']');
checkValue(key, val == 0 ? null : val, cacheName);
}
}
finally {
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testAccountTx1() throws Exception {
accountTx(false, false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAccountTx2() throws Exception {
accountTx(true, false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAccountTxWithNonSerializable() throws Exception {
accountTx(false, false, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAccountTxNearCache() throws Exception {
accountTx(false, true, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAccountTxNodeRestart() throws Exception {
accountTx(false, false, false, true);
}
/**
* @param getAll If {@code true} uses getAll/putAll in transaction.
* @param nearCache If {@code true} near cache is enabled.
* @param nonSer If {@code true} starts threads executing non-serializable transactions.
* @param restart If {@code true} restarts one node.
* @throws Exception If failed.
*/
private void accountTx(final boolean getAll,
final boolean nearCache,
final boolean nonSer,
final boolean restart) throws Exception {
final Ignite srv = ignite(1);
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
final String cacheName = srv.createCache(ccfg).getName();
try {
final List<Ignite> clients = clients();
final int ACCOUNTS = SF.applyLB(100, 10);
final int VAL_PER_ACCOUNT = SF.applyLB(10_000, 50);
IgniteCache<Integer, Account> srvCache = srv.cache(cacheName);
for (int i = 0; i < ACCOUNTS; i++)
srvCache.put(i, new Account(VAL_PER_ACCOUNT));
final AtomicInteger idx = new AtomicInteger();
final int THREADS = SF.applyLB(20, 5);
final long testTime = SF.applyLB(30_000, 5_000);
final long stopTime = System.currentTimeMillis() + testTime;
IgniteInternalFuture<?> nonSerFut = null;
if (nonSer) {
nonSerFut = runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int nodeIdx = idx.getAndIncrement() % clients.size();
Ignite node = clients.get(nodeIdx);
Thread.currentThread().setName("update-pessimistic-" + node.name());
log.info("Pessimistic tx thread: " + node.name());
final IgniteTransactions txs = node.transactions();
final IgniteCache<Integer, Account> cache =
nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
node.<Integer, Account>cache(cacheName);
assertNotNull(cache);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (U.currentTimeMillis() < stopTime) {
int id1 = rnd.nextInt(ACCOUNTS);
int id2 = rnd.nextInt(ACCOUNTS);
while (id2 == id1)
id2 = rnd.nextInt(ACCOUNTS);
if (id1 > id2) {
int tmp = id1;
id1 = id2;
id2 = tmp;
}
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
Account a1 = cache.get(id1);
Account a2 = cache.get(id2);
assertNotNull(a1);
assertNotNull(a2);
if (a1.value() > 0) {
a1 = new Account(a1.value() - 1);
a2 = new Account(a2.value() + 1);
}
cache.put(id1, a1);
cache.put(id2, a2);
tx.commit();
}
}
return null;
}
}, 10, "non-ser-thread");
}
final IgniteInternalFuture<?> fut = runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int nodeIdx = idx.getAndIncrement() % clients.size();
Ignite node = clients.get(nodeIdx);
Thread.currentThread().setName("update-" + node.name());
log.info("Tx thread: " + node.name());
final IgniteTransactions txs = node.transactions();
final IgniteCache<Integer, Account> cache =
nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
node.<Integer, Account>cache(cacheName);
assertNotNull(cache);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (U.currentTimeMillis() < stopTime) {
int id1 = rnd.nextInt(ACCOUNTS);
int id2 = rnd.nextInt(ACCOUNTS);
while (id2 == id1)
id2 = rnd.nextInt(ACCOUNTS);
try {
while (true) {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (getAll) {
Map<Integer, Account> map = cache.getAll(F.asSet(id1, id2));
Account a1 = cache.get(id1);
Account a2 = cache.get(id2);
assertNotNull(a1);
assertNotNull(a2);
if (a1.value() > 0) {
a1 = new Account(a1.value() - 1);
a2 = new Account(a2.value() + 1);
}
map.put(id1, a1);
map.put(id2, a2);
cache.putAll(map);
}
else {
Account a1 = cache.get(id1);
Account a2 = cache.get(id2);
assertNotNull(a1);
assertNotNull(a2);
if (a1.value() > 0) {
a1 = new Account(a1.value() - 1);
a2 = new Account(a2.value() + 1);
}
cache.put(id1, a1);
cache.put(id2, a2);
}
tx.commit();
}
break;
}
catch (TransactionOptimisticException ignore) {
// Retry.
}
catch (IgniteException | CacheException e) {
assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']',
restart && X.hasCause(e, ClusterTopologyCheckedException.class));
}
}
}
catch (Throwable e) {
log.error("Unexpected error: " + e, e);
throw e;
}
}
return null;
}
}, THREADS, "tx-thread");
IgniteInternalFuture<?> restartFut = restart ? restartFuture(null, fut) : null;
fut.get(testTime + 30_000);
if (nonSerFut != null)
nonSerFut.get();
if (restartFut != null)
restartFut.get();
int sum = 0;
for (int i = 0; i < ACCOUNTS; i++) {
Account a = srvCache.get(i);
assertNotNull(a);
assertTrue(a.value() >= 0);
log.info("Account: " + a.value());
sum += a.value();
}
assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
for (int node = 0; node < SRVS + CLIENTS; node++) {
log.info("Verify node: " + node);
Ignite ignite = ignite(node);
IgniteCache<Integer, Account> cache = ignite.cache(cacheName);
sum = 0;
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
Map<Integer, Account> map = new HashMap<>();
for (int i = 0; i < ACCOUNTS; i++) {
Account a = cache.get(i);
assertNotNull(a);
map.put(i, a);
sum += a.value();
}
Account a1 = map.get(0);
Account a2 = map.get(1);
if (a1.value() > 0) {
a1 = new Account(a1.value() - 1);
a2 = new Account(a2.value() + 1);
map.put(0, a1);
map.put(1, a2);
}
cache.putAll(map);
tx.commit();
}
assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
}
}
finally {
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoOptimisticExceptionOnChangingTopology() throws Exception {
if (FAST)
return;
final AtomicBoolean finished = new AtomicBoolean();
final List<String> cacheNames = new ArrayList<>();
Ignite srv = ignite(1);
try {
{
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
ccfg.setName("cache1");
ccfg.setRebalanceMode(SYNC);
srv.createCache(ccfg);
cacheNames.add(ccfg.getName());
}
{
// Store enabled.
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false);
ccfg.setName("cache2");
ccfg.setRebalanceMode(SYNC);
srv.createCache(ccfg);
cacheNames.add(ccfg.getName());
}
{
// Eviction.
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
ccfg.setName("cache3");
ccfg.setRebalanceMode(SYNC);
LruEvictionPolicy plc = new LruEvictionPolicy();
plc.setMaxSize(100);
ccfg.setEvictionPolicy(plc);
ccfg.setOnheapCacheEnabled(true);
srv.createCache(ccfg);
cacheNames.add(ccfg.getName());
}
IgniteInternalFuture<?> restartFut = restartFuture(finished, null);
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
final int KEYS_PER_THREAD = 100;
for (int i = 1; i < SRVS + CLIENTS; i++) {
final Ignite node = ignite(i);
final int minKey = i * KEYS_PER_THREAD;
final int maxKey = minKey + KEYS_PER_THREAD;
// Threads update non-intersecting keys, optimistic exception should not be thrown.
futs.add(GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
log.info("Started update thread [node=" + node.name() +
", minKey=" + minKey +
", maxKey=" + maxKey + ']');
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
for (String cacheName : cacheNames)
caches.add(node.<Integer, Integer>cache(cacheName));
assertEquals(3, caches.size());
int iter = 0;
while (!finished.get()) {
int keyCnt = rnd.nextInt(1, 10);
final Set<Integer> keys = new LinkedHashSet<>();
while (keys.size() < keyCnt)
keys.add(rnd.nextInt(minKey, maxKey));
for (final IgniteCache<Integer, Integer> cache : caches) {
doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() {
@Override public Void call() throws Exception {
for (Integer key : keys)
randomOperation(rnd, cache, key);
return null;
}
});
}
if (iter % 100 == 0)
log.info("Iteration: " + iter);
iter++;
}
return null;
}
catch (Throwable e) {
log.error("Unexpected error: " + e, e);
throw e;
}
}
}, "update-thread-" + i));
}
U.sleep(SF.applyLB(60_000, 5_000));
finished.set(true);
restartFut.get();
for (IgniteInternalFuture<?> fut : futs)
fut.get();
}
finally {
finished.set(true);
for (String cacheName : cacheNames)
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testConflictResolution() throws Exception {
final Ignite ignite = ignite(0);
final String cacheName =
ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
try {
final Map<Integer, Integer> keys = new HashMap<>();
for (int i = 0; i < 500; i++)
keys.put(i, i);
final int THREADS = 5;
for (int i = 0; i < 10; i++) {
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
final AtomicInteger commitCntr = new AtomicInteger(0);
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.putAll(keys);
barrier.await();
tx.commit();
commitCntr.incrementAndGet();
}
catch (TransactionOptimisticException e) {
log.info("Optimistic error: " + e);
}
return null;
}
}, THREADS, "update-thread").get();
int commits = commitCntr.get();
log.info("Iteration [iter=" + i + ", commits=" + commits + ']');
assertTrue(commits > 0);
}
}
finally {
destroyCache(cacheName);
}
}
/**
* Multithreaded transactional reads.
*
* @throws Exception If failed.
*/
@Test
public void testMultipleOptimisticRead() throws Exception {
final Ignite ignite = ignite(0);
final Integer key = 1;
final Integer val = 1;
final int THREADS_CNT = 50;
final String cacheName =
ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
try {
final IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key, val);
tx.commit();
}
assertTrue(cache.get(key).equals(val));
for (int i = 0; i < 10; i++) {
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
assertTrue(cache.get(key).equals(val));
tx.commit();
}
return null;
}
}, THREADS_CNT, "multiple-reads-thread").get();
}
}
finally {
destroyCache(cacheName);
}
}
/**
* Transactional read in parallel with changing the same data.
*
* @throws Exception If failed.
*/
@Test
public void testTxReadInParallerTxWrite() throws Exception {
final Ignite ignite = ignite(0);
final Integer key = 1;
final Integer val = 1;
final CountDownLatch readLatch = new CountDownLatch(1);
final CountDownLatch writeLatch = new CountDownLatch(1);
final Exception[] err = {null};
final String cacheName =
ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
final IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key, val);
tx.commit();
}
try {
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
assertTrue(cache.get(key).equals(val));
readLatch.countDown();
writeLatch.await(10, TimeUnit.SECONDS);
try {
tx.commit();
}
catch (TransactionOptimisticException e) {
log.info("Expected exception: " + e);
err[0] = e;
}
}
return null;
}
}, "read-thread");
GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
readLatch.await(10, TimeUnit.SECONDS);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key, val);
tx.commit();
}
writeLatch.countDown();
return null;
}
}, "write-thread").get();
fut.get();
assertNotNull("Expected exception was not thrown", err[0]);
}
finally {
destroyCache(cacheName);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentUpdateNoDeadlock() throws Exception {
concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentUpdateNoDeadlockGetPut() throws Exception {
concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, true, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentUpdateNoDeadlockWithNonSerializable() throws Exception {
concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, true, false, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception {
concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, false, true, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentUpdateNoDeadlockFromClients() throws Exception {
concurrentUpdateNoDeadlock(clients(), 20, false, false, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentUpdateNoDeadlockFromClientsNodeRestart() throws Exception {
concurrentUpdateNoDeadlock(clients(), 20, false, true, false);
}
/**
* @param updateNodes Nodes executing updates.
* @param threads Number of threads executing updates.
* @param get If {@code true} gets value in transaction.
* @param restart If {@code true} restarts one node.
* @param nonSer If {@code true} starts threads executing non-serializable transactions.
* @throws Exception If failed.
*/
private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
int threads,
final boolean get,
final boolean restart,
final boolean nonSer
) throws Exception {
assert !updateNodes.isEmpty();
final Ignite srv = ignite(1);
final String cacheName =
srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
try {
final int KEYS = SF.apply(20);
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<?> fut = restart ? restartFuture(finished, null) : null;
try {
for (int i = 0; i < SF.applyLB(10, 2); i++) {
log.info("Iteration: " + i);
final long stopTime = U.currentTimeMillis() + SF.applyLB(10_000, 1_000);
final AtomicInteger idx = new AtomicInteger();
IgniteInternalFuture<?> nonSerFut = null;
if (nonSer) {
nonSerFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int nodeIdx = idx.getAndIncrement() % updateNodes.size();
Ignite node = updateNodes.get(nodeIdx);
log.info("Non-serializable tx thread: " + node.name());
final IgniteCache<Integer, Integer> cache = node.cache(cacheName);
assertNotNull(cache);
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (U.currentTimeMillis() < stopTime) {
final TreeMap<Integer, Integer> map = new TreeMap<>();
for (int i = 0; i < KEYS / 2; i++)
map.put(rnd.nextInt(KEYS), rnd.nextInt());
TransactionConcurrency concurrency = rnd.nextBoolean() ? PESSIMISTIC : OPTIMISTIC;
doInTransaction(node, concurrency, REPEATABLE_READ, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.putAll(map);
return null;
}
});
}
return null;
}
}, 5, "non-ser-thread");
}
IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int nodeIdx = idx.getAndIncrement() % updateNodes.size();
Ignite node = updateNodes.get(nodeIdx);
log.info("Tx thread: " + node.name());
final IgniteTransactions txs = node.transactions();
final IgniteCache<Integer, Integer> cache = node.cache(cacheName);
assertNotNull(cache);
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (U.currentTimeMillis() < stopTime) {
final Map<Integer, Integer> map = new LinkedHashMap<>();
for (int i = 0; i < KEYS / 2; i++)
map.put(rnd.nextInt(KEYS), rnd.nextInt());
try {
if (restart) {
doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() {
@Override public Void call() throws Exception {
if (get) {
for (Map.Entry<Integer, Integer> e : map.entrySet()) {
if (rnd.nextBoolean()) {
cache.get(e.getKey());
if (rnd.nextBoolean())
cache.put(e.getKey(), e.getValue());
}
else
cache.put(e.getKey(), e.getValue());
}
}
else
cache.putAll(map);
return null;
}
});
}
else {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
if (get) {
for (Map.Entry<Integer, Integer> e : map.entrySet()) {
if (rnd.nextBoolean()) {
cache.get(e.getKey());
if (rnd.nextBoolean())
cache.put(e.getKey(), e.getValue());
}
else
cache.put(e.getKey(), e.getValue());
}
}
else
cache.putAll(map);
tx.commit();
}
}
}
catch (TransactionOptimisticException ignore) {
// No-op.
}
catch (Throwable e) {
log.error("Unexpected error: " + e, e);
throw e;
}
}
return null;
}
}, threads, "tx-thread");
updateFut.get(60, SECONDS);
if (nonSerFut != null)
nonSerFut.get(60, SECONDS);
IgniteCache<Integer, Integer> cache = srv.cache(cacheName);
for (int key = 0; key < KEYS; key++) {
Integer val = cache.get(key);
for (int node = 1; node < SRVS + CLIENTS; node++)
assertEquals(val, ignite(node).cache(cache.getName()).get(key));
}
}
finished.set(true);
if (fut != null)
fut.get();
}
finally {
finished.set(true);
}
}
finally {
destroyCache(cacheName);
}
}
/**
* @return Cache configurations.
*/
private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
// No store, no near.
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false));
ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, false, false));
// Store, no near.
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false));
ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, true, false));
// No store, near.
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, true));
// Store, near.
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, true));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, true));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, true));
return ccfgs;
}
/**
* @param ccfg Cache configuration.
*/
private void logCacheInfo(CacheConfiguration<?, ?> ccfg) {
log.info("Test cache [mode=" + ccfg.getCacheMode() +
", sync=" + ccfg.getWriteSynchronizationMode() +
", backups=" + ccfg.getBackups() +
", near=" + (ccfg.getNearConfiguration() != null) +
", store=" + ccfg.isWriteThrough() +
", evictPlc=" + (ccfg.getEvictionPolicy() != null) +
']');
}
/**
* @param cache Cache.
* @return Test keys.
* @throws Exception If failed.
*/
private List<Integer> testKeys(IgniteCache<Integer, Integer> cache) throws Exception {
CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class);
List<Integer> keys = new ArrayList<>();
if (!cache.unwrap(Ignite.class).configuration().isClientMode()) {
if (ccfg.getCacheMode() == PARTITIONED)
keys.add(nearKey(cache));
keys.add(primaryKey(cache));
if (ccfg.getBackups() != 0)
keys.add(backupKey(cache));
}
else
keys.add(nearKey(cache));
return keys;
}
/**
* @param cache Cache.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolcation.
* @param c Closure to run in transaction.
* @throws Exception If failed.
*/
private void txAsync(final IgniteCache<Integer, Integer> cache,
final TransactionConcurrency concurrency,
final TransactionIsolation isolation,
final IgniteClosure<IgniteCache<Integer, Integer>, Void> c) throws Exception {
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
try (Transaction tx = txs.txStart(concurrency, isolation)) {
c.apply(cache);
tx.commit();
}
return null;
}
}, "async-thread");
fut.get();
}
/**
* @param cache Cache.
* @param key Key.
* @param val Value.
* @throws Exception If failed.
*/
private void updateKey(
final IgniteCache<Integer, Integer> cache,
final Integer key,
final Integer val) throws Exception {
txAsync(cache, PESSIMISTIC, REPEATABLE_READ, new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
@Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.put(key, val);
return null;
}
});
}
/**
* @param key Key.
* @param expVal Expected value.
* @param cacheName Cache name.
*/
private void checkValue(Object key, Object expVal, String cacheName) {
checkValue(key, expVal, cacheName, false);
}
/**
* @param key Key.
* @param expVal Expected value.
* @param cacheName Cache name.
* @param skipFirst If {@code true} skips first node.
*/
private void checkValue(Object key, Object expVal, String cacheName, boolean skipFirst) {
for (int i = 0; i < SRVS + CLIENTS; i++) {
if (skipFirst && i == 0)
continue;
IgniteCache<Object, Object> cache = ignite(i).cache(cacheName);
assertEquals(expVal, cache.get(key));
}
}
/**
* @param releaseLatch Release lock latch.
* @param cache Cache.
* @param key Key.
* @return Future.
* @throws Exception If failed.
*/
private IgniteInternalFuture<?> lockKey(
final CountDownLatch releaseLatch,
final IgniteCache<Integer, Integer> cache,
final Integer key) throws Exception {
final CountDownLatch lockLatch = new CountDownLatch(1);
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key, 1);
log.info("Locked key: " + key);
lockLatch.countDown();
assertTrue(releaseLatch.await(100000, SECONDS));
log.info("Commit tx: " + key);
tx.commit();
}
return null;
}
}, "lock-thread");
assertTrue(lockLatch.await(10, SECONDS));
return fut;
}
/**
* @param cacheName Cache name.
*/
private void destroyCache(String cacheName) {
storeMap.clear();
for (Ignite ignite : G.allGrids()) {
try {
ignite.destroyCache(cacheName);
}
catch (IgniteException ignore) {
// No-op.
}
}
}
/**
* @param cacheMode Cache mode.
* @param syncMode Write synchronization mode.
* @param backups Number of backups.
* @param storeEnabled If {@code true} adds cache store.
* @param nearCache If {@code true} near cache is enabled.
* @return Cache configuration.
*/
private CacheConfiguration<Integer, Integer> cacheConfiguration(
CacheMode cacheMode,
CacheWriteSynchronizationMode syncMode,
int backups,
boolean storeEnabled,
boolean nearCache) {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setWriteSynchronizationMode(syncMode);
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
if (storeEnabled) {
ccfg.setCacheStoreFactory(new TestStoreFactory());
ccfg.setWriteThrough(true);
ccfg.setReadThrough(true);
}
if (nearCache)
ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
return ccfg;
}
/**
* @return Client nodes.
*/
private List<Ignite> clients() {
List<Ignite> clients = new ArrayList<>();
for (int i = 0; i < CLIENTS; i++) {
Ignite ignite = ignite(SRVS + i);
assertTrue(ignite.configuration().isClientMode());
clients.add(ignite);
}
return clients;
}
/**
* @param stop Stop flag.
* @param fut Future.
* @return Restart thread future.
*/
private IgniteInternalFuture<?> restartFuture(final AtomicBoolean stop, final IgniteInternalFuture<?> fut) {
return GridTestUtils.runAsync(new Callable<Object>() {
private boolean stop() {
if (stop != null)
return stop.get();
return fut.isDone();
}
@Override public Object call() throws Exception {
while (!stop()) {
Ignite ignite = startGrid(SRVS + CLIENTS);
assertFalse(ignite.configuration().isClientMode());
U.sleep(300);
stopGrid(SRVS + CLIENTS);
}
return null;
}
}, "restart-thread");
}
/**
*
*/
private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
/** {@inheritDoc} */
@Override public CacheStore<Integer, Integer> create() {
return new CacheStoreAdapter<Integer, Integer>() {
@Override public Integer load(Integer key) throws CacheLoaderException {
return storeMap.get(key);
}
@Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
storeMap.put(entry.getKey(), entry.getValue());
}
@Override public void delete(Object key) {
storeMap.remove(key);
}
};
}
}
/**
* Sets given value, returns old value.
*/
public static final class SetValueProcessor implements EntryProcessor<Integer, Integer, Integer> {
/** */
private Integer newVal;
/**
* @param newVal New value to set.
*/
SetValueProcessor(Integer newVal) {
this.newVal = newVal;
}
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) {
Integer val = entry.getValue();
if (newVal == null)
entry.remove();
else
entry.setValue(newVal);
return val;
}
}
/**
*
*/
static class Account {
/** */
private final int val;
/**
* @param val Value.
*/
public Account(int val) {
this.val = val;
}
/**
* @return Value.
*/
public int value() {
return val;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Account.class, this);
}
}
}