blob: 0cc072a8bb44d3bd93ca44b5ecf8b7de9bc0d85c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.junit.Assume;
import org.junit.Test;
import static java.lang.Thread.sleep;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* Tests an ability to eagerly rollback timed out transactions.
*/
public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
    /** Duration of the random-mix stress test, scaled by the test scale factor. */
    private static final long DURATION = SF.apply(60 * 1000);

    /** Smallest non-zero transaction timeout, in milliseconds. */
    private static final long TX_MIN_TIMEOUT = 1;

    /** Name of the transactional cache used by all tests. */
    private static final String CACHE_NAME = "test";

    /** Number of server nodes started before each test. */
    private static final int GRID_CNT = 3;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setConsistentId(igniteInstanceName);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
if (!"client".equals(igniteInstanceName)) {
CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME);
if (nearCacheEnabled())
ccfg.setNearConfiguration(new NearCacheConfiguration());
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setBackups(2);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg);
}
return cfg;
}
    /**
     * Near cache mode switch, overridden by near-cache flavors of this test.
     *
     * @return Near cache flag.
     */
    protected boolean nearCacheEnabled() {
        return false;
    }
    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        // Rollback on timeout is not supported for forced-MVCC runs yet.
        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-7388", MvccFeatureChecker.forcedMvcc());

        super.beforeTest();

        startGridsMultiThreaded(GRID_CNT);
    }
    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();
    }
    /**
     * Starts a client node and verifies the test cache is reachable from it.
     *
     * @return Started client.
     * @throws Exception If failed.
     */
    private Ignite startClient() throws Exception {
        Ignite client = startClientGrid("client");

        assertTrue(client.configuration().isClientMode());

        if (nearCacheEnabled())
            client.createNearCache(CACHE_NAME, new NearCacheConfiguration<>());
        else
            assertNotNull(client.cache(CACHE_NAME));

        return client;
    }
    /**
     * Checks that the exception chain carries a deadlock report two causes deep.
     * Overridden by subclasses where the expected report differs.
     *
     * @param e Exception.
     */
    protected void validateDeadlockException(Exception e) {
        assertEquals("Deadlock report is expected",
            TransactionDeadlockException.class, e.getCause().getCause().getClass());
    }
    /**
     * Tests that locking many keys under a short timeout rolls back cleanly on
     * every node flavor, and that a long-timeout retry can then lock the same keys.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLockAndConcurrentTimeout() throws Exception {
        startClient();

        for (Ignite node : G.allGrids()) {
            log.info("Test with node: " + node.name());

            lock(node, false);
            lock(node, false);
            lock(node, true); // Last pass also verifies locks were released by the rollback.
        }
    }
    /**
     * Locks a per-thread key range inside a short (500 ms) pessimistic tx, expecting
     * a timeout, then optionally retries with a long timeout to prove the locks were released.
     *
     * @param node Node.
     * @param retry {@code True} to retry with a long timeout after the expected rollback.
     * @throws Exception If failed.
     */
    private void lock(final Ignite node, final boolean retry) throws Exception {
        final IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);

        // Each thread works on its own disjoint key range.
        final int KEYS_PER_THREAD = 10_000;

        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
            @Override public void apply(Integer idx) {
                int start = idx * KEYS_PER_THREAD;
                int end = start + KEYS_PER_THREAD;

                int locked = 0;

                try {
                    // 500 ms is too short to read the whole range, so a timeout rollback is expected.
                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500, 0)) {
                        for (int i = start; i < end; i++) {
                            cache.get(i);

                            locked++;
                        }

                        tx.commit();
                    }
                }
                catch (Exception e) {
                    info("Expected error: " + e);
                }

                info("Done, locked: " + locked);

                if (retry) {
                    // All locks must have been released by the rollback on timeout.
                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 10 * 60_000, 0)) {
                        for (int i = start; i < end; i++)
                            cache.get(i);

                        cache.put(start, 0);

                        tx.commit();
                    }
                }
            }
        }, Math.min(4, Runtime.getRuntime().availableProcessors()), "tx-thread");
    }
    /**
     * Tests if timeout on first tx unblocks second tx waiting for the locked key.
     * Covers server-server, server-client, client-server and client-client pairs.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWaitingTxUnblockedOnTimeout() throws Exception {
        waitingTxUnblockedOnTimeout(grid(0), grid(0));

        waitingTxUnblockedOnTimeout(grid(0), grid(1));

        Ignite client = startClient();

        waitingTxUnblockedOnTimeout(grid(0), client);

        waitingTxUnblockedOnTimeout(grid(1), client);

        waitingTxUnblockedOnTimeout(client, grid(0));

        waitingTxUnblockedOnTimeout(client, grid(1));

        waitingTxUnblockedOnTimeout(client, client);
    }
    /**
     * Tests if the death of the thread owning the first tx unblocks a second tx
     * waiting for the locked key. Covers all node pairings.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWaitingTxUnblockedOnThreadDeath() throws Exception {
        waitingTxUnblockedOnThreadDeath(grid(0), grid(0));

        waitingTxUnblockedOnThreadDeath(grid(0), grid(1));

        Ignite client = startClient();

        waitingTxUnblockedOnThreadDeath(grid(0), client);

        waitingTxUnblockedOnThreadDeath(grid(1), client);

        waitingTxUnblockedOnThreadDeath(client, grid(0));

        waitingTxUnblockedOnThreadDeath(client, grid(1));

        waitingTxUnblockedOnThreadDeath(client, client);
    }
    /**
     * Tests if deadlock is resolved on timeout with correct message.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDeadlockUnblockedOnTimeout() throws Exception {
        deadlockUnblockedOnTimeout(ignite(0), ignite(1));

        deadlockUnblockedOnTimeout(ignite(0), ignite(0));

        Ignite client = startClient();

        deadlockUnblockedOnTimeout(ignite(0), client);

        deadlockUnblockedOnTimeout(client, ignite(0));
    }
    /**
     * Creates a classic two-key deadlock between two txs coordinated by a latch,
     * and verifies the timed-out tx dies with a deadlock report while the
     * infinite-timeout tx survives and commits.
     *
     * @param node1 First node.
     * @param node2 Second node.
     * @throws Exception If failed.
     */
    private void deadlockUnblockedOnTimeout(final Ignite node1, final Ignite node2) throws Exception {
        info("Start test [node1=" + node1.name() + ", node2=" + node2.name() + ']');

        // Both txs lock their first key before either crosses over.
        final CountDownLatch l = new CountDownLatch(2);

        IgniteInternalFuture<?> fut1 = runAsync(new Runnable() {
            @Override public void run() {
                try {
                    // This tx has a 5 s timeout, so it is the one expected to be rolled back.
                    try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 5000, 2)) {
                        node1.cache(CACHE_NAME).put(1, 10);

                        l.countDown();

                        U.awaitQuiet(l);

                        node1.cache(CACHE_NAME).put(2, 20);

                        tx.commit();

                        fail();
                    }
                }
                catch (CacheException e) {
                    // No-op.
                    validateDeadlockException(e);
                }
            }
        }, "First");

        IgniteInternalFuture<?> fut2 = runAsync(new Runnable() {
            @Override public void run() {
                // Infinite timeout: this tx must survive and commit once the first one dies.
                try (Transaction tx = node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) {
                    node2.cache(CACHE_NAME).put(2, 2);

                    l.countDown();

                    U.awaitQuiet(l);

                    node2.cache(CACHE_NAME).put(1, 1);

                    tx.commit();
                }
            }
        }, "Second");

        fut1.get();
        fut2.get();

        // Only the second tx's values must be committed.
        assertTrue("Expecting committed key 2", node1.cache(CACHE_NAME).get(2) != null);
        assertTrue("Expecting committed key 1", node1.cache(CACHE_NAME).get(1) != null);

        node1.cache(CACHE_NAME).removeAll(F.asSet(1, 2));
    }
    /**
     * Tests timeout object cleanup on tx commit.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTimeoutRemoval() throws Exception {
        IgniteEx client = (IgniteEx)startClient();

        final long TX_TIMEOUT = 250;

        // Modes map to commit/commitAsync/rollback/rollbackAsync/auto-close in testTimeoutRemoval0.
        final int modesCnt = 5;

        for (int i = 0; i < modesCnt; i++)
            testTimeoutRemoval0(grid(0), i, TX_TIMEOUT);

        for (int i = 0; i < modesCnt; i++)
            testTimeoutRemoval0(client, i, TX_TIMEOUT);

        // Minimal timeout maximizes the race between timeout firing and tx finish.
        for (int i = 0; i < modesCnt; i++)
            testTimeoutRemoval0(grid(0), i, TX_MIN_TIMEOUT);

        for (int i = 0; i < modesCnt; i++)
            testTimeoutRemoval0(client, i, TX_MIN_TIMEOUT);

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        // Repeat with more iterations to make sure everything is cleared.
        for (int i = 0; i < 500; i++)
            testTimeoutRemoval0(client, rnd.nextInt(modesCnt), TX_MIN_TIMEOUT);
    }
/**
* Tests timeouts in all tx configurations.
*
* @throws Exception If failed.
*/
@Test
public void testSimple() throws Exception {
for (TransactionConcurrency concurrency : TransactionConcurrency.values())
for (TransactionIsolation isolation : TransactionIsolation.values()) {
for (int op = 0; op < 4; op++)
testSimple0(concurrency, isolation, op);
}
}
    /**
     * Test timeouts with random values and different tx configurations.
     * Verifies every started tx finishes as committed, timed out or failed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRandomMixedTxConfigurations() throws Exception {
        final Ignite client = startClient();

        final AtomicBoolean stop = new AtomicBoolean();

        // Log the seed so a failing run can be reproduced.
        final long seed = System.currentTimeMillis();

        final Random r = new Random(seed);

        log.info("Using seed: " + seed);

        final int threadsCnt = Runtime.getRuntime().availableProcessors() * 2;

        for (int k = 0; k < threadsCnt; k++)
            grid(0).cache(CACHE_NAME).put(k, (long)0);

        final TransactionConcurrency[] TC_VALS = TransactionConcurrency.values();
        final TransactionIsolation[] TI_VALS = TransactionIsolation.values();

        // Counters: started, committed, timed out, failed for other reasons.
        final LongAdder cntr0 = new LongAdder();
        final LongAdder cntr1 = new LongAdder();
        final LongAdder cntr2 = new LongAdder();
        final LongAdder cntr3 = new LongAdder();

        final IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
            @Override public void run() {
                while (!stop.get()) {
                    int nodeId = r.nextInt(GRID_CNT + 1);

                    // When near cache is enabled all txs go through the client node.
                    Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId);

                    TransactionConcurrency conc = TC_VALS[r.nextInt(TC_VALS.length)];
                    TransactionIsolation isolation = TI_VALS[r.nextInt(TI_VALS.length)];

                    int k = r.nextInt(threadsCnt);

                    long timeout = r.nextInt(200) + 50;

                    // Roughly 50% of transactions should time out.
                    try (Transaction tx = node.transactions().txStart(conc, isolation, timeout, 1)) {
                        cntr0.add(1);

                        final Long v = (Long)node.cache(CACHE_NAME).get(k);

                        assertNotNull("Expecting not null value: " + tx, v);

                        final int delay = r.nextInt(400);

                        if (delay > 0)
                            sleep(delay);

                        node.cache(CACHE_NAME).put(k, v + 1);

                        tx.commit();

                        cntr1.add(1);
                    }
                    catch (TransactionTimeoutException e) {
                        cntr2.add(1);
                    }
                    catch (CacheException e) {
                        // A timeout wrapped into a cache exception is still a timeout.
                        assertEquals(TransactionTimeoutException.class, X.getCause(e).getClass());

                        cntr2.add(1);
                    }
                    catch (Exception e) {
                        cntr3.add(1);
                    }
                }
            }
        }, threadsCnt, "tx-async-thread");

        sleep(DURATION);

        stop.set(true);

        try {
            fut.get(30_000);
        }
        catch (IgniteFutureTimeoutCheckedException e) {
            error("Transactions hang", e);

            for (Ignite node : G.allGrids())
                ((IgniteKernal)node).dumpDebugInfo();

            fut.cancel(); // Try to interrupt hanging threads.

            throw e;
        }

        log.info("Tx test stats: started=" + cntr0.sum() +
            ", completed=" + cntr1.sum() +
            ", failed=" + cntr3.sum() +
            ", timedOut=" + cntr2.sum());

        assertEquals("Expected finished count same as started count", cntr0.sum(), cntr1.sum() + cntr2.sum() +
            cntr3.sum());
    }
    /**
     * Tests timeout on DHT primary node for all tx configurations.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTimeoutOnPrimaryDHTNode() throws Exception {
        // Run txs from the primary node for key 0 so the timeout fires on the DHT primary.
        final ClusterNode n0 = grid(0).affinity(CACHE_NAME).mapKeyToNode(0);

        final Ignite prim = G.ignite(n0.id());

        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
            for (TransactionIsolation isolation : TransactionIsolation.values())
                testTimeoutOnPrimaryDhtNode0(prim, concurrency, isolation);
        }
    }
    /**
     * Tests that timed-out lock waiters leave the lock queue cleanly: the single
     * lock owner must still commit, and partition release futures must complete.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLockRelease() throws Exception {
        final Ignite client = startClient();

        final AtomicInteger idx = new AtomicInteger();

        final int threadCnt = Runtime.getRuntime().availableProcessors() * 2;

        final CountDownLatch readStartLatch = new CountDownLatch(1);

        final CountDownLatch commitLatch = new CountDownLatch(threadCnt - 1);

        final IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
            @Override public void run() {
                final int idx0 = idx.getAndIncrement();

                if (idx0 == 0) {
                    // Single writer holds the lock until every reader has timed out.
                    try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
                        client.cache(CACHE_NAME).put(0, 0); // Lock is owned.

                        readStartLatch.countDown();

                        U.awaitQuiet(commitLatch);

                        tx.commit();
                    }
                }
                else {
                    // Readers queue behind the writer's lock and time out after 300 ms.
                    try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 300, 1)) {
                        U.awaitQuiet(readStartLatch);

                        client.cache(CACHE_NAME).get(0); // Lock acquisition is queued.
                    }
                    catch (CacheException e) {
                        assertTrue(e.getMessage(), X.hasCause(e, TransactionTimeoutException.class));
                    }

                    commitLatch.countDown();
                }
            }
        }, threadCnt, "tx-async");

        fut.get();

        Thread.sleep(500);

        // The writer's value must have been committed despite the timed-out waiters.
        assertEquals(0, client.cache(CACHE_NAME).get(0));

        for (Ignite ignite : G.allGrids()) {
            IgniteEx ig = (IgniteEx)ignite;

            // No dangling tx/lock may block partition release.
            final IgniteInternalFuture<?> f = ig.context().cache().context().
                partitionReleaseFuture(new AffinityTopologyVersion(G.allGrids().size() + 1, 0));

            assertTrue("Unexpected incomplete future", f.isDone());
        }
    }
    /**
     * Tests rollback on timeout of a tx reading a huge key set.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testEnlistManyRead() throws Exception {
        testEnlistMany(false);
    }
    /**
     * Tests rollback on timeout of a tx writing a huge key set.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testEnlistManyWrite() throws Exception {
        testEnlistMany(true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (client wait, optimistic, read committed).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxRemapOptimisticReadCommitted() throws Exception {
        doTestRollbackOnTimeoutTxRemap(OPTIMISTIC, READ_COMMITTED, true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (client wait, optimistic, repeatable read).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxRemapOptimisticRepeatableRead() throws Exception {
        doTestRollbackOnTimeoutTxRemap(OPTIMISTIC, REPEATABLE_READ, true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (client wait, optimistic, serializable).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxRemapOptimisticSerializable() throws Exception {
        doTestRollbackOnTimeoutTxRemap(OPTIMISTIC, SERIALIZABLE, true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (client wait, pessimistic, read committed).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxRemapPessimisticReadCommitted() throws Exception {
        doTestRollbackOnTimeoutTxRemap(PESSIMISTIC, READ_COMMITTED, true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (client wait, pessimistic, repeatable read).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxRemapPessimisticRepeatableRead() throws Exception {
        doTestRollbackOnTimeoutTxRemap(PESSIMISTIC, REPEATABLE_READ, true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (client wait, pessimistic, serializable).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxRemapPessimisticSerializable() throws Exception {
        doTestRollbackOnTimeoutTxRemap(PESSIMISTIC, SERIALIZABLE, true);
    }
    /**
     * Tests rollback on timeout of a remapped tx (server wait, optimistic, read committed).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxServerRemapOptimisticReadCommitted() throws Exception {
        doTestRollbackOnTimeoutTxRemap(OPTIMISTIC, READ_COMMITTED, false);
    }
    /**
     * Tests rollback on timeout of a remapped tx (server wait, optimistic, repeatable read).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxServerRemapOptimisticRepeatableRead() throws Exception {
        doTestRollbackOnTimeoutTxRemap(OPTIMISTIC, REPEATABLE_READ, false);
    }
    /**
     * Tests rollback on timeout of a remapped tx (server wait, optimistic, serializable).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxServerRemapOptimisticSerializable() throws Exception {
        doTestRollbackOnTimeoutTxRemap(OPTIMISTIC, SERIALIZABLE, false);
    }
    /**
     * Tests rollback on timeout of a remapped tx (server wait, pessimistic, read committed).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxServerRemapPessimisticReadCommitted() throws Exception {
        doTestRollbackOnTimeoutTxRemap(PESSIMISTIC, READ_COMMITTED, false);
    }
    /**
     * Tests rollback on timeout of a remapped tx (server wait, pessimistic, repeatable read).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxServerRemapPessimisticRepeatableRead() throws Exception {
        doTestRollbackOnTimeoutTxRemap(PESSIMISTIC, REPEATABLE_READ, false);
    }
    /**
     * Tests rollback on timeout of a remapped tx (server wait, pessimistic, serializable).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRollbackOnTimeoutTxServerRemapPessimisticSerializable() throws Exception {
        doTestRollbackOnTimeoutTxRemap(PESSIMISTIC, SERIALIZABLE, false);
    }
    /**
     * Tests tx rollback on timeout when the tx must be remapped to a new topology
     * version because a node joins mid-prepare. The exchange is artificially delayed
     * so the tx cannot finish before its timeout fires.
     *
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @param clientWait {@code True} to wait remap on client, otherwise wait remap on server.
     * @throws Exception If failed.
     */
    private void doTestRollbackOnTimeoutTxRemap(TransactionConcurrency concurrency,
        TransactionIsolation isolation,
        boolean clientWait) throws Exception {
        IgniteEx client = (IgniteEx)startClient();

        Ignite crd = grid(0);

        assertTrue(crd.cluster().localNode().order() == 1);

        // A key that will move to the joining node, forcing the tx to remap.
        List<Integer> keys = movingKeysAfterJoin(grid(1), CACHE_NAME, 1);

        // Delay exchange finish on server nodes if clientWait=true, or on all nodes otherwise (excluding joining node).
        TestRecordingCommunicationSpi.spi(crd).blockMessages((node,
            msg) -> node.order() < 5 && msg instanceof GridDhtPartitionsFullMessage &&
            (!clientWait || node.order() != grid(1).cluster().localNode().order()));

        // Delay prepare until exchange is finished.
        TestRecordingCommunicationSpi.spi(client).blockMessages((node, msg) -> {
            boolean block = false;

            if (concurrency == PESSIMISTIC) {
                if (msg instanceof GridNearLockRequest) {
                    block = true;

                    // The lock request must be mapped on the pre-join topology.
                    assertEquals(GRID_CNT + 1, ((GridNearLockRequest)msg).topologyVersion().topologyVersion());
                }
            }
            else {
                if (msg instanceof GridNearTxPrepareRequest) {
                    block = true;

                    assertEquals(GRID_CNT + 1, ((GridNearTxPrepareRequest)msg).topologyVersion().topologyVersion());
                }
            }

            return block;
        });

        // Start tx and map on topver=GRID_CNT + 1
        // Delay map until exchange.
        // Start new node.
        IgniteInternalFuture fut0 = runAsync(new Runnable() {
            @Override public void run() {
                // The remapped tx cannot finish before its 5 s timeout, so it must roll back.
                try (Transaction tx = client.transactions().txStart(concurrency, isolation, 5000, 1)) {
                    client.cache(CACHE_NAME).put(keys.get(0), 0);

                    tx.commit();

                    fail();
                }
                catch (Exception e) {
                    assertTrue(X.hasCause(e, TransactionTimeoutException.class));
                }
            }
        });

        IgniteInternalFuture fut1 = runAsync(new Runnable() {
            @Override public void run() {
                try {
                    TestRecordingCommunicationSpi.spi(client).waitForBlocked(); // TX is trying to prepare on prev top ver.

                    startGrid(GRID_CNT);
                }
                catch (Exception e) {
                    fail(e.getMessage());
                }
            }
        });

        IgniteInternalFuture fut2 = runAsync(new Runnable() {
            @Override public void run() {
                try {
                    // Wait for all full messages to be ready.
                    TestRecordingCommunicationSpi.spi(crd).waitForBlocked(GRID_CNT + (clientWait ? 0 : 1));

                    // Trigger remap.
                    TestRecordingCommunicationSpi.spi(client).stopBlock();
                }
                catch (Exception e) {
                    fail(e.getMessage());
                }
            }
        });

        fut0.get(30_000);
        fut1.get(30_000);
        fut2.get(30_000);

        TestRecordingCommunicationSpi.spi(crd).stopBlock();

        // FIXME: If using awaitPartitionMapExchange for waiting it some times fail while waiting for owners.
        IgniteInternalFuture<?> topFut = ((IgniteEx)client).context().cache().context().exchange().
            affinityReadyFuture(new AffinityTopologyVersion(GRID_CNT + 2, 1));

        assertNotNull(topFut);

        topFut.get(10_000);

        // No tx futures may be left hanging after the rollback.
        checkFutures();
    }
