blob: d2769e392e94764016ff8f0693fbb0fd0e6bced5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
 * Tests specific combinations of cross-cache transactions: every transaction writes into two different caches
 * and, once all transactions are committed, each node is checked to read back the last committed values.
 * <p>
 * Subclasses select the atomicity mode of both caches through {@link #atomicityMode()} and may enable a near
 * cache for the second cache through {@link #nearEnabled()}.
 */
public abstract class IgniteCrossCacheTxAbstractSelfTest extends GridCommonAbstractTest {
    /** Name of the first cache; it is never configured with a near cache. */
    private static final String FIRST_CACHE = "FirstCache";

    /** Name of the second cache; it gets a near cache when {@link #nearEnabled()} returns {@code true}. */
    private static final String SECOND_CACHE = "SecondCache";

    /** Number of transactions executed by a single verification run. */
    private static final int TX_CNT = 500;

    /** Maximum number of put operations performed inside one transaction. */
    private static final int MAX_OPS_PER_TX = 24;

    /**
     * @return Node count for this test.
     */
    private int nodeCount() {
        return 4;
    }

    /**
     * @return {@code True} if near cache should be enabled.
     */
    protected boolean nearEnabled() {
        return false;
    }

    /**
     * Starts {@link #nodeCount()} nodes and creates both test caches from node 0.
     *
     * @throws Exception If the nodes or the caches could not be started.
     */
    @Override protected void beforeTestsStarted() throws Exception {
        startGridsMultiThreaded(nodeCount());

        grid(0).createCache(crossCacheConfiguration(FIRST_CACHE));

        CacheConfiguration<Integer, String> secondCfg = crossCacheConfiguration(SECOND_CACHE);

        // Only the second cache may get a near cache; the first cache always stays without one.
        if (nearEnabled())
            secondCfg.setNearConfiguration(new NearCacheConfiguration<Integer, String>());

        grid(0).createCache(secondCfg);
    }

    /**
     * Builds the configuration shared by both test caches: one backup, the atomicity mode supplied by
     * {@link #atomicityMode()} and {@code FULL_SYNC} write synchronization.
     *
     * @param name Cache name.
     * @return Cache configuration without near cache settings.
     */
    private CacheConfiguration<Integer, String> crossCacheConfiguration(String name) {
        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(name);

        cfg.setBackups(1);
        cfg.setAtomicityMode(atomicityMode());
        cfg.setWriteSynchronizationMode(FULL_SYNC);

        return cfg;
    }

    /**
     * @return Atomicity mode.
     */
    public abstract CacheAtomicityMode atomicityMode();

    /**
     * Runs {@link #TX_CNT} transactions, each started on a randomly chosen node. Every transaction performs
     * between 1 and {@link #MAX_OPS_PER_TX} puts for keys {@code 0..size-1}; each put goes to a randomly chosen
     * cache (first or second) and stores a value tagged with the cache and the transaction number. The expected
     * last value per key is tracked for both caches and, after all transactions are done, verified on every node.
     *
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @throws Exception If failed.
     */
    protected void checkTxsSingleOp(TransactionConcurrency concurrency, TransactionIsolation isolation)
        throws Exception {
        Map<Integer, String> firstCheck = new HashMap<>();
        Map<Integer, String> secondCheck = new HashMap<>();

        for (int i = 0; i < TX_CNT; i++) {
            // Resolve the initiating node once instead of looking it up for every use.
            IgniteEx txNode = grid(ThreadLocalRandom.current().nextInt(nodeCount()));

            IgniteCache<Integer, String> first = txNode.cache(FIRST_CACHE);
            IgniteCache<Integer, String> second = txNode.cache(SECOND_CACHE);

            try (Transaction tx = txNode.transactions().txStart(concurrency, isolation)) {
                try {
                    int size = ThreadLocalRandom.current().nextInt(MAX_OPS_PER_TX) + 1;

                    for (int k = 0; k < size; k++) {
                        boolean rnd = ThreadLocalRandom.current().nextBoolean();

                        IgniteCache<Integer, String> cache = rnd ? first : second;
                        Map<Integer, String> check = rnd ? firstCheck : secondCheck;

                        String val = rnd ? "first" + i : "second" + i;

                        cache.put(k, val);
                        check.put(k, val);
                    }

                    tx.commit();
                }
                catch (Throwable e) {
                    // This handler runs before the enclosing try-with-resources closes the transaction, so the
                    // original failure is reported even if closing the transaction blocks or fails as well.
                    e.printStackTrace();

                    throw e;
                }
            }

            if (i > 0 && i % 100 == 0)
                info("Finished iteration: " + i);
        }

        for (int g = 0; g < nodeCount(); g++) {
            IgniteEx node = grid(g);

            // The transaction manager's ID map is expected to be empty once all transactions have finished.
            assertEquals(0, node.context().cache().context().tm().idMapSize());

            assertCacheContents(node, FIRST_CACHE, "first", firstCheck);
            assertCacheContents(node, SECOND_CACHE, "second", secondCheck);
        }
    }

    /**
     * Asserts that every expected entry can be read with the expected value through the given node. The failure
     * message reports whether the node is primary or backup for the offending key.
     *
     * @param node Node to read through.
     * @param cacheName Name of the cache to check.
     * @param lb Cache label used in the assertion message ({@code "first"} or {@code "second"}).
     * @param expected Expected key-value pairs.
     */
    private void assertCacheContents(IgniteEx node, String cacheName, String lb, Map<Integer, String> expected) {
        ClusterNode locNode = node.localNode();

        // Resolve the cache proxy once rather than for every checked entry.
        IgniteCache<Object, Object> cache = node.cache(cacheName);

        for (Map.Entry<Integer, String> entry : expected.entrySet()) {
            boolean primary = node.affinity(cacheName).isPrimary(locNode, entry.getKey());
            boolean backup = node.affinity(cacheName).isBackup(locNode, entry.getKey());

            assertEquals("Invalid value found " + lb + " cache [primary=" + primary + ", backup=" + backup +
                ", node=" + locNode.id() + ", key=" + entry.getKey() + ']',
                entry.getValue(), cache.get(entry.getKey()));
        }
    }
}