blob: 974d457a2631b5ae2b53d1170370e3452006caf4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
/**
 * Verifies that XA transaction completion on an {@link ActiveMQXAConnection} copes with the
 * calling thread being interrupted, both when the interrupt is raised from a
 * {@link Synchronization#beforeEnd()} callback and when it arrives while the client is waiting
 * for the broker to respond to an acknowledgement.
 *
 * <p>Each test runs against its own non-persistent broker, reached through a
 * {@code failover:} URI built from the broker's first transport connector.
 */
public class ActiveMQXAConnectionTxInterruptTest {

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionTxInterruptTest.class);

    /** Timeout, in milliseconds, used when receiving the single published message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 1000;

    /** Timeout, in seconds, for each cross-thread hand-off in the pending-ack test. */
    private static final long HANDOFF_TIMEOUT_SECONDS = 20;

    /** Seed for {@link #createXid()}; pre-incremented on every call. */
    long txGenerator = System.currentTimeMillis();
    private BrokerService broker;
    XASession session;
    XAResource resource;
    ActiveMQXAConnection xaConnection;
    Destination dest;

    /**
     * Starts the broker and opens a started XA connection, an XA session and its
     * {@link XAResource}, and sets the destination to queue {@code Q}.
     *
     * @throws Exception if the broker or the connection cannot be created or started
     */
    @Before
    public void startBrokerEtc() throws Exception {
        broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/BRXA"));
        broker.setPersistent(false);
        broker.start();

        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory(
                "failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
        cf1.setStatsEnabled(true);
        xaConnection = (ActiveMQXAConnection) cf1.createConnection();
        xaConnection.start();
        session = xaConnection.createXASession();
        resource = session.getXAResource();
        dest = new ActiveMQQueue("Q");
    }

    /**
     * Closes the connection and stops the broker on a best-effort basis; failures are logged
     * at debug level and otherwise ignored so that one failing step does not skip the other.
     *
     * @throws Exception never thrown by this implementation; declared for the JUnit signature
     */
    @After
    public void tearDown() throws Exception {
        // Some tests deliberately finish with the calling thread interrupted. Clear the flag so
        // the cleanup below does not run on an interrupted thread and the status is not carried
        // over into whatever this thread executes next.
        Thread.interrupted();

        if (xaConnection != null) {
            try {
                xaConnection.close();
            } catch (Throwable ignored) {
                LOG.debug("Ignoring failure while closing the XA connection", ignored);
            }
        }
        if (broker != null) {
            try {
                broker.stop();
            } catch (Throwable ignored) {
                LOG.debug("Ignoring failure while stopping the broker", ignored);
            }
        }
    }

    /**
     * Consumes a message inside an XA transaction whose {@code beforeEnd} callback interrupts
     * the calling thread, then ends the transaction with {@code TMFAIL}, rolls it back and
     * asserts that the thread is still flagged as interrupted.
     *
     * @throws Exception on any unexpected JMS or XA failure
     */
    @Test
    public void testRollbackAckInterrupted() throws Exception {
        // publish a message
        publishAMessage();

        // consume in tx and rollback with interrupt
        session = xaConnection.createXASession();
        final MessageConsumer consumer = session.createConsumer(dest);
        Xid tid = createXid();
        resource = session.getXAResource();
        resource.start(tid, XAResource.TMNOFLAGS);
        ((TransactionContext) resource).addSynchronization(interruptingSynchronization());

        TextMessage receivedMessage = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MILLIS);
        assertNotNull(receivedMessage);
        assertEquals(getName(), receivedMessage.getText());

        resource.end(tid, XAResource.TMFAIL);
        resource.rollback(tid);
        session.close();
        assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
    }

    /**
     * Consumes a message inside an XA transaction whose {@code beforeEnd} callback interrupts
     * the calling thread, then ends the transaction with {@code TMSUCCESS} and performs a
     * one-phase commit. The test passes when none of those calls throws.
     *
     * @throws Exception on any unexpected JMS or XA failure
     */
    @Test
    public void testCommitAckInterrupted() throws Exception {
        // publish a message
        publishAMessage();

        // consume in tx and commit with interrupt
        session = xaConnection.createXASession();
        MessageConsumer consumer = session.createConsumer(dest);
        Xid tid = createXid();
        resource = session.getXAResource();
        resource.start(tid, XAResource.TMNOFLAGS);
        ((TransactionContext) resource).addSynchronization(interruptingSynchronization());

        TextMessage receivedMessage = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MILLIS);
        assertNotNull(receivedMessage);
        assertEquals(getName(), receivedMessage.getText());

        resource.end(tid, XAResource.TMSUCCESS);
        resource.commit(tid, true);
        session.close();
    }

    /**
     * Interrupts a consumer thread while the broker is holding back the handling of its
     * acknowledgement.
     *
     * <p>A broker filter signals when {@code acknowledge} is reached and then blocks until
     * released. A worker thread consumes a message in an XA transaction and calls
     * {@code end}; once the broker signals, the worker is interrupted through
     * {@link ExecutorService#shutdownNow()} and the broker is released. {@code end} and
     * {@code rollback} may each either succeed or fail with an {@link XAException} whose cause
     * chain contains one of the expected exception types; the worker must still be flagged as
     * interrupted afterwards.
     *
     * @throws Exception on any unexpected failure while driving the scenario
     */
    @Test
    public void testInterruptWhilePendingResponseToAck() throws Exception {
        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
        final CountDownLatch blockedServerSize = new CountDownLatch(1);
        final CountDownLatch canContinue = new CountDownLatch(1);

        MutableBrokerFilter filter = (MutableBrokerFilter) broker.getBroker().getAdaptor(MutableBrokerFilter.class);
        filter.setNext(new MutableBrokerFilter(filter.getNext()) {
            @Override
            public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
                // Tell the test the ack has reached the broker, then hold it until released.
                blockedServerSize.countDown();
                canContinue.await();
                super.acknowledge(consumerExchange, ack);
            }
        });

        publishAMessage();

        // consume in tx and rollback with interrupt while pending reply
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        session = xaConnection.createXASession();
                        MessageConsumer consumer = session.createConsumer(dest);
                        Xid tid = createXid();
                        resource = session.getXAResource();
                        resource.start(tid, XAResource.TMNOFLAGS);

                        TextMessage receivedMessage = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MILLIS);
                        assertNotNull(receivedMessage);
                        assertEquals(getName(), receivedMessage.getText());

                        try {
                            resource.end(tid, XAResource.TMSUCCESS);
                        } catch (Throwable expectedWithInterrupt) {
                            assertTrue(expectedWithInterrupt instanceof XAException);
                            assertCause(expectedWithInterrupt, new Class<?>[]{InterruptedException.class});
                        }
                        assertTrue("Was interrupted during ack!", Thread.currentThread().isInterrupted());

                        try {
                            resource.rollback(tid);
                        } catch (Throwable expectedWithInterruptIfClosed) {
                            assertTrue(expectedWithInterruptIfClosed.toString(),
                                    expectedWithInterruptIfClosed instanceof XAException);
                            assertCause(expectedWithInterruptIfClosed,
                                    new Class<?>[]{ConnectionClosedException.class, InterruptedException.class});
                        }
                        session.close();
                        assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
                    } catch (Throwable error) {
                        // Collected here and asserted on by the test thread once the worker is done.
                        LOG.error("Unexpected failure on the consumer thread", error);
                        errors.add(error);
                    }
                }
            });

            assertTrue("got to blocking call", blockedServerSize.await(HANDOFF_TIMEOUT_SECONDS, TimeUnit.SECONDS));

            // will interrupt
            executorService.shutdownNow();
            canContinue.countDown();

            assertTrue("job done", executorService.awaitTermination(HANDOFF_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            assertTrue("no errors: " + errors, errors.isEmpty());
        } finally {
            // If an assertion above failed early, still stop the worker and release the broker
            // filter so neither is left blocked. Both calls are harmless when repeated.
            executorService.shutdownNow();
            canContinue.countDown();
        }
    }

    /**
     * Creates a synchronization whose {@code beforeEnd} callback logs the current thread and
     * sets its interrupt flag.
     *
     * @return a new interrupting synchronization
     */
    private static Synchronization interruptingSynchronization() {
        return new Synchronization() {
            @Override
            public void beforeEnd() throws Exception {
                LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
                Thread.currentThread().interrupt();
            }
        };
    }

    /**
     * Fails the test unless the given throwable, or one of the throwables in its cause chain,
     * is an instance of at least one of the supplied classes.
     *
     * @param expectedWithInterrupt the throwable whose cause chain is searched
     * @param exceptionClazzes the acceptable types
     */
    private void assertCause(Throwable expectedWithInterrupt, Class<?>[] exceptionClazzes) {
        Throwable candidate = expectedWithInterrupt;
        while (candidate != null) {
            for (Class<?> exceptionClazz : exceptionClazzes) {
                if (exceptionClazz.isInstance(candidate)) {
                    return;
                }
            }
            candidate = candidate.getCause();
        }
        LOG.error("ex", expectedWithInterrupt);
        fail("no expected type as cause:" + expectedWithInterrupt);
    }

    /**
     * Builds a new {@link Xid} with format id 87 whose global transaction id and branch
     * qualifier are both the 8-byte encoding of the next {@link #txGenerator} value.
     *
     * @return the new transaction id
     * @throws IOException if encoding the counter fails
     */
    public Xid createXid() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream os = new DataOutputStream(baos);
        os.writeLong(++txGenerator);
        os.close();
        final byte[] bs = baos.toByteArray();

        return new Xid() {
            @Override
            public int getFormatId() {
                return 87;
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return bs;
            }

            @Override
            public byte[] getBranchQualifier() {
                return bs;
            }
        };
    }

    /**
     * Sends one text message, whose body is {@link #getName()}, to {@link #dest} inside an XA
     * transaction that is committed in one phase, then closes the current {@link #session}.
     *
     * @throws IOException if the transaction id cannot be created
     * @throws XAException if starting, ending or committing the transaction fails
     * @throws JMSException if creating the producer, sending or closing the session fails
     */
    private void publishAMessage() throws IOException, XAException, JMSException {
        Xid tid = createXid();
        resource.start(tid, XAResource.TMNOFLAGS);
        MessageProducer producer = session.createProducer(dest);
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(getName());
        producer.send(message);
        resource.end(tid, XAResource.TMSUCCESS);
        resource.commit(tid, true);
        session.close();
    }

    /**
     * Returns the fully qualified name of this test class, used as the message body.
     *
     * @return the class name
     */
    private String getName() {
        return this.getClass().getName();
    }
}