/**
*
*/
private void testEnlistMany(boolean write) throws Exception {
final Ignite client = startClient();
Map<Integer, Integer> entries = new HashMap<>();
for (int i = 0; i < 1000000; i++)
entries.put(i, i);
try(Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 200, 0)) {
if (write)
client.cache(CACHE_NAME).putAll(entries);
else
client.cache(CACHE_NAME).getAll(entries.keySet());
tx.commit();
}
catch (Throwable t) {
boolean timedOut = X.hasCause(t, TransactionTimeoutException.class);
if (!timedOut)
log.error("Got unexpected exception", t);
assertTrue(timedOut);
}
assertEquals(0, client.cache(CACHE_NAME).size());
}
    /**
     * Provokes a tx timeout on the primary DHT node by blocking DHT prepare
     * responses sent to it, then verifies partitions can still be released.
     *
     * @param prim Primary node.
     * @param conc Concurrency.
     * @param isolation Isolation.
     * @throws Exception If failed.
     */
    private void testTimeoutOnPrimaryDhtNode0(final Ignite prim, final TransactionConcurrency conc,
        final TransactionIsolation isolation)
        throws Exception {
        log.info("concurrency=" + conc + ", isolation=" + isolation);

        // Force timeout on primary DHT node by blocking DHT prepare response.
        toggleBlocking(GridDhtTxPrepareResponse.class, prim, true);

        final int val = 0;

        try {
            multithreaded(new Runnable() {
                @Override public void run() {
                    try (Transaction txOpt = prim.transactions().txStart(conc, isolation, 300, 1)) {
                        prim.cache(CACHE_NAME).put(val, val);

                        txOpt.commit();
                    }
                }
            }, 1, "tx-async-thread");

            fail();
        }
        catch (TransactionTimeoutException e) {
            // Expected.
        }

        toggleBlocking(GridDhtTxPrepareResponse.class, prim, false);

        AffinityTopologyVersion topVer = new AffinityTopologyVersion(GRID_CNT + 1, 0);

        // Rollback on timeout must not leave locks that would hang partition release.
        for (Ignite ignite : G.allGrids())
            ((IgniteEx)ignite).context().cache().context().partitionReleaseFuture(topVer).get(10_000);
    }
