blob: 7781d24dfacc6fa4f301ff97130f977880ab27af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.junit.Test;
import static org.apache.ignite.internal.util.typedef.X.hasCause;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
*/
public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
/** Nodes count. */
private static final int NODES_CNT = 3;
/** Cache. */
private static final String CACHE = "cache";
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (isDebug()) {
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.failureDetectionTimeoutEnabled(false);
cfg.setDiscoverySpi(discoSpi);
}
TcpCommunicationSpi commSpi = new TestCommunicationSpi();
cfg.setCommunicationSpi(commSpi);
CacheConfiguration ccfg = defaultCacheConfiguration();
ccfg.setName(CACHE);
ccfg.setCacheMode(CacheMode.PARTITIONED);
ccfg.setBackups(1);
ccfg.setNearConfiguration(null);
cfg.setCacheConfiguration(ccfg);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGridsMultiThreaded(NODES_CNT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoHangs() throws Exception {
final AtomicBoolean stop = new AtomicBoolean();
IgniteInternalFuture<Long> restartFut = null;
try {
restartFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
while (!stop.get()) {
try {
U.sleep(500);
startGrid(NODES_CNT);
awaitPartitionMapExchange();
U.sleep(500);
stopGrid(NODES_CNT);
}
catch (Exception ignored) {
// No-op.
}
}
}
}, 1, "restart-thread");
long stopTime = System.currentTimeMillis() + 2 * 60_000L;
for (int i = 0; System.currentTimeMillis() < stopTime; i++) {
log.info(">>> Iteration " + i);
final AtomicInteger threadCnt = new AtomicInteger();
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
int threadNum = threadCnt.getAndIncrement();
Ignite ignite = ignite(threadNum % NODES_CNT);
IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 700, 0)) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < 50; i++) {
int key = rnd.nextInt(50);
if (log.isDebugEnabled()) {
log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ", key=" + key + ']');
}
cache.put(key, 0);
}
tx.commit();
}
catch (Exception e) {
e.printStackTrace();
}
}
}, NODES_CNT * 3, "tx-thread");
fut.get();
}
}
finally {
stop.set(true);
if (restartFut != null)
restartFut.get();
checkDetectionFuts();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoDeadlockSimple() throws Exception {
final AtomicInteger threadCnt = new AtomicInteger();
final AtomicBoolean deadlock = new AtomicBoolean();
final AtomicBoolean timedOut = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(2);
final long timeout = 500;
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
int threadNum = threadCnt.getAndIncrement();
Ignite ignite = ignite(threadNum);
IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) {
int key = 42;
if (log.isDebugEnabled())
log.debug(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ", key=" + key + ']');
cache.put(key, 0);
barrier.await(timeout + 1000, TimeUnit.MILLISECONDS);
tx.commit();
}
catch (Exception e) {
if (hasCause(e, TransactionTimeoutException.class))
timedOut.set(true);
if (hasCause(e, TransactionDeadlockException.class))
deadlock.set(true);
}
}
}, 2, "tx-thread");
fut.get();
assertTrue(timedOut.get());
assertFalse(deadlock.get());
checkDetectionFuts();
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoDeadlock() throws Exception {
for (int i = 2; i <= 10; i++) {
final int threads = i;
log.info(">>> Test with " + threads + " transactions.");
final AtomicInteger threadCnt = new AtomicInteger();
final AtomicBoolean deadlock = new AtomicBoolean();
final AtomicBoolean timedOut = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(threads);
final long timeout = 500;
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
int threadNum = threadCnt.incrementAndGet();
Ignite ignite = ignite(threadNum % NODES_CNT);
IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) {
int key1 = threadNum;
log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ", key=" + key1 + ']');
cache.put(key1, 0);
barrier.await();
if (threadNum == threads) {
log.info(">>> Performs sleep. [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ']');
U.sleep(timeout * 3);
}
else {
int key2 = threadNum + 1;
log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ", key2=" + key2 + ']');
cache.put(key2, 1);
}
tx.commit();
}
catch (Exception e) {
if (hasCause(e, TransactionTimeoutException.class))
timedOut.set(true);
if (hasCause(e, TransactionDeadlockException.class))
deadlock.set(true);
}
}
}, threads, "tx-thread");
fut.get();
assertTrue(timedOut.get());
assertFalse(deadlock.get());
checkDetectionFuts();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testFailedTxLocksRequest() throws Exception {
doTestFailedMessage(TxLocksRequest.class);
}
/**
* @throws Exception If failed.
*/
@Test
public void testFailedTxLocksResponse() throws Exception {
doTestFailedMessage(TxLocksResponse.class);
}
/**
* @param failCls Failing message class.
* @throws Exception If failed.
*/
private void doTestFailedMessage(Class failCls) throws Exception {
try {
final int txCnt = 2;
final CyclicBarrier barrier = new CyclicBarrier(txCnt);
final AtomicInteger threadCnt = new AtomicInteger();
final AtomicBoolean deadlock = new AtomicBoolean();
final AtomicBoolean timeout = new AtomicBoolean();
TestCommunicationSpi.failCls = failCls;
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
int num = threadCnt.getAndIncrement();
Ignite ignite = ignite(num);
IgniteCache<Object, Integer> cache = ignite.cache(CACHE);
try (Transaction tx =
ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, num == 0 ? 500 : 1500, 0)
) {
int key1 = primaryKey(ignite((num + 1) % txCnt).cache(CACHE));
log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ", key=" + key1 + ']');
cache.put(new TestKey(key1), 1);
barrier.await();
int key2 = primaryKey(cache);
log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ", key=" + key2 + ']');
cache.put(new TestKey(key2), 2);
tx.commit();
}
catch (Exception e) {
timeout.compareAndSet(false, hasCause(e, TransactionTimeoutException.class));
deadlock.compareAndSet(false, hasCause(e, TransactionDeadlockException.class));
}
}
}, txCnt, "tx-thread");
fut.get();
assertFalse(deadlock.get());
assertTrue(timeout.get());
checkDetectionFuts();
}
finally {
TestCommunicationSpi.failCls = null;
TestKey.failSer = false;
}
}
/**
*
*/
private void checkDetectionFuts() {
for (int i = 0; i < NODES_CNT; i++) {
Ignite ignite = ignite(i);
IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
assertTrue(futs.isEmpty());
}
}
/**
*
*/
private static class TestKey implements Externalizable {
/** Fail request. */
private static volatile boolean failSer = false;
/** Id. */
private int id;
/**
* Default constructor (required by Externalizable).
*/
public TestKey() {
// No-op.
}
/**
* @param id Id.
*/
public TestKey(int id) {
this.id = id;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(id);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
if (failSer) {
TestCommunicationSpi.failCls = null;
failSer = false;
throw new IOException();
}
id = in.readInt();
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestKey key = (TestKey)o;
return id == key.id;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return id;
}
}
/**
*
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** Fail response. */
private static volatile Class failCls;
/** {@inheritDoc} */
@Override public void sendMessage(
ClusterNode node,
Message msg,
IgniteInClosure<IgniteException> ackC
) throws IgniteSpiException {
if (failCls != null && msg instanceof GridIoMessage &&
((GridIoMessage)msg).message().getClass() == failCls)
TestKey.failSer = true;
super.sendMessage(node, msg, ackC);
}
}
}