blob: fdfe9c75ecfda63cb0679e2fffd7873d2a4d4673 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import javax.cache.CacheException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Test checks that grid transaction configuration doesn't influence system caches.
*/
public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
/** Test cache name. */
private static final String CACHE_NAME = "cache_name";
/** Timeout of transaction. */
private static final long TX_TIMEOUT = 100;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpCommunicationSpi commSpi = new TestCommunicationSpi();
cfg.setCommunicationSpi(commSpi);
CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME);
ccfg.setAtomicityMode(atomicityMode());
ccfg.setBackups(1);
cfg.setCacheConfiguration(ccfg);
final TransactionConfiguration txCfg = new TransactionConfiguration();
txCfg.setDefaultTxTimeout(TX_TIMEOUT);
cfg.setTransactionConfiguration(txCfg);
return cfg;
}
/**
* @return Cache atomicity mode.
*/
public CacheAtomicityMode atomicityMode() {
return TRANSACTIONAL;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrids(2);
}
/**
* Success if user tx was timed out.
*
* @throws Exception If failed.
*/
@Test
public void testUserTxTimeout() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-7952", MvccFeatureChecker.forcedMvcc());
final Ignite ignite = grid(0);
final IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE_NAME);
checkImplicitTxTimeout(cache);
checkExplicitTxTimeout(cache, ignite);
}
/**
* Success if system caches weren't timed out.
*
* @throws Exception If failed.
*/
@Test
public void testSystemCacheTx() throws Exception {
final Ignite ignite = grid(0);
final IgniteInternalCache<Object, Object> utilCache = getSystemCache(ignite, CU.UTILITY_CACHE_NAME);
checkImplicitTxSuccess(utilCache);
checkStartTxSuccess(utilCache);
}
/**
* Extract system cache from kernal.
*
* @param ignite Ignite instance.
* @param cacheName System cache name.
* @return Internal cache instance.
*/
protected IgniteInternalCache<Object, Object> getSystemCache(final Ignite ignite, final String cacheName) {
return ((IgniteKernal)ignite).context().cache().cache(cacheName);
}
/**
* Success if implicit tx fails.
*
* @param cache Cache name.
* @throws Exception If failed.
*/
protected void checkImplicitTxTimeout(final IgniteCache<Object, Object> cache) throws Exception {
TestCommunicationSpi.delay = true;
Integer key = primaryKey(ignite(1).cache(CACHE_NAME));
try {
cache.put(key, 0);
fail("Timeout exception must be thrown");
}
catch (CacheException ignored) {
// No-op.
}
finally {
TestCommunicationSpi.delay = false;
}
cache.clear();
}
/**
* Success if explicit tx fails.
*
* @param cache Cache name.
* @param ignite Ignite instance.
* @throws Exception If failed.
*/
protected void checkExplicitTxTimeout(final IgniteCache<Object, Object> cache, final Ignite ignite)
throws Exception {
try (final Transaction tx = ignite.transactions().txStart()) {
assert tx != null;
cache.put("key0", "val0");
sleepForTxFailure();
cache.put("key", "val");
fail("Timeout exception must be thrown");
}
catch (CacheException e) {
assert e.getCause() instanceof TransactionTimeoutException;
}
assertNull(ignite.transactions().tx());
assert !cache.containsKey("key0");
assert !cache.containsKey("key");
// New transaction must succeed.
try (final Transaction tx = ignite.transactions().txStart()) {
cache.put("key", "val");
tx.commit();
}
assert cache.containsKey("key");
}
/**
* Success if explicit tx doesn't fail.
*
* @param cache Cache instance.
* @throws Exception If failed.
*/
protected void checkStartTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
try (final GridNearTxLocal tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, REPEATABLE_READ)) {
assert tx != null;
sleepForTxFailure();
cache.put("key", "val");
tx.commit();
}
assert cache.containsKey("key");
cache.clear();
}
/**
* Success if implicit tx fails.
*
* @param cache Cache instance.
* @throws Exception If failed.
*/
protected void checkImplicitTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
cache.invoke("key", new EntryProcessor<Object, Object, Object>() {
@Override public Object process(final MutableEntry<Object, Object> entry, final Object... args)
throws EntryProcessorException {
try {
sleepForTxFailure();
}
catch (InterruptedException e) {
throw new EntryProcessorException(e);
}
return null;
}
});
cache.clear();
}
/**
* Sleep multiple {@link #TX_TIMEOUT} times.
*
* @throws InterruptedException If interrupted.
*/
private void sleepForTxFailure() throws InterruptedException {
Thread.sleep(TX_TIMEOUT * 3);
}
/**
*
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** Delay. */
private static volatile boolean delay;
/** {@inheritDoc} */
@Override public void sendMessage(
final ClusterNode node,
final Message msg,
final IgniteInClosure<IgniteException> ackC
) throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
Message msg0 = ((GridIoMessage)msg).message();
if (msg0 instanceof GridNearTxPrepareRequest && delay) {
try {
U.sleep(TX_TIMEOUT * 2);
}
catch (IgniteInterruptedCheckedException e) {
e.printStackTrace();
}
}
}
super.sendMessage(node, msg, ackC);
}
}
}