/**
* @param cls Message class.
* @param nodeToBlock Node to block.
* @param block Block.
*/
private void toggleBlocking(Class<? extends Message> cls, Ignite nodeToBlock, boolean block) {
for (Ignite ignite : G.allGrids()) {
if (ignite == nodeToBlock)
continue;
final TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
if (block)
spi.blockMessages(cls, nodeToBlock.name());
else
spi.stopBlock(true);
}
}
    /**
     * Starts a tx, sleeps past its timeout and verifies the given operation fails
     * with a timeout and that nothing is committed afterwards.
     *
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @param op Operation to test: 0 - put, 1 - remove, 2 - get, 3 - commit.
     * @throws Exception If failed.
     */
    private void testSimple0(TransactionConcurrency concurrency, TransactionIsolation isolation, int op) throws Exception {
        Ignite near = grid(0);

        final int key = 1, val = 1;

        final long TX_TIMEOUT = 250;

        IgniteCache<Object, Object> cache = near.cache(CACHE_NAME);

        try (Transaction tx = near.transactions().txStart(concurrency, isolation, TX_TIMEOUT, 1)) {
            cache.put(key, val);

            // Let the timeout fire before attempting the next operation.
            U.sleep(TX_TIMEOUT * 2);

            try {
                switch (op) {
                    case 0:
                        cache.put(key + 1, val);

                        break;

                    case 1:
                        cache.remove(key + 1);

                        break;

                    case 2:
                        cache.get(key + 1);

                        break;

                    case 3:
                        tx.commit();

                        break;

                    default:
                        fail();
                }

                fail("Tx must timeout");
            }
            catch (CacheException | IgniteException e) {
                assertTrue("Expected exception: " + e, X.hasCause(e, TransactionTimeoutException.class));
            }
        }

        assertFalse("Must be removed by rollback on timeout", near.cache(CACHE_NAME).containsKey(key));
        assertFalse("Must be removed by rollback on timeout", near.cache(CACHE_NAME).containsKey(key + 1));

        assertNull(near.transactions().tx());
    }
