blob: 11c4af5dc624ac6fbcf0f692a042312615100766 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TransactionsMXBeanImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.TransactionsMXBean;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.junit.Test;
import static org.apache.ignite.internal.util.typedef.X.hasCause;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
*/
public class SetTxTimeoutOnPartitionMapExchangeTest extends GridCommonAbstractTest {
/** Wait condition timeout. */
private static final long WAIT_CONDITION_TIMEOUT = 10_000L;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
/**
*
*/
@Test
public void testDefaultTxTimeoutOnPartitionMapExchange() throws Exception {
IgniteEx ig1 = startGrid(1);
IgniteEx ig2 = startGrid(2);
TransactionConfiguration txCfg1 = ig1.configuration().getTransactionConfiguration();
TransactionConfiguration txCfg2 = ig2.configuration().getTransactionConfiguration();
final long expDfltTimeout = TransactionConfiguration.TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE;
assertEquals(expDfltTimeout, txCfg1.getTxTimeoutOnPartitionMapExchange());
assertEquals(expDfltTimeout, txCfg2.getTxTimeoutOnPartitionMapExchange());
}
/**
*
*/
@Test
public void testJmxSetTxTimeoutOnPartitionMapExchange() throws Exception {
startGrid(1);
startGrid(2);
TransactionsMXBean mxBean1 = txMXBean(1);
TransactionsMXBean mxBean2 = txMXBean(2);
final long expTimeout1 = 20_000L;
final long expTimeout2 = 30_000L;
mxBean1.setTxTimeoutOnPartitionMapExchange(expTimeout1);
assertTxTimeoutOnPartitionMapExchange(expTimeout1);
assertEquals(expTimeout1, mxBean1.getTxTimeoutOnPartitionMapExchange());
mxBean2.setTxTimeoutOnPartitionMapExchange(expTimeout2);
assertTxTimeoutOnPartitionMapExchange(expTimeout2);
assertEquals(expTimeout2, mxBean2.getTxTimeoutOnPartitionMapExchange());
}
/**
*
*/
@Test
public void testClusterSetTxTimeoutOnPartitionMapExchange() throws Exception {
Ignite ig1 = startGrid(1);
Ignite ig2 = startGrid(2);
final long expTimeout1 = 20_000L;
final long expTimeout2 = 30_000L;
ig1.cluster().setTxTimeoutOnPartitionMapExchange(expTimeout1);
assertTxTimeoutOnPartitionMapExchange(expTimeout1);
ig2.cluster().setTxTimeoutOnPartitionMapExchange(expTimeout2);
assertTxTimeoutOnPartitionMapExchange(expTimeout2);
}
/**
* Tests applying new txTimeoutOnPartitionMapExchange while an exchange future runs.
*
* @throws Exception If fails.
*/
@Test
public void testSetTxTimeoutDuringPartitionMapExchange() throws Exception {
IgniteEx ig = (IgniteEx)startGrids(2);
checkSetTxTimeoutDuringPartitionMapExchange(ig);
}
/**
* Tests applying new txTimeoutOnPartitionMapExchange while an exchange future runs on client node.
*
* @throws Exception If fails.
*/
@Test
public void testSetTxTimeoutOnClientDuringPartitionMapExchange() throws Exception {
IgniteEx ig = startGrids(2);
IgniteEx client = startClientGrid(getConfiguration("client"));
checkSetTxTimeoutDuringPartitionMapExchange(client);
}
/**
* @param ig Ignite instance where deadlock tx will start.
* @throws Exception If fails.
*/
private void checkSetTxTimeoutDuringPartitionMapExchange(IgniteEx ig) throws Exception {
final long longTimeout = 600_000L;
final long shortTimeout = 5_000L;
TransactionsMXBean mxBean = txMXBean(0);
// Case 1: set very long txTimeoutOnPME, transaction should be rolled back.
mxBean.setTxTimeoutOnPartitionMapExchange(longTimeout);
assertTxTimeoutOnPartitionMapExchange(longTimeout);
AtomicReference<Exception> txEx = new AtomicReference<>();
IgniteInternalFuture<Long> fut = startDeadlock(ig, txEx, 0);
startGridAsync(2);
waitForExchangeStarted(ig);
mxBean.setTxTimeoutOnPartitionMapExchange(shortTimeout);
awaitPartitionMapExchange();
fut.get();
assertTrue("Transaction should be rolled back", hasCause(txEx.get(), TransactionRollbackException.class));
// Case 2: txTimeoutOnPME will be set to 0 after starting of PME, transaction should be cancelled on timeout.
mxBean.setTxTimeoutOnPartitionMapExchange(longTimeout);
assertTxTimeoutOnPartitionMapExchange(longTimeout);
fut = startDeadlock(ig, txEx, 10000L);
startGridAsync(3);
waitForExchangeStarted(ig);
mxBean.setTxTimeoutOnPartitionMapExchange(0);
fut.get();
assertTrue("Transaction should be canceled on timeout", hasCause(txEx.get(), TransactionTimeoutException.class));
}
/**
* Start test deadlock
*
* @param ig Ig.
* @param txEx Atomic reference to transaction exception.
* @param timeout Transaction timeout.
*/
private IgniteInternalFuture<Long> startDeadlock(Ignite ig, AtomicReference<Exception> txEx, long timeout)
throws InterruptedException {
// Without the awaiting txs can map on different topology versions causing unexpected behavior.
awaitPartitionMapExchange();
IgniteCache<Object, Object> cache = ig.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
AtomicInteger thCnt = new AtomicInteger();
CyclicBarrier barrier = new CyclicBarrier(2);
return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() {
int thNum = thCnt.incrementAndGet();
try (Transaction tx = ig.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) {
cache.put(thNum, 1);
barrier.await();
cache.put(thNum % 2 + 1, 1);
tx.commit();
}
catch (Exception e) {
txEx.set(e);
}
return null;
}
}, 2, "tx-thread");
}
/**
* Starts grid asynchronously and returns just before grid starting.
* Avoids blocking on PME.
*
* @param idx Test grid index.
* @throws Exception If fails.
*/
private void startGridAsync(int idx) throws Exception {
GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
try {
startGrid(idx);
}
catch (Exception ignore) {
// no-op.
}
}
});
}
/**
* Waits for srarting PME on grid.
*
* @param ig Ignite grid.
* @throws IgniteCheckedException If fails.
*/
private void waitForExchangeStarted(IgniteEx ig) throws IgniteCheckedException {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (GridDhtPartitionsExchangeFuture fut: ig.context().cache().context().exchange().exchangeFutures()) {
if (!fut.isDone())
return true;
}
return false;
}
}, WAIT_CONDITION_TIMEOUT);
// Additional waiting to ensure that code really start waiting for partition release.
U.sleep(5_000L);
}
/** */
private TransactionsMXBean txMXBean(int igniteInt) throws Exception {
return getMxBean(getTestIgniteInstanceName(igniteInt), "Transactions",
TransactionsMXBeanImpl.class, TransactionsMXBean.class);
}
/**
* Checking the transaction timeout on all grids.
*
* @param expTimeout Expected timeout.
* @throws IgniteInterruptedCheckedException If failed.
*/
private void assertTxTimeoutOnPartitionMapExchange(final long expTimeout)
throws IgniteInterruptedCheckedException {
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (Ignite ignite : G.allGrids()) {
long actualTimeout = ignite.configuration()
.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
if (actualTimeout != expTimeout) {
log.warning(String.format(
"Wrong transaction timeout on partition map exchange [grid=%s, timeout=%d, expected=%d]",
ignite.name(), actualTimeout, expTimeout));
return false;
}
}
return true;
}
}, WAIT_CONDITION_TIMEOUT));
}
}