blob: 4d4981413fd5a26dc365c341f17a558b0f209d85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
*/
public class TxOnCachesStopTest extends GridCommonAbstractTest {
/** Cache1 name. */
private static final String CACHE_1_NAME = "cache1";
/** Cache2 name. */
private static final String CACHE_2_NAME = "cache2";
/** rnd instance. */
private static final GridRandom rnd = new GridRandom();
/** */
private CacheConfiguration<Integer, byte[]> destroyCacheCfg;
/** */
private CacheConfiguration<Integer, byte[]> surviveCacheCfg;
/** */
private static final int CACHE_CNT = 30;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
cfg.setCommunicationSpi(commSpi);
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024).setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY);
cfg.setDataStorageConfiguration(memCfg);
CacheConfiguration<Integer, byte[]> ccfg1 = new CacheConfiguration<>();
ccfg1.setName(CACHE_1_NAME);
ccfg1.setBackups(1);
ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
destroyCacheCfg = ccfg1;
CacheConfiguration<Integer, byte[]> ccfg2 = new CacheConfiguration<>();
ccfg2.setName(CACHE_2_NAME);
ccfg2.setBackups(1);
ccfg2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32));
surviveCacheCfg = ccfg2;
cfg.setCacheConfiguration(destroyCacheCfg, surviveCacheCfg);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
grid(0).destroyCache(destroyCacheCfg.getName());
grid(0).destroyCache(surviveCacheCfg.getName());
stopAllGrids();
cleanPersistenceDir();
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxOnCacheStopNoMessageBlock() throws Exception {
runTxOnCacheStop(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxOnCacheStopWithMessageBlock() throws Exception {
runTxOnCacheStop(true);
}
/**
* @param block {@code True} To block GridNearTxPrepareRequest message.
*/
private void runTxOnCacheStop(boolean block) throws Exception {
startGridsMultiThreaded(2);
IgniteEx ig = startClientGrid("client");
ig.cluster().active(true);
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation iso : TransactionIsolation.values())
runTxOnCacheStop(conc, iso, ig, block);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTxOnCacheStopInMid() throws Exception {
startGridsMultiThreaded(2);
IgniteEx ig = startClientGrid("client");
ig.cluster().active(true);
for (TransactionConcurrency conc : TransactionConcurrency.values()) {
for (TransactionIsolation iso : TransactionIsolation.values())
runCacheStopInMidTx(conc, iso, ig);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticTxMappedOnPMETopology() throws Exception {
Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());
startGridsMultiThreaded(1);
Ignite client = startClientGrid("client");
client.cluster().active(true);
awaitPartitionMapExchange(true, true, null);
final IgniteCache<Integer, byte[]> cache = client.getOrCreateCache(destroyCacheCfg);
final IgniteCache<Integer, byte[]> cache2 = client.getOrCreateCache(surviveCacheCfg);
final TestRecordingCommunicationSpi srvSpi = TestRecordingCommunicationSpi.spi(grid(0));
CountDownLatch destroyLatch = new CountDownLatch(1);
srvSpi.blockMessages((node, msg) -> (msg instanceof GridDhtPartitionsFullMessage));
try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache2.put(100, new byte[1024]);
cache.put(100, new byte[1024]);
GridTestUtils.runAsync(() -> {
grid(0).destroyCache(destroyCacheCfg.getName());
destroyLatch.countDown();
});
destroyLatch.await();
IgniteFuture commitFut = tx.commitAsync();
srvSpi.stopBlock();
commitFut.get(10_000);
fail("Transaction should be rolled back.");
}
catch (IgniteFutureTimeoutException fte) {
srvSpi.stopBlock();
fail("Partition map exchange hangs [err=" + fte + ']');
}
catch (IgniteException e) {
srvSpi.stopBlock();
assertTrue(X.hasCause(e, CacheInvalidStateException.class) || X.hasCause(e, IgniteException.class));
}
}
/**
* @param conc Concurrency mode.
* @param iso Isolation level.
* @param ig Client node.
* @param runConc {@code true} if a cache should be destroyed concurrently.
* @throws Exception If Failed.
*/
private void runTxOnCacheStop(
TransactionConcurrency conc,
TransactionIsolation iso,
Ignite ig,
boolean runConc
) throws Exception {
if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc()))
return;
if (log.isInfoEnabled()) {
log.info("Starting runTxOnCacheStop " +
"[concurrency=" + conc + ", isolation=" + iso + ", blockPrepareRequests=" + !runConc + ']');
}
CountDownLatch destroyLatch = new CountDownLatch(1);
final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg);
final IgniteCache<Integer, byte[]> cache2 = ig.getOrCreateCache(surviveCacheCfg);
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ig);
IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
try {
destroyLatch.await();
IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
doSleep(rnd.nextInt(500));
spi.stopBlock();
});
cache.destroy();
f.get();
}
catch (Exception e) {
e.printStackTrace();
}
});
spi.blockMessages((node, msg) -> {
if (msg instanceof GridNearTxPrepareRequest) {
destroyLatch.countDown();
return runConc;
}
return false;
});
IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
byte[] val = new byte[1024];
try (Transaction tx = ig.transactions().txStart(conc, iso, 1_000, 2)) {
cache.put(100, val);
cache2.put(100, val);
tx.commit();
}
catch (IgniteException e) {
assertTrue(X.hasCause(e, IgniteTxTimeoutCheckedException.class)
|| X.hasCause(e, CacheInvalidStateException.class) || X.hasCause(e, IgniteException.class));
}
});
f1.get();
f0.get();
try {
assertEquals(cache2.get(100), cache.get(100));
}
catch (IllegalStateException e) {
assertTrue(X.hasCause(e, CacheStoppedException.class));
}
spi.stopBlock();
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticTransactionsOnCacheDestroy() throws Exception {
Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());
startGridsMultiThreaded(3);
ArrayList<Ignite> clients = new ArrayList<>();
for (int ci = 0; ci < 2; ++ci)
clients.add(startClientGrid("client-" + ci));
clients.get(0).cluster().active(true);
for (TransactionIsolation iso : TransactionIsolation.values()) {
grid(0).getOrCreateCaches(createCacheConfigurations());
// Make sure that all caches are started.
awaitPartitionMapExchange();
testConcurrentTransactionsOnCacheDestroy(clients, OPTIMISTIC, iso);
// Make sure that all caches are stopped.
awaitPartitionMapExchange();
}
}
/**
* Creates a list of cache configurations.
*
* @return List of cache configurations.
*/
private List<CacheConfiguration> createCacheConfigurations() {
String GRP_NAME = "test-destroy-group";
List<CacheConfiguration> cacheCfgs = new ArrayList<>(CACHE_CNT);
for (int i = 0; i < CACHE_CNT; ++i) {
CacheConfiguration<Integer, byte[]> c = new CacheConfiguration<>("test-cache-" + i);
c.setBackups(2);
c.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
c.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
c.setAffinity(new RendezvousAffinityFunction(false, 32));
c.setGroupName(GRP_NAME);
cacheCfgs.add(c);
}
return cacheCfgs;
}
/**
* @param clients Client nodes that are used for initiating transactions.
* @param conc Transaction concurrency mode.
* @param iso Transaction isolation.
* @throws Exception If failed.
*/
private void testConcurrentTransactionsOnCacheDestroy(
final ArrayList<Ignite> clients,
TransactionConcurrency conc,
TransactionIsolation iso
) throws Exception {
if (log.isInfoEnabled()) {
log.info("Starting testConcurrentTransactionsOnCacheDestroy " +
"[concurrency=" + conc + ", isolation=" + iso + ']');
}
final AtomicBoolean stopTxLoad = new AtomicBoolean();
final AtomicInteger cacheIdxToBeDestroyed = new AtomicInteger(-1);
IgniteInternalFuture txLoadFut = startTxLoad(stopTxLoad, cacheIdxToBeDestroyed, clients, conc, iso);
try {
for (int i = 0; i < CACHE_CNT; ++i) {
int clientIdx = (i % clients.size());
IgniteInternalFuture destFut = GridTestUtils.runAsync(() ->
clients.get(clientIdx).destroyCache("test-cache-" + cacheIdxToBeDestroyed.incrementAndGet())
);
try {
destFut.get(15, TimeUnit.SECONDS);
}
catch (IgniteCheckedException e) {
fail("Looks like PME hangs [err=" + e + ']');
}
}
}
catch (Throwable t) {
fail("Unexpected error [err=" + t + ']');
}
stopTxLoad.set(true);
txLoadFut.get();
}
/**
* Starts transactional load.
*
* @param stopTxLoad Boolean flag that is used to stop transactional load.
* @param cacheIdxToBeDestroyed Variable that allows to get an index of destroyed cache.
* @param clients Client nodes that are used for initiating transactions.
* @param concurrency Transaction concurrency mode.
* @param isolation Transaction isolation.
* @return TxLoad future.
*/
private IgniteInternalFuture startTxLoad(
final AtomicBoolean stopTxLoad,
final AtomicInteger cacheIdxToBeDestroyed,
final List<Ignite> clients,
TransactionConcurrency concurrency,
TransactionIsolation isolation) {
final GridCompoundFuture fut = new GridCompoundFuture();
for (Ignite c : clients) {
for (int i = 0; i < CACHE_CNT; ++i)
c.getOrCreateCache("test-cache-" + i);
}
clients.forEach(c -> {
fut.add(GridTestUtils.runAsync(() -> {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
List<IgniteCache<Integer, byte[]>> caches = new ArrayList<>();
for (int i = 0; i < CACHE_CNT; ++i) {
IgniteCache<Integer, byte[]> testCache = c.cache("test-cache-" + i);
if (testCache == null) {
throw new IllegalStateException(
"Cache test-cache-" + i + " is not started " +
"on client node " + c.configuration().getIgniteInstanceName());
}
caches.add(testCache);
}
byte[] val = new byte[128];
while (!stopTxLoad.get()) {
try (Transaction tx = c.transactions().txStart(concurrency, isolation)) {
int cacheIdx = cacheIdxToBeDestroyed.get();
caches.get(Math.max(0, cacheIdx)).put(rnd.nextInt(), val);
caches.get(rnd.nextInt(Math.min(cacheIdx + 1, caches.size() - 1), caches.size())).put(rnd.nextInt(), val);
doSleep(200);
tx.commit();
}
// Expected exceptions:
catch (TransactionRollbackException | CacheException e) {
// Failed to prepare the transaction (transaction is marked as rolled back).
if (!X.hasCause(e, TransactionRollbackException.class))
throw e;
}
catch (IgniteException | IllegalStateException ignore) {
// Failed to perform cache operation (cache is stopped).
}
}
}, "tx-load-" + c.configuration().getIgniteInstanceName()));
});
fut.markInitialized();
return fut;
}
/**
* @param conc Concurrency mode.
* @param iso Isolation level.
* @param ig Client node.
* @throws Exception If failed.
*/
private void runCacheStopInMidTx(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig) throws Exception {
if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc()))
return;
if (log.isInfoEnabled())
log.info("Starting runCacheStopInMidTx [concurrency=" + conc + ", isolation=" + iso + ']');
CountDownLatch destroyLatch = new CountDownLatch(1);
CountDownLatch putLatch = new CountDownLatch(1);
final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg);
final IgniteCache<Integer, byte[]> cache2 = ig.getOrCreateCache(surviveCacheCfg);
IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
try {
putLatch.await();
cache.destroy();
destroyLatch.countDown();
}
catch (Exception e) {
e.printStackTrace();
}
});
IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
byte[] val = new byte[1024];
try (Transaction tx = ig.transactions().txStart(conc, iso, 1_000, 2)) {
cache.put(100, val);
cache2.put(100, val);
putLatch.countDown();
destroyLatch.await();
tx.commit();
}
catch (IgniteException e) {
assertTrue(X.hasCause(e, CacheInvalidStateException.class) ||
X.hasCause(e, CacheStoppedException.class) || X.hasCause(e, TransactionRollbackException.class) ||
X.hasCause(e, IgniteException.class));
}
catch (InterruptedException e) {
e.printStackTrace();
}
}, "tx-load-thread");
f1.get();
f0.get();
assertNull(cache2.get(100));
}
}