AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest related to vm transport server being shutdown while in use via async onException handler
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index 2f3d519..8bef1cc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BrokerInfo;
@@ -35,7 +36,7 @@
private TransportAcceptListener acceptListener;
private final URI location;
- private boolean disposed;
+ private AtomicBoolean disposed = new AtomicBoolean(false);
private final AtomicInteger connectionCount = new AtomicInteger(0);
private final boolean disposeOnDisconnect;
@@ -64,7 +65,7 @@
public VMTransport connect() throws IOException {
TransportAcceptListener al;
synchronized (this) {
- if (disposed) {
+ if (disposed.get()) {
throw new IOException("Server has been disposed.");
}
al = acceptListener;
@@ -117,7 +118,9 @@
}
public void stop() throws IOException {
- VMTransportFactory.stopped(this);
+ if (disposed.compareAndSet(false, true)) {
+ VMTransportFactory.stopped(this);
+ }
}
public URI getConnectURI() {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 5af3a37..a0a1ca8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -600,6 +600,45 @@
}
+ public void testRepeatedServerClose() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+ session.commit();
+
+ final int maxRedeliveries = 10000;
+ for (int i=0;i<=maxRedeliveries + 1;i++) {
+
+ final ActiveMQConnection toTest = (ActiveMQConnection)factory.createConnection(userName, password);
+ toTest.start();
+
+ // abortive close via broker
+ for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+ transportServer.stop();
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return toTest.isTransportFailed();
+ }
+ },10000, 100 );
+
+ try {
+ toTest.close();
+ } catch (Exception expected) {
+ } finally {
+ }
+ }
+ }
+
+
+
public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
connection.start();