blob: c48420a97b2b029dab444c387123bce00a514e12 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.management.JMSConnectionStatsImpl;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionFactoryTest.class);
long txGenerator = System.currentTimeMillis();
private ActiveMQConnection connection;
private BrokerService broker;
@Override
public void tearDown() throws Exception {
// Try our best to close any previously opend connection.
try {
connection.close();
} catch (Throwable ignore) {
}
// Try our best to stop any previously started broker.
try {
broker.stop();
} catch (Throwable ignore) {
}
}
protected ActiveMQConnectionFactory getXAConnectionFactory(String brokerUrl) {
return new ActiveMQXAConnectionFactory(brokerUrl);
}
protected ActiveMQConnectionFactory getXAConnectionFactory(URI uri) {
return new ActiveMQXAConnectionFactory(uri);
}
public void testCopy() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = getXAConnectionFactory("vm://localhost?");
ActiveMQConnectionFactory copy = cf.copy();
assertTrue("Should be an ActiveMQXAConnectionFactory", copy.getClass().equals(cf.getClass()));
}
public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = getXAConnectionFactory("vm://localhost?jms.useAsyncSend=true");
assertTrue(cf.isUseAsyncSend());
// the broker url have been adjusted.
assertEquals("vm://localhost", cf.getBrokerURL());
cf = getXAConnectionFactory("vm://localhost?jms.useAsyncSend=false");
assertFalse(cf.isUseAsyncSend());
// the broker url have been adjusted.
assertEquals("vm://localhost", cf.getBrokerURL());
cf = getXAConnectionFactory("vm:(broker:()/localhost)?jms.useAsyncSend=true");
assertTrue(cf.isUseAsyncSend());
// the broker url have been adjusted.
assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL());
cf = getXAConnectionFactory(
"vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=10&" +
"jms.redeliveryPolicy.initialRedeliveryDelay=10000&" +
"jms.redeliveryPolicy.redeliveryDelay=10000&" +
"jms.redeliveryPolicy.useExponentialBackOff=true&" +
"jms.redeliveryPolicy.backOffMultiplier=2");
assertEquals(10, cf.getRedeliveryPolicy().getMaximumRedeliveries());
assertEquals(10000, cf.getRedeliveryPolicy().getInitialRedeliveryDelay());
assertEquals(10000, cf.getRedeliveryPolicy().getRedeliveryDelay());
assertEquals(true, cf.getRedeliveryPolicy().isUseExponentialBackOff());
assertEquals(2.0, cf.getRedeliveryPolicy().getBackOffMultiplier(), 0.1);
// the broker url have been adjusted.
assertEquals("vm://localhost", cf.getBrokerURL());
}
public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = getXAConnectionFactory("vm://myBroker?broker.persistent=false");
// Make sure the broker is not created until the connection is
// instantiated.
assertNull(BrokerRegistry.getInstance().lookup("myBroker"));
connection = (ActiveMQConnection) cf.createConnection();
// This should create the connection.
assertNotNull(connection);
// Verify the broker was created.
assertNotNull(BrokerRegistry.getInstance().lookup("myBroker"));
connection.close();
// Verify the broker was destroyed.
assertNull(BrokerRegistry.getInstance().lookup("myBroker"));
connection.close();
}
public void testGetBrokerName() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = getXAConnectionFactory("vm://localhost?broker.persistent=false");
connection = (ActiveMQConnection)cf.createConnection();
connection.start();
String brokerName = connection.getBrokerName();
LOG.info("Got broker name: " + brokerName);
assertNotNull("No broker name available!", brokerName);
connection.close();
}
public void testCreateTcpConnectionUsingAllocatedPort() throws Exception {
assertCreateConnection("tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true");
}
public void testCreateTcpConnectionUsingKnownPort() throws Exception {
assertCreateConnection("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true");
}
public void testIsSameRM() throws URISyntaxException, JMSException, XAException {
XAConnection connection1 = null;
XAConnection connection2 = null;
try {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
connection1 = (XAConnection)cf1.createConnection();
XASession session1 = connection1.createXASession();
XAResource resource1 = session1.getXAResource();
ActiveMQConnectionFactory cf2 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
connection2 = (XAConnection)cf2.createConnection();
XASession session2 = connection2.createXASession();
XAResource resource2 = session2.getXAResource();
assertTrue(resource1.isSameRM(resource2));
session1.close();
session2.close();
} finally {
if (connection1 != null) {
try {
connection1.close();
} catch (Exception e) {
// ignore
}
}
if (connection2 != null) {
try {
connection2.close();
} catch (Exception e) {
// ignore
}
}
}
}
public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException {
XAConnection connection1 = null;
XAConnection connection2 = null;
try {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true");
connection1 = (XAConnection)cf1.createConnection();
XASession session1 = connection1.createXASession();
XAResource resource1 = session1.getXAResource();
ActiveMQConnectionFactory cf2 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
connection2 = (XAConnection)cf2.createConnection();
XASession session2 = connection2.createXASession();
XAResource resource2 = session2.getXAResource();
assertFalse(resource1.isSameRM(resource2));
// ensure identity is preserved
XASession session1a = connection1.createXASession();
assertTrue(resource1.isSameRM(session1a.getXAResource()));
session1.close();
session2.close();
} finally {
if (connection1 != null) {
try {
connection1.close();
} catch (Exception e) {
// ignore
}
}
if (connection2 != null) {
try {
connection2.close();
} catch (Exception e) {
// ignore
}
}
}
}
public void testVanilaTransactionalProduceReceive() throws Exception {
XAConnection connection1 = null;
try {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
} finally {
if (connection1 != null) {
try {
connection1.close();
} catch (Exception e) {
// ignore
}
}
}
}
public void testConsumerCloseTransactionalSendReceive() throws Exception {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
producer.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
consumer.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testSessionCloseTransactionalSendReceive() throws Exception {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
session.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testReadonlyNoLeak() throws Exception {
final String brokerName = "readOnlyNoLeak";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.setPersistent(false);
broker.start();
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
cf1.setStatsEnabled(true);
ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection();
xaConnection.start();
XASession session = xaConnection.createXASession();
XAResource resource = session.getXAResource();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
assertTransactionGoneFromBroker(tid);
assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
assertSessionGone(xaConnection, session);
assertTransactionGoneFromFailoverState(xaConnection, tid);
// two phase
session = xaConnection.createXASession();
resource = session.getXAResource();
tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));
// no need for a commit on read only
assertTransactionGoneFromBroker(tid);
assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
assertSessionGone(xaConnection, session);
assertTransactionGoneFromFailoverState(xaConnection, tid);
xaConnection.close();
broker.stop();
}
public void testCloseSendConnection() throws Exception {
String brokerName = "closeSend";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.start();
broker.waitUntilStarted();
ActiveMQConnectionFactory cf = getXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
XAConnection connection = (XAConnection)cf.createConnection();
connection.start();
XASession session = connection.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
connection.close();
assertTransactionGoneFromBroker(tid);
broker.stop();
}
public void testExceptionAfterClose() throws Exception {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
session.close();
try {
session.commit();
fail("expect exception after close");
} catch (javax.jms.IllegalStateException expected) {}
try {
session.rollback();
fail("expect exception after close");
} catch (javax.jms.IllegalStateException expected) {}
try {
session.getTransacted();
fail("expect exception after close");
} catch (javax.jms.IllegalStateException expected) {}
}
public void testProducerFailAfterRollbackOnly() throws Exception {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
// can happen out of band with XA via RAR
resource.end(tid, XAResource.TMFAIL);
((ActiveMQSession)session).getTransactionContext().setRollbackOnly(true);
try {
producer.send(message);
fail("expect error on setRollbackOnly");
} catch (JMSException expected) {}
// rollback only state does not linger
tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
producer.send(message);
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
connection1.close();
}
public void testRollbackXaErrorCode() throws Exception {
String brokerName = "rollbackErrorCode";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.start();
broker.waitUntilStarted();
ActiveMQConnectionFactory cf = getXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
XAConnection connection = (XAConnection)cf.createConnection();
connection.start();
XASession session = connection.createXASession();
XAResource resource = session.getXAResource();
Xid tid = createXid();
try {
resource.rollback(tid);
fail("Expected xa exception on no tx");
} catch (XAException expected) {
LOG.info("got expected xa", expected);
assertEquals("no tx", XAException.XAER_NOTA, expected.errorCode);
}
connection.close();
broker.stop();
}
private void assertTransactionGoneFromFailoverState(
ActiveMQXAConnection connection1, Xid tid) throws Exception {
FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
assertNull("transaction should not exist in the state tracker",
transport.getStateTracker().processCommitTransactionOnePhase(info));
}
private void assertSessionGone(ActiveMQXAConnection connection1,
XASession session) {
JMSConnectionStatsImpl stats = (JMSConnectionStatsImpl)connection1.getStats();
// should be no dangling sessions maintained by the transaction
assertEquals("should be no sessions", 0, stats.getSessions().length);
}
private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception {
BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections();
for (TransportConnection connection: connections) {
if (connection.getConnectionId().equals(clientId)) {
try {
connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
fail("did not get expected excepton on missing transaction, it must be still there in error!");
} catch (IllegalStateException expectedOnNoTransaction) {
}
}
}
}
private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
try {
transactionBroker.getTransaction(null, new XATransactionId(tid), false);
fail("expected exception on tx not found");
} catch (XAException expectedOnNotFound) {
}
}
protected void assertCreateConnection(String uri) throws Exception {
// Start up a broker with a tcp connector.
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
TransportConnector connector = broker.addConnector(uri);
broker.start();
URI temp = new URI(uri);
// URI connectURI = connector.getServer().getConnectURI();
// TODO this sometimes fails when using the actual local host name
URI currentURI = new URI(connector.getPublishableConnectString());
// sometimes the actual host name doesn't work in this test case
// e.g. on OS X so lets use the original details but just use the actual
// port
URI connectURI = new URI(temp.getScheme(), temp.getUserInfo(), temp.getHost(), currentURI.getPort(),
temp.getPath(), temp.getQuery(), temp.getFragment());
LOG.info("connection URI is: " + connectURI);
// This should create the connection.
ActiveMQConnectionFactory cf = getXAConnectionFactory(connectURI);
Connection connection = cf.createConnection();
assertXAConnection(connection);
assertNotNull(connection);
connection.close();
connection = ((XAConnectionFactory)cf).createXAConnection();
assertXAConnection(connection);
assertNotNull(connection);
}
private void assertXAConnection(Connection connection) {
assertTrue("Should be an XAConnection", connection instanceof XAConnection);
assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection);
assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection);
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
};
}
}