blob: 6638626a28d62b75a7996226e7742643ae395d1b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
*/
public class IgniteCacheCrossCacheTxFailoverTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
private static final String CACHE1 = "cache1";
/** */
private static final String CACHE2 = "cache2";
/** */
private static final int GRID_CNT = 4;
/** */
private static final int KEY_RANGE = 1000;
/** */
private static final long TEST_TIME = 3 * 60_000;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
if (igniteInstanceName.equals(getTestIgniteInstanceName(GRID_CNT - 1)))
cfg.setClientMode(true);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGrids(4);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
stopAllGrids();
}
/**
* @param name Cache name.
* @param cacheMode Cache mode.
* @param parts Number of partitions.
* @return Cache configuration.
*/
private CacheConfiguration cacheConfiguration(@NotNull String name,
CacheMode cacheMode,
int parts) {
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(name);
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
if (cacheMode == PARTITIONED)
ccfg.setBackups(1);
ccfg.setAffinity(new RendezvousAffinityFunction(false, parts));
return ccfg;
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIME + 60_000;
}
/**
* @throws Exception If failed.
*/
public void testCrossCachePessimisticTxFailover() throws Exception {
crossCacheTxFailover(PARTITIONED, true, PESSIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
public void testCrossCachePessimisticTxFailoverDifferentAffinity() throws Exception {
crossCacheTxFailover(PARTITIONED, false, PESSIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
public void testCrossCacheOptimisticTxFailover() throws Exception {
crossCacheTxFailover(PARTITIONED, true, OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
public void testCrossCacheOptimisticSerializableTxFailover() throws Exception {
crossCacheTxFailover(PARTITIONED, true, OPTIMISTIC, SERIALIZABLE);
}
/**
* @throws Exception If failed.
*/
public void testCrossCacheOptimisticTxFailoverDifferentAffinity() throws Exception {
crossCacheTxFailover(PARTITIONED, false, OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
public void testCrossCachePessimisticTxFailoverReplicated() throws Exception {
crossCacheTxFailover(REPLICATED, true, PESSIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
public void testCrossCacheOptimisticTxFailoverReplicated() throws Exception {
crossCacheTxFailover(REPLICATED, true, OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
public void testCrossCachePessimisticTxFailoverDifferentAffinityReplicated() throws Exception {
crossCacheTxFailover(PARTITIONED, false, PESSIMISTIC, REPEATABLE_READ);
}
/**
* @param cacheMode Cache mode.
* @param sameAff If {@code false} uses different number of partitions for caches.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @throws Exception If failed.
*/
private void crossCacheTxFailover(CacheMode cacheMode,
boolean sameAff,
final TransactionConcurrency concurrency,
final TransactionIsolation isolation) throws Exception {
IgniteKernal ignite0 = (IgniteKernal)ignite(0);
final AtomicBoolean stop = new AtomicBoolean();
try {
ignite0.createCache(cacheConfiguration(CACHE1, cacheMode, 256));
ignite0.createCache(cacheConfiguration(CACHE2, cacheMode, sameAff ? 256 : 128));
final AtomicInteger threadIdx = new AtomicInteger();
IgniteInternalFuture<?> fut = runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int idx = threadIdx.getAndIncrement();
Ignite ignite = ignite(idx % GRID_CNT);
log.info("Started update thread [node=" + ignite.name() +
", client=" + ignite.configuration().isClientMode() + ']');
IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1);
IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2);
assertNotSame(cache1, cache2);
IgniteTransactions txs = ignite.transactions();
ThreadLocalRandom rnd = ThreadLocalRandom.current();
long iter = 0;
while (!stop.get()) {
boolean sameKey = rnd.nextBoolean();
try {
try (Transaction tx = txs.txStart(concurrency, isolation)) {
if (sameKey) {
TestKey key = new TestKey(rnd.nextLong(KEY_RANGE));
cacheOperation(rnd, cache1, key);
cacheOperation(rnd, cache2, key);
}
else {
TestKey key1 = new TestKey(rnd.nextLong(KEY_RANGE));
TestKey key2 = new TestKey(key1.key() + 1);
cacheOperation(rnd, cache1, key1);
cacheOperation(rnd, cache2, key2);
}
tx.commit();
}
}
catch (CacheException | IgniteException e) {
log.info("Update error: " + e);
}
if (iter++ % 500 == 0)
log.info("Iteration: " + iter);
}
return null;
}
/**
* @param rnd Random.
* @param cache Cache.
* @param key Key.
*/
private void cacheOperation(ThreadLocalRandom rnd, IgniteCache<TestKey, TestValue> cache, TestKey key) {
switch (rnd.nextInt(4)) {
case 0:
cache.put(key, new TestValue(rnd.nextLong()));
break;
case 1:
cache.remove(key);
break;
case 2:
cache.invoke(key, new TestEntryProcessor(rnd.nextBoolean() ? 1L : null));
break;
case 3:
cache.get(key);
break;
default:
assert false;
}
}
}, 10, "tx-thread");
long stopTime = System.currentTimeMillis() + 3 * 60_000;
long topVer = ignite0.cluster().topologyVersion();
boolean failed = false;
while (System.currentTimeMillis() < stopTime) {
log.info("Start node.");
IgniteKernal ignite = (IgniteKernal)startGrid(GRID_CNT);
assertFalse(ignite.configuration().isClientMode());
topVer++;
IgniteInternalFuture<?> affFut = ignite.context().cache().context().exchange().affinityReadyFuture(
new AffinityTopologyVersion(topVer));
try {
if (affFut != null)
affFut.get(30_000);
}
catch (IgniteFutureTimeoutCheckedException ignored) {
log.error("Failed to wait for affinity future after start: " + topVer);
failed = true;
break;
}
Thread.sleep(500);
log.info("Stop node.");
stopGrid(GRID_CNT);
topVer++;
affFut = ignite0.context().cache().context().exchange().affinityReadyFuture(
new AffinityTopologyVersion(topVer));
try {
if (affFut != null)
affFut.get(30_000);
}
catch (IgniteFutureTimeoutCheckedException ignored) {
log.error("Failed to wait for affinity future after stop: " + topVer);
failed = true;
break;
}
}
stop.set(true);
fut.get();
assertFalse("Test failed, see log for details.", failed);
}
finally {
stop.set(true);
ignite0.destroyCache(CACHE1);
ignite0.destroyCache(CACHE2);
AffinityTopologyVersion topVer = ignite0.context().cache().context().exchange().lastTopologyFuture().get();
for (Ignite ignite : G.allGrids())
((IgniteKernal)ignite).context().cache().context().exchange().affinityReadyFuture(topVer).get();
awaitPartitionMapExchange();
}
}
/**
*
*/
private static class TestKey implements Serializable {
/** */
private long key;
/**
* @param key Key.
*/
public TestKey(long key) {
this.key = key;
}
/**
* @return Key.
*/
public long key() {
return key;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestKey testKey = (TestKey)o;
return key == testKey.key;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return (int)(key ^ (key >>> 32));
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestKey.class, this);
}
}
/**
*
*/
private static class TestValue implements Serializable {
/** */
private long val;
/**
* @param val Value.
*/
public TestValue(long val) {
this.val = val;
}
/**
* @return Value.
*/
public long value() {
return val;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestValue.class, this);
}
}
/**
*
*/
private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> {
/** */
private Long val;
/**
* @param val Value.
*/
public TestEntryProcessor(@Nullable Long val) {
this.val = val;
}
/** {@inheritDoc} */
@Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) {
TestValue old = e.getValue();
if (val != null)
e.setValue(new TestValue(val));
return old;
}
}
}