/**
* @param near Node.
* @param mode Test mode.
*
* @param timeout Tx timeout.
* @throws Exception If failed.
*/
private void testTimeoutRemoval0(IgniteEx near, int mode, long timeout) throws Exception {
Throwable saved = null;
try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 1)) {
near.cache(CACHE_NAME).put(1, 1);
switch (mode) {
case 0:
tx.commit();
break;
case 1:
tx.commitAsync().get();
break;
case 2:
tx.rollback();
break;
case 3:
tx.rollbackAsync().get();
break;
case 4:
break;
default:
fail();
}
}
catch (Throwable t) {
saved = t;
}
Collection set = U.field(near.context().cache().context().time(), "timeoutObjs");
for (Object obj : set) {
if (obj.getClass().isAssignableFrom(GridNearTxLocal.class)) {
log.error("Last saved exception: " + saved, saved);
fail("Not removed [mode=" + mode + ", timeout=" + timeout + ", tx=" + obj +']');
}
}
}
    /**
     * Runs the waiting-tx scenario with both a long and a short timeout on the first tx.
     *
     * @param near Node starting tx which is timed out.
     * @param other Node starting second tx.
     * @throws Exception If failed.
     */
    private void waitingTxUnblockedOnTimeout(final Ignite near, final Ignite other) throws Exception {
        waitingTxUnblockedOnTimeout(near, other, 1000);

        waitingTxUnblockedOnTimeout(near, other, 50);
    }
    /**
     * First tx locks keys and stalls past its timeout; the second tx must acquire the
     * same locks once the first is rolled back. The first thread then verifies the dead
     * tx rejects further operations and that it can start a fresh tx afterwards.
     *
     * @param near Node starting tx which is timed out.
     * @param other Node starting second tx.
     * @param timeout Timeout.
     * @throws Exception If failed.
     */
    private void waitingTxUnblockedOnTimeout(final Ignite near, final Ignite other, final long timeout) throws Exception {
        info("Start test [node1=" + near.name() + ", node2=" + other.name() + ']');

        final CountDownLatch blocked = new CountDownLatch(1);

        final CountDownLatch unblocked = new CountDownLatch(1);

        final int recordsCnt = 5;

        IgniteInternalFuture<?> fut1 = runAsync(new Runnable() {
            @Override public void run() {
                try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) {
                    try {
                        for (int i = 0; i < recordsCnt; i++)
                            near.cache(CACHE_NAME).put(i, i);

                        info("Locked all keys.");
                    }
                    catch (CacheException e) {
                        // A short timeout may fire while still locking; acceptable for this test.
                        info("Failed to lock keys: " + e);
                    }
                    finally {
                        blocked.countDown();
                    }

                    // Will be unblocked after tx timeout occurs.
                    U.awaitQuiet(unblocked);

                    // The rolled back tx must reject new cache operations...
                    try {
                        near.cache(CACHE_NAME).put(0, 0);

                        fail();
                    }
                    catch (CacheException e) {
                        log.info("Expecting error: " + e.getMessage());
                    }

                    // ...and its commit must fail as well.
                    try {
                        tx.commit();

                        fail();
                    }
                    catch (IgniteException e) {
                        log.info("Expecting error: " + e.getMessage());
                    }
                }

                // Check thread is able to start new tx.
                try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 0)) {
                    for (int i = 0; i < recordsCnt; i++)
                        near.cache(CACHE_NAME).put(i, i);

                    tx.commit();
                }
            }
        }, "First");

        IgniteInternalFuture<?> fut2 = runAsync(new Runnable() {
            @Override public void run() {
                U.awaitQuiet(blocked);

                try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
                    for (int i = 0; i < recordsCnt; i++)
                        other.cache(CACHE_NAME).put(i, i);

                    // Will wait until timeout on first tx will unblock put.
                    tx.commit();
                }
            }
        }, "Second");

        fut2.get();

        unblocked.countDown();

        fut1.get();
    }
    /**
     * Runs the thread-death scenario with the timeout firing after, then during, lock acquisition.
     *
     * @param near Node starting tx which is timed out.
     * @param other Node starting second tx.
     * @throws Exception If failed.
     */
    private void waitingTxUnblockedOnThreadDeath(final Ignite near, final Ignite other) throws Exception {
        waitingTxUnblockedOnThreadDeath0(near, other, 10, 1000); // Try provoke timeout after all keys are locked.

        waitingTxUnblockedOnThreadDeath0(near, other, 1000, 100); // Try provoke timeout while trying to lock keys.
    }
    /**
     * Starts a tx that is abandoned by a dying thread and verifies a second tx can
     * still lock the same keys once the abandoned tx is rolled back on timeout.
     *
     * @param near Node starting tx which is timed out.
     * @param other Node starting second tx.
     * @param recordsCnt Number of records to locks.
     * @param timeout Transaction timeout.
     * @throws Exception If failed.
     */
    private void waitingTxUnblockedOnThreadDeath0(final Ignite near,
        final Ignite other,
        final int recordsCnt,
        final long timeout)
        throws Exception
    {
        info("Start test [node1=" + near.name() + ", node2=" + other.name() + ']');

        final CountDownLatch blocked = new CountDownLatch(1);

        IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() {
            @Override public void run() {
                // Deliberately not closed: the thread dies with the tx still active.
                near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, recordsCnt);

                try {
                    for (int i = 0; i < recordsCnt; i++)
                        near.cache(CACHE_NAME).put(i, i);

                    log.info("Locked all records.");
                }
                catch (Exception e) {
                    log.info("Failed to locked all records: " + e);
                }
                finally {
                    blocked.countDown();
                }

                // Kill the thread, leaving the tx to be rolled back on timeout.
                throw new IgniteException("Failure");
            }
        }, 1, "First");

        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() {
            @Override public void run() {
                U.awaitQuiet(blocked);

                try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, recordsCnt)) {
                    for (int i = 0; i < recordsCnt; i++)
                        other.cache(CACHE_NAME).put(i, i);

                    // Will wait until timeout on first tx will unblock put.
                    tx.commit();
                }
            }
        }, 1, "Second");

        try {
            fut1.get();

            fail();
        }
        catch (IgniteCheckedException e) {
            // No-op.
        }

        fut2.get();
    }
}