blob: 3fd1d0224be1f7aa0128bbf0b15414ebf72e5b3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.jbi.messaging;
import java.sql.Connection;
import javax.jbi.JBIException;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.ManagedConnectionFactory;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.Status;
import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.derby.jdbc.EmbeddedXADataSource;
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.jbi.nmr.flow.Flow;
import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
import org.apache.servicemix.store.Store;
import org.apache.servicemix.store.jdbc.JdbcStoreFactory;
import org.apache.servicemix.tck.ExchangeCompletedListener;
import org.jencks.GeronimoPlatformTransactionManager;
import org.jencks.factory.ConnectionManagerFactoryBean;
import org.tranql.connector.AllExceptionsAreFatalSorter;
import org.tranql.connector.jdbc.AbstractXADataSourceMCF;
public class TransactionsTest extends TestCase {
public static final long TIMEOUT = 1000;
private JBIContainer jbi;
private BrokerService broker;
private TransactionManager tm;
private ServiceMixClient client;
private DataSource dataSource;
private Connection connection;
private Store store;
private ExchangeCompletedListener listener;
protected void setUp() throws Exception {
// Create an AMQ broker
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
tm = new GeronimoPlatformTransactionManager();
// Create an embedded database for testing tx results when commit / rollback
ConnectionManagerFactoryBean factory = new ConnectionManagerFactoryBean();
factory.setTransactionManager(tm);
factory.setTransaction("xa");
factory.afterPropertiesSet();
ConnectionManager cm = (ConnectionManager) factory.getObject();
ManagedConnectionFactory mcf = new DerbyDataSourceMCF("target/testdb");
dataSource = (DataSource) mcf.createConnectionFactory(cm);
connection = dataSource.getConnection();
JdbcStoreFactory storeFactory = new JdbcStoreFactory();
storeFactory.setDataSource(dataSource);
storeFactory.setTransactional(true);
store = storeFactory.open("store");
jbi = new JBIContainer();
jbi.setFlows(new Flow[] { new SedaFlow(), new JCAFlow() });
jbi.setEmbedded(true);
jbi.setUseMBeanServer(false);
jbi.setCreateMBeanServer(false);
jbi.setTransactionManager(tm);
jbi.setAutoEnlistInTransaction(true);
listener = new ExchangeCompletedListener();
jbi.addListener(listener);
jbi.init();
jbi.start();
client = new DefaultServiceMixClient(jbi);
}
protected void tearDown() throws Exception {
listener.assertExchangeCompleted();
jbi.shutDown();
Thread.sleep(100);
broker.stop();
connection.close();
}
protected InOnly createInOnly() throws Exception {
InOnly me = client.createInOnlyExchange();
me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
me.setService(new QName("service"));
return me;
}
protected InOut createInOut() throws Exception {
InOut me = client.createInOutExchange();
me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
me.setService(new QName("service"));
return me;
}
public void testInOnlyAsyncSendAndListener() throws Exception {
jbi.activateComponent(new Listener(false, false), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
me = client.receive(TIMEOUT);
assertNotNull(me);
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
public void testInOnlyAsyncSendAndListenerWithRollback() throws Exception {
jbi.activateComponent(new Listener(false, true), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNull(client.receive(TIMEOUT));
assertNull(store.load(me.getExchangeId()));
}
public void testInOnlySyncSendAndListener() throws Exception {
jbi.activateComponent(new Listener(false, false), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
public void testInOnlySyncSendAndListenerWithProviderRollback() throws Exception {
jbi.activateComponent(new Listener(false, true), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
tm.rollback();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNull(store.load(me.getExchangeId()));
}
public void testInOnlySyncSendAndListenerWithConsumerRollback() throws Exception {
jbi.activateComponent(new Listener(false, false), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
tm.setRollbackOnly();
assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
tm.rollback();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNull(store.load(me.getExchangeId()));
}
public void testInOnlyAsyncSendAndPoll() throws Exception {
jbi.activateComponent(new Async(false, false), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
me = client.receive(TIMEOUT);
assertNotNull(me);
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
public void testInOnlyAsyncSendAndPollWithRollback() throws Exception {
jbi.activateComponent(new Async(false, true), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNull(client.receive(TIMEOUT));
assertNull(store.load(me.getExchangeId()));
}
public void testInOnlySyncSendAndPoll() throws Exception {
jbi.activateComponent(new Async(false, false), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
public void testInOnlySyncSendAndPollWithProviderRollback() throws Exception {
jbi.activateComponent(new Async(false, true), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
tm.rollback();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNull(store.load(me.getExchangeId()));
}
public void testInOnlySyncSendAndPollWithConsumerRollback() throws Exception {
jbi.activateComponent(new Async(false, false), "target");
MessageExchange me = createInOnly();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
tm.setRollbackOnly();
assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
assertTrue(me.isTransacted());
tm.rollback();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNull(store.load(me.getExchangeId()));
}
public void testInOutAsyncSendAndAsyncSendAndListener() throws Exception {
jbi.activateComponent(new Listener(false, false), "target");
MessageExchange me = createInOut();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
me = client.receive(TIMEOUT);
assertNotNull(me);
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertTrue(me.isTransacted());
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.done(me);
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
/*
* NOT SUPPORTED
*
public void testInOutAsyncSendAndSyncSendAndListener() throws Exception {
jbi.activateComponent(new Listener(true, false), "target");
MessageExchange me = createInOut();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
me = client.receive(TIMEOUT);
assertNotNull(me);
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertTrue(me.isTransacted());
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.done(me);
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
*/
/*
* NOT SUPPORTED
*
public void testInOutSyncSendAndAsyncSendAndListener() throws Exception {
jbi.activateComponent(new Listener(false, false), "target");
MessageExchange me = createInOut();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertTrue(me.isTransacted());
client.done(me);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
*/
public void testInOutSyncSendAndSyncSendAndListener() throws Exception {
jbi.activateComponent(new Listener(true, false), "target");
MessageExchange me = createInOut();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertTrue(me.isTransacted());
client.done(me);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
public void testInOutAsyncSendAndAsyncSendAndPoll() throws Exception {
jbi.activateComponent(new Async(false, false), "target");
MessageExchange me = createInOut();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.send(me);
assertNull(client.receive(TIMEOUT));
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
me = client.receive(TIMEOUT);
assertNotNull(me);
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertTrue(me.isTransacted());
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.done(me);
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
public void testInOutSyncSendAndSyncSendAndPoll() throws Exception {
jbi.activateComponent(new Async(true, false), "target");
MessageExchange me = createInOut();
tm.begin();
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
client.sendSync(me, TIMEOUT);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertTrue(me.isTransacted());
client.done(me);
assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
tm.commit();
assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
assertNotNull(store.load(me.getExchangeId()));
}
protected class Async extends ComponentSupport implements Runnable {
private boolean sync;
private boolean rollback;
private Thread runner;
private boolean running;
public Async(boolean sync, boolean rollback) {
this.sync = sync;
this.rollback = rollback;
setService(new QName("service"));
setEndpoint("endpoint");
}
public synchronized void start() throws JBIException {
if (!running) {
running = true;
runner = new Thread(this);
runner.start();
}
}
public void run() {
while (running) {
try {
DeliveryChannel deliveryChannel = getContext().getDeliveryChannel();
MessageExchange messageExchange = deliveryChannel.accept();
process(messageExchange);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
public synchronized void stop() throws JBIException {
running = false;
}
protected void process(MessageExchange exchange) throws Exception {
if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
return;
}
try {
store.store(exchange.getExchangeId(), exchange);
} catch (Exception e) {
throw new MessagingException(e);
}
if (rollback) {
try {
tm.setRollbackOnly();
} catch (Exception e) {
throw new MessagingException(e);
}
}
if (exchange instanceof InOnly) {
exchange.setStatus(ExchangeStatus.DONE);
getDeliveryChannel().send(exchange);
} else {
NormalizedMessage msg = exchange.createMessage();
msg.setContent(exchange.getMessage("in").getContent());
exchange.setMessage(msg, "out");
if (sync) {
getDeliveryChannel().sendSync(exchange);
} else {
getDeliveryChannel().send(exchange);
}
}
}
}
protected class Listener extends ComponentSupport implements MessageExchangeListener {
private boolean sync;
private boolean rollback;
public Listener(boolean sync, boolean rollback) {
this.sync = sync;
this.rollback = rollback;
setService(new QName("service"));
setEndpoint("endpoint");
}
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
return;
}
try {
store.store(exchange.getExchangeId(), exchange);
} catch (Exception e) {
throw new MessagingException(e);
}
if (rollback) {
try {
tm.setRollbackOnly();
} catch (Exception e) {
throw new MessagingException(e);
}
}
if (exchange instanceof InOnly) {
exchange.setStatus(ExchangeStatus.DONE);
getDeliveryChannel().send(exchange);
} else {
NormalizedMessage msg = exchange.createMessage();
msg.setContent(exchange.getMessage("in").getContent());
exchange.setMessage(msg, "out");
if (sync) {
getDeliveryChannel().sendSync(exchange, TIMEOUT);
} else {
getDeliveryChannel().send(exchange);
}
}
}
}
public static class DerbyDataSourceMCF extends AbstractXADataSourceMCF {
private static final long serialVersionUID = 7971682207810098396L;
protected DerbyDataSourceMCF(String dbName) {
super(createXADS(dbName), new AllExceptionsAreFatalSorter());
}
public String getPassword() {
return null;
}
public String getUserName() {
return null;
}
protected static XADataSource createXADS(String dbName) {
EmbeddedXADataSource xads = new EmbeddedXADataSource();
xads.setDatabaseName(dbName);
xads.setCreateDatabase("create");
return xads;
}
}
}