https://issues.apache.org/jira/browse/AMQ-2191 https://issues.apache.org/jira/browse/AMQ-3529 - rework fixes to remove uncertanty from dealing with intettuptedexception. Sync requests will trap interrupts that ocurr while waiting for responses and fail the connection with an interruptedioexception. Interrupts pending before requests will be suppressed, allowing possible clean shutdown. It is not safe to replay openwire ops b/c they are not idempotent, the only safe option is to have a teardown of the broker side state from a close
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index 92c9c51..9e13cf9 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -122,6 +122,7 @@
}
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.initCause(e);
throw iioe;
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index aecece1..3b2833d 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -629,12 +629,7 @@
*/
@Override
public void close() throws JMSException {
- // Store the interrupted state and clear so that cleanup happens without
- // leaking connection resources. Reset in finally to preserve state.
- boolean interrupted = Thread.interrupted();
-
try {
-
// If we were running, lets stop first.
if (!closed.get() && !transportFailed.get()) {
// do not fail if already closed as according to JMS spec we must not
@@ -722,9 +717,6 @@
ServiceSupport.dispose(this.transport);
factoryStats.removeConnection(this);
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index edc383f..a67022b 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -726,17 +726,11 @@
}
void doClose() throws JMSException {
- // Store interrupted state and clear so that Transport operations don't
- // throw InterruptedException and we ensure that resources are cleaned up.
- boolean interrupted = Thread.interrupted();
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
this.session.asyncSendPacket(removeCommand);
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
void inProgressClearRequired() {
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 5c7cc4f..6603a2f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -660,14 +660,10 @@
}
private void doClose() throws JMSException {
- boolean interrupted = Thread.interrupted();
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
connection.asyncSendPacket(removeCommand);
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 27cb49d..6bd7402 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -29,13 +28,11 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
@@ -330,7 +327,7 @@
this.transactionId = null;
// Notify the listener that the tx was committed back
try {
- syncSendPacketWithInterruptionHandling(info);
+ this.connection.syncSendPacket(info);
if (localTransactionEventListener != null) {
localTransactionEventListener.commitEvent();
}
@@ -403,32 +400,36 @@
if (!equals(associatedXid, xid)) {
throw new XAException(XAException.XAER_PROTO);
}
-
- // TODO: we may want to put the xid in a suspended list.
- try {
- beforeEnd();
- } catch (JMSException e) {
- throw toXAException(e);
- } finally {
- setXid(null);
- }
+ invokeBeforeEnd();
} else if ((flags & TMSUCCESS) == TMSUCCESS) {
// set to null if this is the current xid.
// otherwise this could be an asynchronous success call
if (equals(associatedXid, xid)) {
- try {
- beforeEnd();
- } catch (JMSException e) {
- throw toXAException(e);
- } finally {
- setXid(null);
- }
+ invokeBeforeEnd();
}
} else {
throw new XAException(XAException.XAER_INVAL);
}
}
+ private void invokeBeforeEnd() throws XAException {
+ boolean throwingException = false;
+ try {
+ beforeEnd();
+ } catch (JMSException e) {
+ throwingException = true;
+ throw toXAException(e);
+ } finally {
+ try {
+ setXid(null);
+ } catch (XAException ignoreIfWillMask){
+ if (!throwingException) {
+ throw ignoreIfWillMask;
+ }
+ }
+ }
+ }
+
private boolean equals(Xid xid1, Xid xid2) {
if (xid1 == xid2) {
return true;
@@ -465,7 +466,7 @@
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
// Find out if the server wants to commit or rollback.
- IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
+ IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
if (XAResource.XA_RDONLY == response.getResult()) {
// transaction stops now, may be syncs that need a callback
List<TransactionContext> l;
@@ -534,7 +535,7 @@
// Let the server know that the tx is rollback.
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
- syncSendPacketWithInterruptionHandling(info);
+ this.connection.syncSendPacket(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
@@ -581,7 +582,7 @@
// Notify the server that the tx was committed back
TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
- syncSendPacketWithInterruptionHandling(info);
+ this.connection.syncSendPacket(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
@@ -643,7 +644,7 @@
try {
// Tell the server to forget the transaction.
- syncSendPacketWithInterruptionHandling(info);
+ this.connection.syncSendPacket(info);
} catch (JMSException e) {
throw toXAException(e);
}
@@ -741,7 +742,7 @@
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
try {
- syncSendPacketWithInterruptionHandling(info);
+ this.connection.syncSendPacket(info);
LOG.debug("{} ended XA transaction {}", this, transactionId);
} catch (JMSException e) {
disassociate();
@@ -774,31 +775,6 @@
}
/**
- * Sends the given command. Also sends the command in case of interruption,
- * so that important commands like rollback and commit are never interrupted.
- * If interruption occurred, set the interruption state of the current
- * after performing the action again.
- *
- * @return the response
- */
- private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
- try {
- return this.connection.syncSendPacket(command);
- } catch (JMSException e) {
- if (e.getLinkedException() instanceof InterruptedIOException) {
- try {
- Thread.interrupted();
- return this.connection.syncSendPacket(command);
- } finally {
- Thread.currentThread().interrupt();
- }
- }
-
- throw e;
- }
- }
-
- /**
* Converts a JMSException from the server to an XAException. if the
* JMSException contained a linked XAException that is returned instead.
*
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java b/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
index ba4bd67..ff95869 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
@@ -29,25 +29,51 @@
private static final Logger LOG = LoggerFactory.getLogger(FutureResponse.class);
private final ResponseCallback responseCallback;
+ private final TransportFilter transportFilter;
+
private final ArrayBlockingQueue<Response> responseSlot = new ArrayBlockingQueue<Response>(1);
public FutureResponse(ResponseCallback responseCallback) {
+ this(responseCallback, null);
+ }
+
+ public FutureResponse(ResponseCallback responseCallback, TransportFilter transportFilter) {
this.responseCallback = responseCallback;
+ this.transportFilter = transportFilter;
}
public Response getResult() throws IOException {
+ boolean hasInterruptPending = Thread.interrupted();
try {
return responseSlot.take();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Operation interupted: " + e, e);
+ hasInterruptPending = false;
+ throw dealWithInterrupt(e);
+ } finally {
+ if (hasInterruptPending) {
+ Thread.currentThread().interrupt();
}
- throw new InterruptedIOException("Interrupted.");
}
}
+ private InterruptedIOException dealWithInterrupt(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Operation interrupted: " + e, e);
+ }
+ InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage());
+ interruptedIOException.initCause(e);
+ try {
+ if (transportFilter != null) {
+ transportFilter.onException(interruptedIOException);
+ }
+ } finally {
+ Thread.currentThread().interrupt();
+ }
+ return interruptedIOException;
+ }
+
public Response getResult(int timeout) throws IOException {
+ final boolean wasInterrupted = Thread.interrupted();
try {
Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
if (result == null && timeout > 0) {
@@ -55,7 +81,11 @@
}
return result;
} catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted.");
+ throw dealWithInterrupt(e);
+ } finally {
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
index eca76a7..ad18ea6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
@@ -64,7 +64,7 @@
Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
- FutureResponse future = new FutureResponse(responseCallback);
+ FutureResponse future = new FutureResponse(responseCallback, this);
IOException priorError = null;
synchronized (requestMap) {
priorError = this.error;
@@ -122,7 +122,7 @@
* any of current requests. Lets let them know of the problem.
*/
public void onException(IOException error) {
- dispose(error);
+ dispose(new TransportDisposedIOException("Disposed due to prior exception", error));
super.onException(error);
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
index 632fc05..1d57777 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
@@ -38,4 +38,8 @@
super(message);
}
+ public TransportDisposedIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
index c86c6ed..fe3b179 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
@@ -93,13 +93,25 @@
}
public void oneway(Object command) throws IOException {
+ boolean wasInterrupted = Thread.interrupted();
try {
- if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
+ if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
}
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException();
+ InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");
+ interruptedIOException.initCause(e);
+ try {
+ onException(interruptedIOException);
+ } finally {
+ Thread.currentThread().interrupt();
+ wasInterrupted = false;
+ }
+ throw interruptedIOException;
+ } finally {
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
}
super.oneway(command);
}
@@ -143,6 +155,7 @@
} catch (IOException e) {
onException(e);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
onException((IOException)new InterruptedIOException().initCause(e));
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 0f36d67..7f7d7c6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -130,7 +130,7 @@
private String nestedExtraQueryOptions;
private boolean shuttingDown = false;
- public FailoverTransport() throws InterruptedIOException {
+ public FailoverTransport() {
brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
index 0d933e5..00ae7ae 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
@@ -157,7 +157,7 @@
}
}
- public FanoutTransport() throws InterruptedIOException {
+ public FanoutTransport() {
// Setup a task that is used to reconnect the a connection async.
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
index 1a3dc34..27b69fc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
@@ -124,7 +124,11 @@
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
// we were interrupted during shutdown, so force shutdown
- executorService.shutdownNow();
+ try {
+ executorService.shutdownNow();
+ } finally {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
index 4715d02..c65dbb9 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
@@ -184,6 +184,7 @@
Thread.sleep(1000);
} catch (InterruptedException e) {
onException(new InterruptedIOException());
+ Thread.currentThread().interrupt();
break;
}
} else {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java
new file mode 100644
index 0000000..a297121
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.transaction.Synchronization;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.*;
+
+public class ActiveMQXAConnectionTxInterruptTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionTxInterruptTest.class);
+ long txGenerator = System.currentTimeMillis();
+ private BrokerService broker;
+ XASession session;
+ XAResource resource;
+ ActiveMQXAConnection xaConnection;
+ Destination dest;
+
+ @Before
+ public void startBrokerEtc() throws Exception {
+ broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/BRXA"));
+ broker.setPersistent(false);
+ broker.start();
+ ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
+ cf1.setStatsEnabled(true);
+ xaConnection = (ActiveMQXAConnection)cf1.createConnection();
+ xaConnection.start();
+ session = xaConnection.createXASession();
+ resource = session.getXAResource();
+
+ dest = new ActiveMQQueue("Q");
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ xaConnection.close();
+ } catch (Throwable ignore) {
+ }
+ try {
+ broker.stop();
+ } catch (Throwable ignore) {
+ }
+ }
+
+
+ @Test
+ public void testRollbackAckInterrupted() throws Exception {
+
+ // publish a message
+ publishAMessage();
+ Xid tid;
+
+ // consume in tx and rollback with interrupt
+ session = xaConnection.createXASession();
+ final MessageConsumer consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ ((TransactionContext)resource).addSynchronization(new Synchronization() {
+ @Override
+ public void beforeEnd() throws Exception {
+ LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
+ Thread.currentThread().interrupt();
+ }
+ });
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ assertEquals(getName(), receivedMessage.getText());
+ resource.end(tid, XAResource.TMFAIL);
+ resource.rollback(tid);
+ session.close();
+ assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
+ }
+
+ @Test
+ public void testCommitAckInterrupted() throws Exception {
+
+ // publish a message
+ publishAMessage();
+
+ // consume in tx and rollback with interrupt
+ session = xaConnection.createXASession();
+ MessageConsumer consumer = session.createConsumer(dest);
+ Xid tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ ((TransactionContext)resource).addSynchronization(new Synchronization() {
+ @Override
+ public void beforeEnd() throws Exception {
+ LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
+ Thread.currentThread().interrupt();
+ }
+ });
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ assertEquals(getName(), receivedMessage.getText());
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+ session.close();
+
+ }
+
+ @Test
+ public void testInterruptWhilePendingResponseToAck() throws Exception {
+
+ final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+ final CountDownLatch blockedServerSize = new CountDownLatch(1);
+ final CountDownLatch canContinue = new CountDownLatch(1);
+ MutableBrokerFilter filter = (MutableBrokerFilter)broker.getBroker().getAdaptor(MutableBrokerFilter.class);
+ filter.setNext(new MutableBrokerFilter(filter.getNext()) {
+ @Override
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+ blockedServerSize.countDown();
+ canContinue.await();
+ super.acknowledge(consumerExchange, ack);
+ }
+ });
+
+ publishAMessage();
+
+ // consume in tx and rollback with interrupt while pending reply
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ session = xaConnection.createXASession();
+ MessageConsumer consumer = session.createConsumer(dest);
+ Xid tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ assertEquals(getName(), receivedMessage.getText());
+
+ try {
+ resource.end(tid, XAResource.TMSUCCESS);
+ fail("Expect end to fail");
+ } catch (Throwable expectedWithInterrupt) {
+ assertTrue(expectedWithInterrupt instanceof XAException);
+ assertCause(expectedWithInterrupt, new Class[]{InterruptedException.class});
+ }
+
+ try {
+ resource.rollback(tid);
+ fail("Expect rollback to fail due to connection being closed");
+ } catch (Throwable expectedWithInterrupt) {
+ assertTrue(expectedWithInterrupt instanceof XAException);
+ assertCause(expectedWithInterrupt, new Class[]{ConnectionClosedException.class, InterruptedException.class});
+ }
+ session.close();
+
+ assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
+
+ } catch (Throwable error) {
+ error.printStackTrace();
+ errors.add(error);
+ }
+ }
+ });
+
+ assertTrue("got to blocking call", blockedServerSize.await(20, TimeUnit.SECONDS));
+
+ // will interrupt
+ executorService.shutdownNow();
+ canContinue.countDown();
+
+ assertTrue("job done", executorService.awaitTermination(20, TimeUnit.SECONDS));
+
+ assertTrue("no errors: " + errors, errors.isEmpty());
+ }
+
+ private void assertCause(Throwable expectedWithInterrupt, Class[] exceptionClazzes) {
+ Throwable candidate = expectedWithInterrupt;
+
+ while (candidate != null) {
+ for (Class<?> exceptionClazz: exceptionClazzes) {
+ if (exceptionClazz.isInstance(candidate)) {
+ return;
+ }
+ }
+ candidate = candidate.getCause();
+ }
+ LOG.error("ex", expectedWithInterrupt);
+ fail("no expected type as cause:" + expectedWithInterrupt);
+ }
+
+ public Xid createXid() throws IOException {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ os.writeLong(++txGenerator);
+ os.close();
+ final byte[] bs = baos.toByteArray();
+
+ return new Xid() {
+ public int getFormatId() {
+ return 87;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return bs;
+ }
+
+ public byte[] getBranchQualifier() {
+ return bs;
+ }
+ };
+
+ }
+
+ private void publishAMessage() throws IOException, XAException, JMSException {
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ MessageProducer producer = session.createProducer(dest);
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ producer.send(message);
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+ session.close();
+ }
+
+
+ private String getName() {
+ return this.getClass().getName();
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java
new file mode 100644
index 0000000..030f2b4
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.activemq.bugs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ3529v2Test {
+
+ private static Logger LOG = LoggerFactory.getLogger(AMQ3529v2Test.class);
+
+ private BrokerService broker;
+ private String connectionUri;
+
+ @Before
+ public void startBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.addConnector("tcp://0.0.0.0:0");
+ broker.start();
+ broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ @Test(timeout = 60000)
+ public void testRandomInterruptionAffects() throws Exception {
+ doTestRandomInterruptionAffects();
+ }
+
+ @Test(timeout = 60000)
+ public void testRandomInterruptionAffectsWithFailover() throws Exception {
+ connectionUri = "failover:(" + connectionUri + ")";
+ doTestRandomInterruptionAffects();
+ }
+
+ public void doTestRandomInterruptionAffects() throws Exception {
+ final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+
+ ThreadGroup tg = new ThreadGroup("tg");
+
+ assertEquals(0, tg.activeCount());
+
+ class ClientThread extends Thread {
+
+ public Exception error;
+
+ public ClientThread(ThreadGroup tg, String name) {
+ super(tg, name);
+ }
+
+ @Override
+ public void run() {
+ Context ctx = null;
+ Connection connection = null;
+ Session session = null;
+ MessageConsumer consumer = null;
+
+ try {
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+
+ Properties props = new Properties();
+ props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ props.setProperty(Context.PROVIDER_URL, connectionUri);
+ ctx = null;
+ try {
+ ctx = new InitialContext(props);
+ } catch (NoClassDefFoundError e) {
+ throw new NamingException(e.toString());
+ } catch (Exception e) {
+ throw new NamingException(e.toString());
+ }
+ Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
+ consumer = session.createConsumer(destination);
+ consumer.receive(10000);
+ } catch (Exception e) {
+ // Expect an exception here from the interrupt.
+ } finally {
+ try {
+ if (consumer != null) {
+ consumer.close();
+ }
+ } catch (JMSException e) {
+ trackException("Consumer Close failed with", e);
+ }
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (JMSException e) {
+ trackException("Session Close failed with", e);
+ }
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (JMSException e) {
+ trackException("Connection Close failed with", e);
+ }
+ try {
+ if (ctx != null) {
+ ctx.close();
+ }
+ } catch (Exception e) {
+ trackException("Connection Close failed with", e);
+ }
+ }
+ }
+
+ private void trackException(String s, Exception e) {
+ LOG.error(s, e);
+ this.error = e;
+ }
+ }
+
+ final Random random = new Random();
+ List<ClientThread> threads = new LinkedList<ClientThread>();
+ for (int i=0;i<10;i++) {
+ threads.add(new ClientThread(tg, "Client-"+ i));
+ }
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ // interrupt the threads at some random time
+ ExecutorService doTheInterrupts = Executors.newFixedThreadPool(threads.size());
+ for (final Thread thread : threads) {
+ doTheInterrupts.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(random.nextInt(5000));
+ } catch (InterruptedException ignored) {
+ ignored.printStackTrace();
+ }
+ thread.interrupt();
+ }
+ });
+ }
+ doTheInterrupts.shutdown();
+ assertTrue("all interrupts done", doTheInterrupts.awaitTermination(30, TimeUnit.SECONDS));
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ for (ClientThread thread : threads) {
+ if (thread.error != null) {
+ LOG.info("Close error on thread: " + thread, thread.error);
+ }
+ }
+
+ Thread[] remainThreads = new Thread[tg.activeCount()];
+ tg.enumerate(remainThreads);
+ for (final Thread t : remainThreads) {
+ if (t != null && t.isAlive() && !t.isDaemon())
+ assertTrue("Thread completes:" + t, Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Remaining thread: " + t.toString());
+ return !t.isAlive();
+ }
+ }));
+ }
+
+ ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+ while (root.getParent() != null) {
+ root = root.getParent();
+ }
+ visit(root, 0);
+ }
+
+ // This method recursively visits all thread groups under `group'.
+ public static void visit(ThreadGroup group, int level) {
+ // Get threads in `group'
+ int numThreads = group.activeCount();
+ Thread[] threads = new Thread[numThreads * 2];
+ numThreads = group.enumerate(threads, false);
+
+ // Enumerate each thread in `group'
+ for (int i = 0; i < numThreads; i++) {
+ // Get thread
+ Thread thread = threads[i];
+ LOG.debug("Thread:" + thread.getName() + " is still running");
+ }
+
+ // Get thread subgroups of `group'
+ int numGroups = group.activeGroupCount();
+ ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+ numGroups = group.enumerate(groups, false);
+
+ // Recursively visit each subgroup
+ for (int i = 0; i < numGroups; i++) {
+ visit(groups[i], level + 1);
+ }
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
index c5c4706..eccbf1b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
@@ -318,7 +318,7 @@
// simulate broker stop
remote.stop();
- assertTrue(Wait.waitFor(new Wait.Condition() {
+ assertTrue("got expected exception response", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("answer: " + answer[0]);