blob: 8cac57ad12f5307f5117aa329df68178131289ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.jta;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import javax.transaction.Status;
import javax.transaction.UserTransaction;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.testframework.GridTestSafeThreadFactory;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import org.objectweb.jotm.Jotm;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
/**
 * Base class for cache tests that drive Ignite transactions through a JTA {@link UserTransaction}
 * obtained from a JOTM instance.
 * <p>
 * Subclasses provide the JTA-related part of the node configuration via
 * {@link #configureJta(IgniteConfiguration)}.
 */
public abstract class AbstractCacheJtaSelfTest extends GridCacheAbstractSelfTest {
    /** Number of grids reported by {@link #gridCount()}. */
    private static final int GRID_CNT = 1;

    /** Java Open Transaction Manager facade. */
    protected static Jotm jotm;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        // Created before delegating to the superclass; presumably the superclass starts the grids, which
        // reaches getConfiguration() and therefore configureJta() - confirm against GridCacheAbstractSelfTest.
        jotm = new Jotm(true, false);

        super.beforeTestsStarted();
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        try {
            super.afterTestsStopped();
        }
        finally {
            // Stop JOTM even if the superclass cleanup fails; guard against a failed beforeTestsStarted().
            if (jotm != null)
                jotm.stop();
        }
    }

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return GRID_CNT;
    }

    /** {@inheritDoc} */
    @Override protected CacheMode cacheMode() {
        return PARTITIONED;
    }

    /**
     * Builds the node configuration: applies the subclass-specific JTA settings and registers two caches,
     * both produced by {@code cacheConfiguration(igniteInstanceName)}, the second one renamed to
     * {@code "cache-2"}.
     *
     * @param igniteInstanceName Ignite instance name.
     * @return Node configuration.
     * @throws Exception If failed.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        configureJta(cfg);

        CacheConfiguration cfg1 = cacheConfiguration(igniteInstanceName);

        CacheConfiguration cfg2 = cacheConfiguration(igniteInstanceName);

        cfg2.setName("cache-2");

        cfg.setCacheConfiguration(cfg1, cfg2);

        return cfg;
    }

    /**
     * Applies the JTA-specific settings to the given node configuration.
     *
     * @param cfg Ignite Configuration.
     */
    protected abstract void configureJta(IgniteConfiguration cfg);

    /**
     * Checks that no Ignite transaction exists right after the JTA transaction begins, that one is
     * present and {@code ACTIVE} after cache operations performed inside the JTA transaction, that it is
     * gone after the JTA commit, and that the written value is readable afterwards.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testJta() throws Exception {
        UserTransaction jtaTx = jotm.getUserTransaction();

        IgniteCache<String, Integer> cache = jcache();

        assertNull(ignite(0).transactions().tx());

        jtaTx.begin();

        try {
            assertNull(ignite(0).transactions().tx());

            // Must not be a Java 'assert' statement: the put has to run even when assertions are disabled.
            assertNull(cache.getAndPut("key", 1));

            Transaction tx = ignite(0).transactions().tx();

            assertNotNull(tx);
            assertEquals(ACTIVE, tx.state());

            assertEquals((Integer)1, cache.get("key"));

            tx = ignite(0).transactions().tx();

            assertNotNull(tx);
            assertEquals(ACTIVE, tx.state());

            jtaTx.commit();

            assertNull(ignite(0).transactions().tx());
        }
        finally {
            // Only reached with an active JTA transaction when something above failed before the commit.
            if (jtaTx.getStatus() == Status.STATUS_ACTIVE)
                jtaTx.rollback();
        }

        assertEquals((Integer)1, cache.get("key"));
    }

    /**
     * Checks that a single JTA transaction can update two caches and that all values are readable
     * inside the transaction, right after the JTA commit, and after the transaction scope is left.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testJtaTwoCaches() throws Exception {
        UserTransaction jtaTx = jotm.getUserTransaction();

        IgniteEx ignite = grid(0);

        IgniteCache<String, Integer> cache1 = jcache();

        IgniteCache<Object, Object> cache2 = ignite.cache("cache-2");

        assertNull(ignite.transactions().tx());

        jtaTx.begin();

        try {
            cache1.put("key", 0);
            cache2.put("key", 0);
            cache1.put("key1", 1);
            cache2.put("key2", 2);

            assertEquals(0, (int)cache1.get("key"));
            assertEquals(0, (int)cache2.get("key"));
            assertEquals(1, (int)cache1.get("key1"));
            assertEquals(2, (int)cache2.get("key2"));

            assertEquals(ACTIVE, ignite.transactions().tx().state());

            jtaTx.commit();

            assertNull(ignite.transactions().tx());

            assertEquals(0, (int)cache1.get("key"));
            assertEquals(0, (int)cache2.get("key"));
            assertEquals(1, (int)cache1.get("key1"));
            assertEquals(2, (int)cache2.get("key2"));
        }
        finally {
            // Only reached with an active JTA transaction when something above failed before the commit.
            if (jtaTx.getStatus() == Status.STATUS_ACTIVE)
                jtaTx.rollback();
        }

        assertEquals(0, (int)cache1.get("key"));
        assertEquals(0, (int)cache2.get("key"));
        assertEquals(1, (int)cache1.get("key1"));
        assertEquals(2, (int)cache2.get("key2"));
    }

    /**
     * Runs a JTA transaction in a separate thread that issues an asynchronous put for a key already
     * written by a pessimistic transaction held open by the test thread. The test thread commits its own
     * transaction only once the JTA thread is observed in the {@code WAITING} state (presumably parked
     * inside the JTA commit until the asynchronous operation completes), then waits for the JTA thread to
     * finish and propagates any failure it recorded.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAsyncOpAwait() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        GridTestSafeThreadFactory factory = new GridTestSafeThreadFactory("JtaThread");

        final CountDownLatch latch = new CountDownLatch(1);

        Callable<Object> c = new Callable<Object>() {
            @Override public Object call() throws Exception {
                try {
                    assertNull(grid(0).transactions().tx());

                    UserTransaction jtaTx = jotm.getUserTransaction();

                    jtaTx.begin();

                    try {
                        cache.put("key1", 1);

                        cache.putAsync("key", 1);

                        assertEquals(ACTIVE, grid(0).transactions().tx().state());

                        info("Before JTA commit.");
                    }
                    finally {
                        // Release the test thread before committing, also when a step above failed:
                        // the commit may block until the test thread commits its own transaction.
                        latch.countDown();

                        jtaTx.commit();
                    }

                    info("After JTA commit.");

                    assertEquals((Integer)1, cache.get("key"));

                    return null;
                }
                finally {
                    // Covers failures before the JTA transaction was started; a no-op otherwise.
                    latch.countDown();
                }
            }
        };

        Thread task = factory.newThread(c);

        try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
            cache.put("key", 0);

            task.start();

            latch.await();

            while (task.getState() != Thread.State.WAITING) {
                // Sample liveness first so that a failure of an already finished thread is reported by
                // checkError() rather than by the generic error below.
                boolean finished = !task.isAlive();

                factory.checkError();

                if (finished)
                    throw new AssertionError("JTA thread finished before the cache TX was committed.");
            }

            info("Before cache TX commit.");

            tx.commit();
        }

        // Let the JTA thread run its post-commit checks and surface any failure it recorded.
        task.join();

        factory.checkError();
    }
}