AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest related to vm transport server being shutdown while in use via async onException handler
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index 2f3d519..8bef1cc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.command.BrokerInfo;
@@ -35,7 +36,7 @@
 
     private TransportAcceptListener acceptListener;
     private final URI location;
-    private boolean disposed;
+    private AtomicBoolean disposed = new AtomicBoolean(false);
 
     private final AtomicInteger connectionCount = new AtomicInteger(0);
     private final boolean disposeOnDisconnect;
@@ -64,7 +65,7 @@
     public VMTransport connect() throws IOException {
         TransportAcceptListener al;
         synchronized (this) {
-            if (disposed) {
+            if (disposed.get()) {
                 throw new IOException("Server has been disposed.");
             }
             al = acceptListener;
@@ -117,7 +118,9 @@
     }
 
     public void stop() throws IOException {
-        VMTransportFactory.stopped(this);
+        if (disposed.compareAndSet(false, true)) {
+            VMTransportFactory.stopped(this);
+        }
     }
 
     public URI getConnectURI() {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 5af3a37..a0a1ca8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -600,6 +600,45 @@
     }
 
 
+    public void testRepeatedServerClose() throws Exception {
+
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        session.commit();
+
+        final int maxRedeliveries = 10000;
+        for (int i=0;i<=maxRedeliveries + 1;i++) {
+
+            final ActiveMQConnection toTest = (ActiveMQConnection)factory.createConnection(userName, password);
+            toTest.start();
+
+            // abortive close via broker
+            for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+                transportServer.stop();
+            }
+
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return toTest.isTransportFailed();
+                }
+            },10000, 100 );
+
+            try {
+                toTest.close();
+            } catch (Exception expected) {
+            } finally {
+            }
+        }
+    }
+
+
+
     public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
 
         connection.start();