blob: 0df17439ea003cae5f6829ed1167abbc5c9bcaf9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
*/
public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
/** */
private static final String CACHE1 = "cache1";
/** */
private static final String CACHE2 = "cache2";
/** */
private static final int GRID_CNT = 5;
/** */
private static final int KEY_RANGE = 1000;
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 6 * 60 * 1000;
}
/** */
@Before
public void beforeCrossCacheTxRandomOperationsTest() {
if (nearCacheEnabled())
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGridsMultiThreaded(GRID_CNT - 1);
startClientGrid(GRID_CNT - 1);
}
/**
* @return Test near cache flag.
*/
protected boolean nearCacheEnabled() {
return false;
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxOperations() throws Exception {
txOperations(PARTITIONED, FULL_SYNC, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCrossCacheTxOperations() throws Exception {
txOperations(PARTITIONED, FULL_SYNC, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCrossCacheTxOperationsPrimarySync() throws Exception {
txOperations(PARTITIONED, PRIMARY_SYNC, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCrossCacheTxOperationsReplicated() throws Exception {
txOperations(REPLICATED, FULL_SYNC, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception {
txOperations(REPLICATED, PRIMARY_SYNC, true);
}
/**
* @param name Cache name.
* @param cacheMode Cache mode.
* @param writeSync Write synchronization mode.
* @param nearCache Near cache flag.
* @return Cache configuration.
*/
protected CacheConfiguration cacheConfiguration(String name,
CacheMode cacheMode,
CacheWriteSynchronizationMode writeSync,
boolean nearCache) {
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(name);
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setWriteSynchronizationMode(writeSync);
if (cacheMode == PARTITIONED)
ccfg.setBackups(1);
ccfg.setAffinity(new RendezvousAffinityFunction());
if (nearCache)
ccfg.setNearConfiguration(new NearCacheConfiguration());
return ccfg;
}
/**
* @param cacheMode Cache mode.
* @param writeSync Write synchronization mode.
* @param nearCache Near cache flag.
* @param ignite Node to use.
* @param name Cache name.
*/
protected void createCache(CacheMode cacheMode,
CacheWriteSynchronizationMode writeSync,
boolean nearCache,
Ignite ignite,
String name) {
ignite.createCache(cacheConfiguration(name, cacheMode, writeSync, nearCache));
}
/**
* @param cacheMode Cache mode.
* @param writeSync Write synchronization mode.
* @param crossCacheTx If {@code true} uses cross cache transaction.
* @throws Exception If failed.
*/
private void txOperations(CacheMode cacheMode,
CacheWriteSynchronizationMode writeSync,
boolean crossCacheTx) throws Exception {
if (MvccFeatureChecker.forcedMvcc()) {
assert !nearCacheEnabled();
if(writeSync != CacheWriteSynchronizationMode.FULL_SYNC)
return;
}
Ignite ignite = ignite(0);
try {
createCache(cacheMode, writeSync, nearCacheEnabled(), ignite, CACHE1);
createCache(cacheMode, writeSync, false, ignite, CACHE2);
txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, false);
txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, true);
if(!MvccFeatureChecker.forcedMvcc()) {
txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, false);
txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, true);
if (writeSync == FULL_SYNC) {
txOperations(OPTIMISTIC, SERIALIZABLE, crossCacheTx, false);
txOperations(OPTIMISTIC, SERIALIZABLE, crossCacheTx, true);
}
}
}
finally {
ignite.destroyCache(CACHE1);
ignite.destroyCache(CACHE2);
}
}
/**
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @param crossCacheTx If {@code true} uses cross cache transaction.
* @param client If {@code true} uses client node.
* @throws Exception If failed.
*/
private void txOperations(TransactionConcurrency concurrency,
TransactionIsolation isolation,
boolean crossCacheTx,
boolean client) throws Exception {
final Map<TestKey, TestValue> expData1 = new HashMap<>();
final Map<TestKey, TestValue> expData2 = new HashMap<>();
Ignite ignite = client ? ignite(GRID_CNT - 1) : ignite(0);
assertEquals(client, (boolean)ignite.configuration().isClientMode());
final List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>();
final List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>();
for (int i = 0; i < GRID_CNT; i++) {
caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1));
caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2));
}
IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1);
IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2);
assertNotNull(cache1);
assertNotNull(cache2);
assertNotSame(cache1, cache2);
try {
Random rnd = new Random();
long seed = System.currentTimeMillis();
rnd.setSeed(seed);
log.info("Test tx operations [concurrency=" + concurrency +
", isolation=" + isolation +
", client=" + client +
", seed=" + seed + ']');
IgniteTransactions txs = ignite.transactions();
final List<TestKey> keys = new ArrayList<>();
for (int i = 0; i < KEY_RANGE; i++)
keys.add(new TestKey(i));
CacheConfiguration ccfg = cache1.getConfiguration(CacheConfiguration.class);
boolean fullSync = ccfg.getWriteSynchronizationMode() == FULL_SYNC;
boolean optimistic = concurrency == OPTIMISTIC;
boolean checkData = fullSync && !optimistic;
long stopTime = System.currentTimeMillis() + 10_000;
for (int i = 0; i < 10_000; i++) {
if (i % 100 == 0) {
if (System.currentTimeMillis() > stopTime) {
log.info("Stop on timeout, iteration: " + i);
break;
}
log.info("Iteration: " + i);
}
boolean rollback = i % 10 == 0;
try (Transaction tx = txs.txStart(concurrency, isolation)) {
cacheOperation(expData1, rnd, cache1, checkData, rollback);
if (crossCacheTx)
cacheOperation(expData2, rnd, cache2, checkData, rollback);
if (rollback)
tx.rollback();
else
tx.commit();
}
}
if (fullSync) {
checkData(caches1, keys, expData1);
checkData(caches2, keys, expData2);
cache1.removeAll();
cache2.removeAll();
checkData(caches1, keys, new HashMap<TestKey, TestValue>());
checkData(caches2, keys, new HashMap<TestKey, TestValue>());
}
}
finally {
cache1.removeAll();
cache2.removeAll();
}
}
/**
* @param caches Caches.
* @param keys Keys.
* @param expData Expected data.
*/
private void checkData(List<IgniteCache<TestKey, TestValue>> caches,
List<TestKey> keys, Map<TestKey, TestValue> expData) {
for (IgniteCache<TestKey, TestValue> cache : caches) {
for (TestKey key : keys) {
TestValue val = cache.get(key);
TestValue expVal = expData.get(key);
assertEquals(expVal, val);
}
}
}
/**
* @param expData Expected cache data.
* @param rnd Random.
* @param cache Cache.
* @param checkData If {@code true} checks data.
* @param willRollback {@code True} if will rollback transaction.
*/
private void cacheOperation(
Map<TestKey, TestValue> expData,
Random rnd,
IgniteCache<TestKey, TestValue> cache,
boolean checkData,
boolean willRollback) {
TestKey key = key(rnd);
TestValue val = new TestValue(rnd.nextLong());
switch (rnd.nextInt(8)) {
case 0: {
cache.put(key, val);
if (!willRollback)
expData.put(key, val);
break;
}
case 1: {
TestValue oldVal = cache.getAndPut(key, val);
TestValue expOld = expData.get(key);
if (checkData)
assertEquals(expOld, oldVal);
if (!willRollback)
expData.put(key, val);
break;
}
case 2: {
boolean rmv = cache.remove(key);
if (checkData)
assertEquals(expData.containsKey(key), rmv);
if (!willRollback)
expData.remove(key);
break;
}
case 3: {
TestValue oldVal = cache.getAndRemove(key);
TestValue expOld = expData.get(key);
if (checkData)
assertEquals(expOld, oldVal);
if (!willRollback)
expData.remove(key);
break;
}
case 4: {
boolean put = cache.putIfAbsent(key, val);
boolean expPut = !expData.containsKey(key);
if (checkData)
assertEquals(expPut, put);
if (expPut && !willRollback)
expData.put(key, val);
break;
}
case 5: {
TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value()));
TestValue expOld = expData.get(key);
if (checkData)
assertEquals(expOld, oldVal);
if (!willRollback)
expData.put(key, val);
break;
}
case 6: {
TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null));
TestValue expOld = expData.get(key);
if (checkData)
assertEquals(expOld, oldVal);
break;
}
case 7: {
TestValue oldVal = cache.get(key);
TestValue expOld = expData.get(key);
if (checkData)
assertEquals(expOld, oldVal);
break;
}
default:
assert false;
}
}
/**
* @param rnd Random.
* @return Key.
*/
private TestKey key(Random rnd) {
return new TestKey(rnd.nextInt(KEY_RANGE));
}
/**
*
*/
private static class TestKey implements Serializable {
/** */
private long key;
/**
* @param key Key.
*/
public TestKey(long key) {
this.key = key;
}
/**
* @return Key.
*/
public long key() {
return key;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestKey other = (TestKey)o;
return key == other.key;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return (int)(key ^ (key >>> 32));
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestKey.class, this);
}
}
/**
*
*/
private static class TestValue implements Serializable {
/** */
private long val;
/**
* @param val Value.
*/
public TestValue(long val) {
this.val = val;
}
/**
* @return Value.
*/
public long value() {
return val;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestValue other = (TestValue)o;
return val == other.val;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestValue.class, this);
}
}
/**
*
*/
private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> {
/** */
private Long val;
/**
* @param val Value.
*/
public TestEntryProcessor(@Nullable Long val) {
this.val = val;
}
/** {@inheritDoc} */
@Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) {
TestValue old = e.getValue();
if (val != null)
e.setValue(new TestValue(val));
return old;
}
}
}