Merge pull request #357 from bd2019us/AMQ-7199

[AMQ-7199] Replace Math.Random with Random.nextDouble
diff --git a/NOTICE b/NOTICE
index b2e0c43..927e2d2 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,4 +1,4 @@
-Apache ActiveMQ Copyright 2005-2017 Apache Software Foundation
+Apache ActiveMQ Copyright 2005-2019 Apache Software Foundation
 
 This product includes software developed by
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 346a54c..ffe9ccc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -319,7 +319,7 @@
                         footerMap = new HashMap<>();
                     }
                     String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
-                    footerMap.put(name, value);
+                    footerMap.put(Symbol.valueOf(name), value);
                     continue;
                 }
             } else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX )) {
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index 201cee2..deb0c0d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -22,6 +22,8 @@
 import static org.junit.Assert.assertTrue;
 
 import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -36,6 +38,11 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -188,6 +195,54 @@
         openwireConn.close();
     }
 
+    @Test(timeout = 60000)
+    public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
+        startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
+
+        URI remoteURI = new URI("tcp://" + amqpConnectionURI.getHost() + ":" + amqpConnectionURI.getPort());
+        AmqpClient client = new AmqpClient(remoteURI, null, null);
+        AmqpConnection connection = client.connect();
+        try {
+            connection.connect();
+
+            String annotation = "x-opt-embedded-map";
+            Map<String, String> embeddedMap = new LinkedHashMap<>();
+            embeddedMap.put("test-key-1", "value-1");
+            embeddedMap.put("test-key-2", "value-2");
+            embeddedMap.put("test-key-3", "value-3");
+
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender(TEST_QUEUE);
+            AmqpMessage message = createAmqpMessage((byte) 'A', 65535);
+
+            message.setApplicationProperty("IntProperty", 42);
+            message.setDurable(true);
+            message.setMessageAnnotation(annotation, embeddedMap);
+            sender.send(message);
+
+            session.close();
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI);
+            Connection connection2 = factory.createConnection();
+            try {
+
+                Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                connection2.start();
+                MessageConsumer consumer = session2.createConsumer(session2.createQueue(TEST_QUEUE));
+
+                Message received = consumer.receive(5000);
+                assertNotNull(received);
+                assertEquals(42, received.getIntProperty("IntProperty"));
+
+                connection2.close();
+            } finally {
+                connection2.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
     public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception {
         brokerService = new BrokerService();
         brokerService.setPersistent(false);
@@ -211,4 +266,14 @@
             brokerService = null;
         }
     }
+
+    private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+        AmqpMessage message = new AmqpMessage();
+        byte[] payload = new byte[payloadSize];
+        for (int i = 0; i < payload.length; i++) {
+           payload[i] = value;
+        }
+        message.setBytes(payload);
+        return message;
+     }
 }
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index a86edbb..67e0dc9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -23,8 +23,10 @@
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -47,10 +49,13 @@
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.util.Wait;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -65,6 +70,8 @@
 
     protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
 
+    private static final int PAYLOAD = 110 * 1024;
+
     @Test(timeout = 60000)
     public void testSimpleSendOneReceiveOneToQueue() throws Exception {
         doTestSimpleSendOneReceiveOne(Queue.class);
@@ -858,4 +865,66 @@
         receiver.close();
         connection.close();
     }
+
+   @Test(timeout = 60000)
+   public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
+      String testQueueName = "ConnectionFrameSize";
+      int nMsgs = 200;
+
+      AmqpClient client = createAmqpClient();
+
+      Symbol annotation = Symbol.valueOf("x-opt-embedded-map");
+      Map<String, String> embeddedMap = new LinkedHashMap<>();
+      embeddedMap.put("test-key-1", "value-1");
+      embeddedMap.put("test-key-2", "value-2");
+      embeddedMap.put("test-key-3", "value-3");
+
+      {
+         AmqpConnection connection = client.connect();
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(testQueueName);
+         AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
+
+         message.setApplicationProperty("IntProperty", 42);
+         message.setDurable(true);
+         message.setMessageAnnotation(annotation.toString(), embeddedMap);
+         sender.send(message);
+         session.close();
+         connection.close();
+      }
+
+      {
+         AmqpConnection connection = client.connect();
+         AmqpSession session = connection.createSession();
+         AmqpReceiver receiver = session.createReceiver(testQueueName);
+         receiver.flow(nMsgs);
+
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Failed to read message with embedded map in annotations", message);
+         MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+         if (wrapped.getBody() instanceof Data) {
+            Data data = (Data) wrapped.getBody();
+            System.out.println("received : message: " + data.getValue().getLength());
+            assertEquals(PAYLOAD, data.getValue().getLength());
+         }
+
+         assertNotNull(message.getWrappedMessage().getMessageAnnotations());
+         assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue());
+         assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation));
+
+         message.accept();
+         session.close();
+         connection.close();
+      }
+   }
+
+   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+       AmqpMessage message = new AmqpMessage();
+       byte[] payload = new byte[payloadSize];
+       for (int i = 0; i < payload.length; i++) {
+          payload[i] = value;
+       }
+       message.setBytes(payload);
+       return message;
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index 3eaf28b..e856c5d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -96,7 +96,7 @@
     private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
     private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
     private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
-    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
+    private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index a265570..a3b9a4d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -532,9 +532,7 @@
 
     @Override
     public long getPendingMessageSize() {
-        synchronized (pendingLock) {
-            return pending.messageSize();
-        }
+        return pending.messageSize();
     }
 
     @Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e8ef717..fa75752 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -694,7 +694,7 @@
                                     // While waiting for space to free up... the
                                     // transaction may be done
                                     if (message.isInTransaction()) {
-                                        if (context.getTransaction().getState() > IN_USE_STATE) {
+                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
                                             throw new JMSException("Send transaction completed while waiting for space");
                                         }
                                     }
@@ -1471,10 +1471,7 @@
         int count = 0;
         Set<MessageReference> set = new LinkedHashSet<MessageReference>();
         do {
-            int oldMaxSize = getMaxPageSize();
-            setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
-            doPageIn(true);
-            setMaxPageSize(oldMaxSize);
+            doPageIn(true, false, (messages.isCacheEnabled() || !broker.getBrokerService().isPersistent()) ? messages.size() : getMaxBrowsePageSize());
             pagedInMessagesLock.readLock().lock();
             try {
                 set.addAll(pagedInMessages.values());
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index a0f2d06..66e586c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -63,6 +63,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+
+import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
+
 /**
  * The Topic is a destination that sends a copy of a message to every active
  * Subscription registered.
@@ -409,8 +413,15 @@
                             public void run() {
                                 try {
 
-                                    // While waiting for space to free up... the
-                                    // message may have expired.
+                                    // While waiting for space to free up...
+                                    // the transaction may be done
+                                    if (message.isInTransaction()) {
+                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
+                                            throw new JMSException("Send transaction completed while waiting for space");
+                                        }
+                                    }
+
+                                    // the message may have expired.
                                     if (message.isExpired()) {
                                         broker.messageExpired(context, message, null);
                                         getDestinationStatistics().getExpired().increment();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index bf3f97b..0bf1c4e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -16,14 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -32,15 +24,7 @@
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.transport.TransmitCallback;
@@ -48,6 +32,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class TopicSubscription extends AbstractSubscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
@@ -61,7 +53,7 @@
 
     private int maximumPendingMessages = -1;
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
-    private int discarded;
+    private final AtomicInteger discarded = new AtomicInteger();
     private final Object matchedListMutex = new Object();
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
@@ -448,9 +440,7 @@
 
     @Override
     public long getPendingMessageSize() {
-        synchronized (matchedListMutex) {
-            return matched.messageSize();
-        }
+        return matched.messageSize();
     }
 
     @Override
@@ -482,9 +472,7 @@
      * @return the number of messages discarded due to being a slow consumer
      */
     public int discarded() {
-        synchronized (matchedListMutex) {
-            return discarded;
-        }
+        return discarded.get();
     }
 
     /**
@@ -493,9 +481,7 @@
      *         prefetch buffer being full).
      */
     public int matched() {
-        synchronized (matchedListMutex) {
-            return matched.size();
-        }
+        return matched.size();
     }
 
     /**
@@ -727,7 +713,7 @@
         try {
             message.decrementReferenceCount();
             matched.remove(message);
-            discarded++;
+            discarded.incrementAndGet();
             if (destination != null) {
                 destination.getDestinationStatistics().getDequeues().increment();
             }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index ba1a0c2..6a8b66a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1074,8 +1074,11 @@
             sending.setConnectionId(this.localConnectionInfo.getConnectionId());
             localBroker.oneway(sending);
 
-            //remove subscriber from map
+            //remove subscriber from local map
             i.remove();
+
+            //need to remove the mapping from the remote map as well
+            subscriptionMapByRemoteId.remove(ds.getRemoteInfo().getConsumerId());
         }
     }
 
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java b/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
index 2b89d12..1b90e00 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
@@ -102,7 +102,6 @@
         initialContextFactory = "com.sun.jndi.ldap.LdapCtxFactory";
         connectionURL = "ldap://localhost:10389";
         connectionUsername = "uid=admin,ou=system";
-        connectionPassword = "secret";
         connectionProtocol = "s";
         authentication = "simple";
 
@@ -491,4 +490,4 @@
         return context;
     }
 
-}
\ No newline at end of file
+}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
index 44c23f6..77cbb20 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
@@ -64,7 +64,7 @@
     private final String initialContextFactory = "com.sun.jndi.ldap.LdapCtxFactory";
     private String connectionURL = "ldap://localhost:1024";
     private String connectionUsername = "uid=admin,ou=system";
-    private String connectionPassword = "secret";
+    private String connectionPassword;
     private String connectionProtocol = "s";
     private String authentication = "simple";
 
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
index 4179b7b..af040a2 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
@@ -182,7 +182,7 @@
         }
     }
 
-    public static Class<?> loadClass(String name, ClassLoader loader) throws ClassNotFoundException {
+    private static Class<?> loadClass(String name, ClassLoader loader) throws ClassNotFoundException {
         ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         if (contextClassLoader != null) {
             try {
diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml
index 0a6f8e1..f5e2bae 100644
--- a/activemq-client/pom.xml
+++ b/activemq-client/pom.xml
@@ -32,6 +32,15 @@
 
   <properties>
     <surefire.argLine>-Xmx512M</surefire.argLine>
+    <activemq.osgi.import.pkg>
+      !com.google.errorprone.annotations,
+      !com.google.errorprone.annotations.concurrent,
+      *
+    </activemq.osgi.import.pkg>
+    <activemq.osgi.private.pkg>
+      com.google.errorprone.annotations,
+      com.google.errorprone.annotations.concurrent
+    </activemq.osgi.private.pkg>
   </properties>
 
   <dependencies>
diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
index 8c38c2d..40d2f1a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
@@ -94,7 +94,7 @@
      * @return true if space
      */
     @Override
-    public boolean waitForSpace(long timeout) throws InterruptedException {
+    public boolean waitForSpace(final long timeout) throws InterruptedException {
         if (parent != null) {
             if (!parent.waitForSpace(timeout)) {
                 return false;
@@ -106,12 +106,15 @@
                 usageLock.readLock().unlock();
                 usageLock.writeLock().lock();
                 try {
-                    while (percentUsage >= 100 ) {
-                        waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
+                    final long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+                    long timeleft = deadline;
+                    while (percentUsage >= 100 && timeleft > 0) {
+                        waitForSpaceCondition.await(Math.min(getPollingTime(), timeleft), TimeUnit.MILLISECONDS);
+                        timeleft = deadline - System.currentTimeMillis();
                     }
-                    usageLock.readLock().lock();
                 } finally {
                     usageLock.writeLock().unlock();
+                    usageLock.readLock().lock();
                 }
             }
 
diff --git a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
index b869939..24e47c2 100644
--- a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
+++ b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.activemq.usage;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -29,6 +26,11 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 public class MemoryUsageTest {
 
     MemoryUsage underTest;
@@ -83,6 +85,15 @@
         assertEquals("limits are still matched whole", underTest.getLimit(), child.getLimit());
     }
 
+    @Test(timeout=2000)
+    public void testLimitedWaitFail() throws Exception {
+        underTest.setLimit(10);
+        underTest.start();
+        underTest.increaseUsage(11);
+
+        assertFalse("did not get usage within limit", underTest.waitForSpace(500));
+    }
+
     @Before
     public void setUp() throws Exception {
         underTest = new MemoryUsage();
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java
index f9f0fb0..3e246ce 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java
@@ -209,6 +209,8 @@
     // utlity method to write an xml source to file
     private void writeToFile(Source src, File file) throws TransformerException {
         TransformerFactory tFactory = TransformerFactory.newInstance();
+        tFactory.setFeature(javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, Boolean.TRUE);
+
         Transformer fileTransformer = tFactory.newTransformer();
 
         Result res = new StreamResult(file);
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
index 897a279..15f4de6 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
@@ -38,7 +38,7 @@
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +53,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
 
     private String registryURL = "http://localhost:8080/discovery-registry/default";
-    private HttpClient httpClient = new DefaultHttpClient();
+    private HttpClient httpClient = HttpClientBuilder.create().build();
     private AtomicBoolean running = new AtomicBoolean();
     private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
     private final HashSet<String> registeredServices = new HashSet<String>();
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
index 2480daa..6582bb8 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
@@ -40,26 +40,23 @@
 import org.apache.http.HttpStatus;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.HttpResponseException;
 import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpHead;
 import org.apache.http.client.methods.HttpOptions;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.params.CookiePolicy;
-import org.apache.http.client.params.HttpClientParams;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.params.ConnRoutePNames;
-import org.apache.http.conn.scheme.PlainSocketFactory;
-import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.message.AbstractHttpMessage;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
@@ -72,7 +69,7 @@
  */
 public class HttpClientTransport extends HttpTransportSupport {
 
-    public static final int MAX_CLIENT_TIMEOUT = 30000;
+    public static final int MAX_CLIENT_TIMEOUT = 90000;
     private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
     private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
 
@@ -317,9 +314,10 @@
     }
 
     protected HttpClient createHttpClient() {
-        DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
+        HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+        clientBuilder.setConnectionManager(createClientConnectionManager());
         if (useCompression) {
-            client.addRequestInterceptor( new HttpRequestInterceptor() {
+            clientBuilder.addInterceptorLast(new HttpRequestInterceptor() {
                 @Override
                 public void process(HttpRequest request, HttpContext context) {
                     // We expect to received a compression response that we un-gzip
@@ -327,31 +325,30 @@
                 }
             });
         }
+
+        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
         if (getProxyHost() != null) {
             HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
-            client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
+            requestConfigBuilder.setProxy(proxy);
 
-            if (client.getConnectionManager().getSchemeRegistry().get("http") == null) {
-                client.getConnectionManager().getSchemeRegistry().register(
-                    new Scheme("http", getProxyPort(), PlainSocketFactory.getSocketFactory()));
-            }
-
-            if(getProxyUser() != null && getProxyPassword() != null) {
-                client.getCredentialsProvider().setCredentials(
+            if (getProxyUser() != null && getProxyPassword() != null) {
+                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(
                     new AuthScope(getProxyHost(), getProxyPort()),
                     new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
+                clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
             }
         }
 
-        HttpParams params = client.getParams();
-        HttpConnectionParams.setSoTimeout(params, soTimeout);
-        HttpClientParams.setCookiePolicy(params, CookiePolicy.BROWSER_COMPATIBILITY);
+        requestConfigBuilder.setSocketTimeout(soTimeout);
+        requestConfigBuilder.setCookieSpec(CookieSpecs.DEFAULT);
+        clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
 
-        return client;
+        return clientBuilder.build();
     }
 
-    protected ClientConnectionManager createClientConnectionManager() {
-        return new PoolingClientConnectionManager();
+    protected HttpClientConnectionManager createClientConnectionManager() {
+        return new PoolingHttpClientConnectionManager();
     }
 
     protected void configureMethod(AbstractHttpMessage method) {
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java
index 2e432fc..9f6774a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java
@@ -24,11 +24,13 @@
 import org.apache.activemq.transport.http.HttpClientTransport;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.scheme.SchemeRegistry;
-import org.apache.http.conn.ssl.SSLSocketFactory;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 
 public class HttpsClientTransport extends HttpClientTransport {
 
@@ -37,19 +39,17 @@
     }
 
     @Override
-    protected ClientConnectionManager createClientConnectionManager() {
-        PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager(createSchemeRegistry());
-        return connectionManager;
+    protected HttpClientConnectionManager createClientConnectionManager() {
+        return new PoolingHttpClientConnectionManager(createRegistry());
     }
 
-    private SchemeRegistry createSchemeRegistry() {
+    private Registry<ConnectionSocketFactory> createRegistry() {
 
-        SchemeRegistry schemeRegistry = new SchemeRegistry();
+        RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create();
         try {
-            SSLSocketFactory sslSocketFactory = new SSLSocketFactory(createSocketFactory(),
-                    SSLSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
-            schemeRegistry.register(new Scheme("https", getRemoteUrl().getPort(), sslSocketFactory));
-            return schemeRegistry;
+            SSLConnectionSocketFactory sslConnectionFactory = new SSLConnectionSocketFactory(createSocketFactory(), new DefaultHostnameVerifier());
+            registryBuilder.register("https", sslConnectionFactory);
+            return registryBuilder.build();
         } catch (Exception e) {
             throw new IllegalStateException("Failure trying to create scheme registry", e);
         }
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportCookiePolicyTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportCookiePolicyTest.java
deleted file mode 100644
index b0f5691..0000000
--- a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportCookiePolicyTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.http;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.activemq.transport.util.TextWireFormat;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.params.HttpClientParams;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test that {@link HttpClientTransport} sets a broad-range compatibility
- * cookie policy.
- *
- * @see <a href="https://issues.apache.org/jira/browse/AMQ-6571">AMQ-6571: HttpClientTransport refuses to accept cookies using `Expires' header</a>
- */
-@SuppressWarnings("deprecation")
-public class HttpClientTransportCookiePolicyTest {
-
-    private HttpClientTransport transport;
-
-
-    /**
-     * Create the transport so we can inspect it.
-     * @throws URISyntaxException if something goes wrong.
-     */
-    @Before
-    public void setUp() throws URISyntaxException {
-        transport = new HttpClientTransport(mock(TextWireFormat.class), new URI("http://localhost:8080/test"));
-    }
-
-
-    /**
-     * Create a new connection and check the connection properties.
-     */
-    @Test
-    public void test() {
-        HttpClient client = transport.createHttpClient();
-        assertEquals("Cookie spec", org.apache.http.client.params.CookiePolicy.BROWSER_COMPATIBILITY, HttpClientParams.getCookiePolicy(client.getParams()));
-    }
-}
diff --git a/activemq-jaas/pom.xml b/activemq-jaas/pom.xml
index f52ee08..4dcfeb7 100644
--- a/activemq-jaas/pom.xml
+++ b/activemq-jaas/pom.xml
@@ -69,22 +69,15 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.directory.server</groupId>
       <artifactId>apacheds-core-integ</artifactId>
       <version>${directory-version}</version>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>bouncycastle</groupId>
-          <artifactId>bcprov-jdk15</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.bouncycastle</groupId>
-      <artifactId>bcprov-jdk15</artifactId>
-      <version>1.46</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.directory.server</groupId>
diff --git a/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java b/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java
index 95781cc..42427d0 100644
--- a/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java
+++ b/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java
@@ -24,6 +24,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +35,7 @@
     private Properties props = new Properties();
     private Map<String, String> invertedProps;
     private Map<String, Set<String>> invertedValueProps;
+    private Map<String, Pattern> regexpProps;
     private long reloadTime = -1;
     private final PropertiesLoader.FileNameKey key;
 
@@ -51,6 +54,7 @@
                 load(key.file(), props);
                 invertedProps = null;
                 invertedValueProps = null;
+                regexpProps = null;
                 if (key.isDebug()) {
                     LOG.debug("Load of: " + key);
                 }
@@ -69,7 +73,10 @@
         if (invertedProps == null) {
             invertedProps = new HashMap<>(props.size());
             for (Map.Entry<Object, Object> val : props.entrySet()) {
-                invertedProps.put((String) val.getValue(), (String) val.getKey());
+                String str = (String) val.getValue();
+                if (!looksLikeRegexp(str)) {
+                    invertedProps.put(str, (String) val.getKey());
+                }
             }
         }
         return invertedProps;
@@ -93,6 +100,24 @@
         return invertedValueProps;
     }
 
+    public synchronized Map<String, Pattern> regexpPropertiesMap() {
+        if (regexpProps == null) {
+            regexpProps = new HashMap<>(props.size());
+            for (Map.Entry<Object, Object> val : props.entrySet()) {
+                String str = (String) val.getValue();
+                if (looksLikeRegexp(str)) {
+                    try {
+                        Pattern p = Pattern.compile(str.substring(1, str.length() - 1));
+                        regexpProps.put((String) val.getKey(), p);
+                    } catch (PatternSyntaxException e) {
+                        LOG.warn("Ignoring invalid regexp: " + str);
+                    }
+                }
+            }
+        }
+        return regexpProps;
+    }
+
     private void load(final File source, Properties props) throws IOException {
         FileInputStream in = new FileInputStream(source);
         try {
@@ -116,4 +141,9 @@
         return key.file.lastModified() > reloadTime;
     }
 
+    private boolean looksLikeRegexp(String str) {
+        int len = str.length();
+        return len > 2 && str.charAt(0) == '/' && str.charAt(len - 1) == '/';
+    }
+
 }
diff --git a/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java b/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java
index 42f2c9d..c316367 100644
--- a/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java
+++ b/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.CallbackHandler;
@@ -32,13 +33,14 @@
 /**
  * A LoginModule allowing for SSL certificate based authentication based on
  * Distinguished Names (DN) stored in text files. The DNs are parsed using a
- * Properties class where each line is <user_name>=<user_DN>. This class also
- * uses a group definition file where each line is <group_name>=<user_name_1>,<user_name_2>,etc.
+ * Properties class where each line is either <UserName>=<StringifiedSubjectDN>
+ * or <UserName>=/<SubjectDNRegExp>/. This class also uses a group definition
+ * file where each line is <GroupName>=<UserName1>,<UserName2>,etc.
  * The user and group files' locations must be specified in the
  * org.apache.activemq.jaas.textfiledn.user and
- * org.apache.activemq.jaas.textfiledn.user properties respectively. NOTE: This
- * class will re-read user and group files for every authentication (i.e it does
- * live updates of allowed groups and users).
+ * org.apache.activemq.jaas.textfiledn.group properties respectively.
+ * NOTE: This class will re-read user and group files for every authentication
+ * (i.e it does live updates of allowed groups and users).
  *
  * @author sepandm@gmail.com (Sepand)
  */
@@ -48,6 +50,7 @@
     private static final String GROUP_FILE_PROP_NAME = "org.apache.activemq.jaas.textfiledn.group";
 
     private Map<String, Set<String>> groupsByUser;
+    private Map<String, Pattern> regexpByUser;
     private Map<String, String> usersByDn;
 
     /**
@@ -58,6 +61,7 @@
         super.initialize(subject, callbackHandler, sharedState, options);
 
         usersByDn = load(USER_FILE_PROP_NAME, "", options).invertedPropertiesMap();
+        regexpByUser = load(USER_FILE_PROP_NAME, "", options).regexpPropertiesMap();
         groupsByUser = load(GROUP_FILE_PROP_NAME, "", options).invertedPropertiesValuesMap();
      }
 
@@ -76,8 +80,8 @@
         if (certs == null) {
             throw new LoginException("Client certificates not found. Cannot authenticate.");
         }
-
-        return usersByDn.get(getDistinguishedName(certs));
+        String dn = getDistinguishedName(certs);
+        return usersByDn.containsKey(dn) ? usersByDn.get(dn) : getUserByRegexp(dn);
     }
 
     /**
@@ -96,4 +100,17 @@
         }
         return userGroups;
     }
+
+    private synchronized String getUserByRegexp(String dn) {
+        String name = null;
+        for (Map.Entry<String, Pattern> val : regexpByUser.entrySet()) {
+            if (val.getValue().matcher(dn).matches()) {
+                name = val.getKey();
+                break;
+            }
+        }
+        usersByDn.put(dn, name);
+        return name;
+    }
+
 }
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index 111e730..00911ec 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -81,8 +81,8 @@
 
     @Override
     public void close() throws JMSException {
-        this.cleanupConnectionTemporaryDestinations();
         this.cleanupAllLoanedSessions();
+        this.cleanupConnectionTemporaryDestinations();
         if (this.pool != null) {
             this.pool.decrementReferenceCount();
             this.pool = null;
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
index 86b4e27..d879580 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
@@ -19,6 +19,7 @@
 import static org.junit.Assert.assertEquals;
 
 import javax.jms.Connection;
+import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
@@ -119,6 +120,24 @@
         assertEquals(0, countBrokerTemporaryTopics());
     }
 
+    @Test(timeout = 60000)
+    public void testTemporaryQueueLeakAfterConnectionCloseWithConsumer() throws Exception {
+        Connection pooledConnection = null;
+        Session session = null;
+        Queue tempQueue = null;
+        for (int i = 0; i < 2; i++) {
+            pooledConnection = pooledFactory.createConnection();
+            session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            tempQueue = session.createTemporaryQueue();
+            MessageConsumer consumer = session.createConsumer(tempQueue);
+            consumer.receiveNoWait();
+            LOG.info("Created queue named: " + tempQueue.getQueueName());
+            pooledConnection.close();
+        }
+
+        assertEquals(0, countBrokerTemporaryQueues());
+    }
+
     private int countBrokerTemporaryQueues() throws Exception {
         return ((RegionBroker) brokerService.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
     }
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 9660afc..1a120a2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1397,6 +1397,8 @@
             if (before != null) {
                 before.sequenceAssignedWithIndexLocked(-1);
             }
+            // Moving the checkpoint pointer as there is no persistent operations in this transaction to be replayed
+            processLocation(location);
             return;
         }
 
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index f143a07..fcf6ba2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -56,7 +56,6 @@
 import org.apache.activemq.store.TransactionIdTransformerAware;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -554,6 +553,15 @@
         return transactionStore.getJournalMaxWriteBatchSize();
     }
 
+
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        transactionStore.setJournalCleanupInterval(journalCleanupInterval);
+    }
+
+    public long getJournalCleanupInterval() {
+        return transactionStore.getJournalCleanupInterval();
+    }
+
     public List<PersistenceAdapter> getAdapters() {
         return Collections.unmodifiableList(adapters);
     }
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index f13fc53..55931a6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -63,11 +63,13 @@
     static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
     final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
     final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
-    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
+    final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
     private Journal journal;
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean recovered = new AtomicBoolean(false);
+    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
 
     public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
         this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
@@ -190,6 +192,14 @@
         this.journalWriteBatchSize = journalWriteBatchSize;
     }
 
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        this.journalCleanupInterval = journalCleanupInterval;
+    }
+
+    public long getJournalCleanupInterval() {
+        return journalCleanupInterval;
+    }
+
     public class Tx {
         private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
         private int prepareLocationId = 0;
@@ -284,10 +294,12 @@
 
     public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
         tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
+        pendingCommit.put(txid, tx);
     }
 
     public void persistCompletion(TransactionId txid) throws IOException {
         store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+        pendingCommit.remove(txid);
     }
 
     private Location store(JournalCommand<?> data) throws IOException {
@@ -328,18 +340,26 @@
             journal.setDirectory(getDirectory());
             journal.setMaxFileLength(journalMaxFileLength);
             journal.setWriteBatchSize(journalWriteBatchSize);
+            journal.setCleanupInterval(journalCleanupInterval);
             IOHelper.mkdirs(journal.getDirectory());
             journal.start();
             recoverPendingLocalTransactions();
+            recovered.set(true);
             store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
         }
     }
 
     private void txStoreCleanup() {
+        if (!recovered.get()) {
+            return;
+        }
         Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
         for (Tx tx : inflightTransactions.values()) {
             knownDataFileIds.remove(tx.getPreparedLocationId());
         }
+        for (Tx tx : pendingCommit.values()) {
+            knownDataFileIds.remove(tx.getPreparedLocationId());
+        }
         try {
             journal.removeDataFiles(knownDataFileIds);
         } catch (Exception e) {
@@ -362,11 +382,11 @@
     private void recoverPendingLocalTransactions() throws IOException {
         Location location = journal.getNextLocation(null);
         while (location != null) {
-            process(load(location));
+            process(location, load(location));
             location = journal.getNextLocation(location);
         }
-        recoveredPendingCommit.addAll(inflightTransactions.keySet());
-        LOG.info("pending local transactions: " + recoveredPendingCommit);
+        pendingCommit.putAll(inflightTransactions);
+        LOG.info("pending local transactions: " + pendingCommit.keySet());
     }
 
     public JournalCommand<?> load(Location location) throws IOException {
@@ -381,11 +401,11 @@
         return message;
     }
 
-    public void process(JournalCommand<?> command) throws IOException {
+    public void process(final Location location, JournalCommand<?> command) throws IOException {
         switch (command.type()) {
             case KAHA_PREPARE_COMMAND:
                 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
-                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
+                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
                 break;
             case KAHA_COMMIT_COMMAND:
                 KahaCommitCommand commitCommand = (KahaCommitCommand) command;
@@ -422,10 +442,9 @@
             for (TransactionId txid : broker.getPreparedTransactions(null)) {
                 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
                     try {
-                        if (recoveredPendingCommit.contains(txid)) {
+                        if (pendingCommit.keySet().contains(txid)) {
                             LOG.info("delivering pending commit outcome for tid: " + txid);
                             broker.commitTransaction(null, txid, false);
-
                         } else {
                             LOG.info("delivering rollback outcome to store for tid: " + txid);
                             broker.forgetTransaction(null, txid);
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 67d4c86..9a6e256 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -124,6 +124,14 @@
         }
     }
 
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
     public enum PreallocationStrategy {
         SPARSE_FILE,
         OS_KERNEL_COPY,
@@ -230,6 +238,7 @@
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
     private File osKernelCopyTemplateFile = null;
     private ByteBuffer preAllocateDirectBuffer = null;
+    private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
 
     protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
 
@@ -345,7 +354,7 @@
             public void run() {
                 cleanup();
             }
-        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
+        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
 
         long end = System.currentTimeMillis();
         LOG.trace("Startup took: "+(end-start)+" ms");
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
index 52b2f99..c91020c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
@@ -22,6 +22,8 @@
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.Iterator;
@@ -34,6 +36,8 @@
  */
 public class Transaction implements Iterable<Page> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);
+
     private RandomAccessFile tmpFile;
     private File txFile;
     private long nextLocation = 0;
@@ -656,6 +660,7 @@
     public void commit() throws IOException {
         if( writeTransactionId!=-1 ) {
             if (tmpFile != null) {
+                LOG.debug("Committing transaction {}: Size {} kb", writeTransactionId, tmpFile.length() / (1024));
                 pageFile.removeTmpFile(getTempFile(), tmpFile);
                 tmpFile = null;
                 txFile = null;
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 4a0fbc4..5889c6a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -20,6 +20,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -578,6 +579,9 @@
             }
         }
 
+        List<Integer> removedJobFileIds = new ArrayList<>();
+        HashMap<Integer, Integer> decrementJournalCount = new HashMap<>();
+
         for (Long executionTime : keys) {
             List<JobLocation> values = this.index.remove(tx, executionTime);
             if (location != null) {
@@ -586,9 +590,9 @@
 
                     // Remove the references for add and reschedule commands for this job
                     // so that those logs can be GC'd when free.
-                    this.store.decrementJournalCount(tx, job.getLocation());
+                    decrementJournalCount.compute(job.getLocation().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
                     if (job.getLastUpdate() != null) {
-                        this.store.decrementJournalCount(tx, job.getLastUpdate());
+                        decrementJournalCount.compute(job.getLastUpdate().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
                     }
 
                     // now that the job is removed from the index we can store the remove info and
@@ -597,11 +601,19 @@
                     // the same file we don't need to track it and just let a normal GC of the logs
                     // remove it when the log is unreferenced.
                     if (job.getLocation().getDataFileId() != location.getDataFileId()) {
-                        this.store.referenceRemovedLocation(tx, location, job);
+                        removedJobFileIds.add(job.getLocation().getDataFileId());
                     }
                 }
             }
         }
+
+        if (!removedJobFileIds.isEmpty()) {
+            this.store.referenceRemovedLocation(tx, location, removedJobFileIds);
+        }
+
+        if (decrementJournalCount.size() > 0) {
+            this.store.decrementJournalCount(tx, decrementJournalCount);
+        }
     }
 
     /**
@@ -657,22 +669,35 @@
      * @param tx
      *        the transaction under which this operation was invoked.
      *
-     * @return a list of all referenced Location values for this JobSchedulerImpl
+     * @return a iterator of all referenced Location values for this JobSchedulerImpl
      *
      * @throws IOException if an error occurs walking the scheduler tree.
      */
-    protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
-        List<JobLocation> references = new ArrayList<>();
+    protected Iterator<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
+        return new Iterator<JobLocation>() {
 
-        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
-            Map.Entry<Long, List<JobLocation>> entry = i.next();
-            List<JobLocation> scheduled = entry.getValue();
-            for (JobLocation job : scheduled) {
-                references.add(job);
+            final Iterator<Map.Entry<Long, List<JobLocation>>> mapIterator = index.iterator(tx);
+            Iterator<JobLocation> iterator;
+
+            @Override
+            public boolean hasNext() {
+
+                while (iterator == null || !iterator.hasNext()) {
+                    if (!mapIterator.hasNext()) {
+                        break;
+                    }
+
+                    iterator = new ArrayList<>(mapIterator.next().getValue()).iterator();
+                }
+
+                return iterator != null && iterator.hasNext();
             }
-        }
 
-        return references;
+            @Override
+            public JobLocation next() {
+                return iterator.next();
+            }
+        };
     }
 
     @Override
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 79059f1..7720159 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -322,7 +322,7 @@
 
             @Override
             public boolean accept(File dir, String name) {
-                if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) {
+                if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log") || name.endsWith(".free")) {
                     return true;
                 }
                 return false;
@@ -493,6 +493,37 @@
     }
 
     /**
+     * Removes multiple references for the Journal log file indicated in the given Location map.
+     *
+     * The references are used to track which log files cannot be GC'd.  When the reference count
+     * on a log file reaches zero the file id is removed from the tracker and the log will be
+     * removed on the next check point update.
+     *
+     * @param tx
+     *      The TX under which the update is to be performed.
+     * @param decrementsByFileIds
+     *      Map indicating how many decrements per fileId.
+     *
+     * @throws IOException if an error occurs while updating the journal references table.
+     */
+    protected void decrementJournalCount(Transaction tx, HashMap<Integer, Integer> decrementsByFileIds) throws IOException {
+        for(Map.Entry<Integer, Integer> entry : decrementsByFileIds.entrySet()) {
+            int logId = entry.getKey();
+            Integer refCount = metaData.getJournalRC().get(tx, logId);
+
+            if (refCount != null) {
+                int refCountValue = refCount;
+                refCountValue -= entry.getValue();
+                if (refCountValue <= 0) {
+                    metaData.getJournalRC().remove(tx, logId);
+                } else {
+                    metaData.getJournalRC().put(tx, logId, refCountValue);
+                }
+            }
+        }
+    }
+
+    /**
      * Updates the Job removal tracking index with the location of a remove command and the
      * original JobLocation entry.
      *
@@ -520,6 +551,33 @@
     }
 
     /**
+     * Updates the Job removal tracking index with the location of a remove command and the
+     * original JobLocation entry.
+     *
+     * The JobLocation holds the locations in the logs where the add and update commands for
+     * a job stored.  The log file containing the remove command can only be discarded after
+     * both the add and latest update log files have also been discarded.
+     *
+     * @param tx
+     *      The TX under which the update is to be performed.
+     * @param location
+     *      The location value to reference a remove command.
+     * @param removedJobsFileId
+     *      List of the original JobLocation instances that holds the add and update locations
+     *
+     * @throws IOException if an error occurs while updating the remove location tracker.
+     */
+    protected void referenceRemovedLocation(Transaction tx, Location location, List<Integer> removedJobsFileId) throws IOException {
+        int logId = location.getDataFileId();
+        List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
+        if (removed == null) {
+            removed = new ArrayList<Integer>();
+        }
+        removed.addAll(removedJobsFileId);
+        this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
+    }
+
+    /**
      * Retrieve the scheduled Job's byte blob from the journal.
      *
      * @param location
@@ -826,8 +884,8 @@
             Map.Entry<String, JobSchedulerImpl> entry = i.next();
             JobSchedulerImpl scheduler = entry.getValue();
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+            for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+                final JobLocation job = jobLocationIterator.next();
                 if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
                     if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
                         LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
@@ -854,8 +912,8 @@
             Map.Entry<String, JobSchedulerImpl> entry = i.next();
             JobSchedulerImpl scheduler = entry.getValue();
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+            for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+                final JobLocation job = jobLocationIterator.next();
                 missingJournalFiles.add(job.getLocation().getDataFileId());
                 if (job.getLastUpdate() != null) {
                     missingJournalFiles.add(job.getLastUpdate().getDataFileId());
@@ -937,8 +995,8 @@
             Map.Entry<String, JobSchedulerImpl> entry = i.next();
             JobSchedulerImpl scheduler = entry.getValue();
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+            for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+                final JobLocation job = jobLocationIterator.next();
 
                 // Remove all jobs in missing log files.
                 if (missing.contains(job.getLocation().getDataFileId())) {
diff --git a/activemq-karaf/pom.xml b/activemq-karaf/pom.xml
index 3859da9..60d64ee 100644
--- a/activemq-karaf/pom.xml
+++ b/activemq-karaf/pom.xml
@@ -33,9 +33,8 @@
   <properties>
     <xpp3-bundle-version>1.1.4c_5</xpp3-bundle-version>
     <jodatime-bundle-version>2.9</jodatime-bundle-version>
-    <jdom-bundle-version>1.1_4</jdom-bundle-version>
     <dom4j-bundle-version>1.6.1_2</dom4j-bundle-version>
-    <xstream-bundle-version>1.4.8_1</xstream-bundle-version>
+    <xstream-bundle-version>1.4.11.1_1</xstream-bundle-version>
     <servicemix.specs.version>2.4.0</servicemix.specs.version>
   </properties>
 
@@ -154,7 +153,7 @@
                 </artifact>
                 <artifact>
                   <file>target/classes/activemq.xml</file>
-                  <type>xml</type>
+                  <type>xml</type>>
                   <classifier>activemq</classifier>
                 </artifact>
                 <artifact>
@@ -180,12 +179,17 @@
             <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
             <Export-Package>org.apache.activemq.karaf*;version=${project.version};-split-package:=merge-first</Export-Package>
             <Import-Package>
+              !com.google.errorprone.annotations,
+              !com.google.errorprone.annotations.concurrent,
               org.apache.felix.gogo.commands,
               org.apache.karaf.shell.console;version="[2,4)",
               org.apache.karaf.shell.console.commands;version="[2,4)",
               *
             </Import-Package>
-            <Private-Package>!*</Private-Package>
+            <Private-Package>
+              com.google.errorprone.annotations,
+              com.google.errorprone.annotations.concurrent
+            </Private-Package>
             <_versionpolicy>[$(version;==;$(@)),$(version;+;$(@)))</_versionpolicy>
             <Embed-Transitive>true</Embed-Transitive>
           </instructions>
diff --git a/activemq-karaf/src/main/resources/features.xml b/activemq-karaf/src/main/resources/features.xml
index c989705..9c514df 100644
--- a/activemq-karaf/src/main/resources/features.xml
+++ b/activemq-karaf/src/main/resources/features.xml
@@ -25,15 +25,15 @@
     <!-- Starts the broker with default configuration -->
     <feature name="activemq-broker-noweb" description="Full ActiveMQ broker with default configuration" version="${project.version}" resolver="(obr)" start-level="50">
         <feature version="${project.version}">activemq</feature>
-        <configfile finalname="/etc/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
-        <configfile finalname="/etc/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
+        <configfile finalname="${karaf.etc}/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
+        <configfile finalname="${karaf.etc}/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
     </feature>
 
     <!-- Starts the broker with default configuration and web console -->
     <feature name="activemq-broker" description="Full ActiveMQ broker with default configuration and web console" version="${project.version}" resolver="(obr)" start-level="50">
         <feature version="${project.version}">activemq</feature>
-        <configfile finalname="/etc/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
-        <configfile finalname="/etc/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
+        <configfile finalname="${karaf.etc}/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
+        <configfile finalname="${karaf.etc}/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
         <feature version="${project.version}">activemq-web-console</feature>
     </feature>
 
@@ -49,7 +49,7 @@
     <feature name="activemq-web-console" version="${project.version}" resolver="(obr)" start-level="50">
       <feature start-level="10">war</feature>
       <feature start-level="10">eventadmin</feature>
-      <configfile finalname="/etc/org.apache.activemq.webconsole.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq-webconsole</configfile>
+      <configfile finalname="${karaf.etc}/org.apache.activemq.webconsole.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq-webconsole</configfile>
       <bundle>mvn:org.apache.activemq/activemq-web-console/${project.version}/war</bundle>
     </feature>
 
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 15fb1b1..d8ac05c 100644
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -320,7 +320,7 @@
       <!-- for the paho dependency -->
       <repository>
           <id>spring.io</id>
-          <url>http://repo.spring.io/plugins-release</url>
+          <url>https://repo.spring.io/plugins-release</url>
           <releases><enabled>true</enabled></releases>
           <snapshots><enabled>false</enabled></snapshots>
       </repository>
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 9a406f0..9f8bd8a 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -35,6 +35,10 @@
       !org.apache.commons.daemon,
       !org.apache.maven*,
       !com.google.thirdparty.publicsuffix,
+      !com.rometools*,
+      !com.google.errorprone.annotations,
+      !com.google.errorprone.annotations.concurrent,
+      !com.google.j2objc.annotations,
       sun.misc*;resolution:=optional,
       sun.nio*;resolution:=optional,
       javax.jmdns*;resolution:=optional,
@@ -62,7 +66,7 @@
       org.springframework*;version="[4,5)";resolution:=optional,
       org.xmlpull*;resolution:=optional,
       scala*;resolution:=optional,
-      javax.annotation*,
+      javax.annotation*;version="[1,4)",
       !com.thoughtworks.qdox*,
       org.apache.commons.logging;version="[1.2,2)";resolution:=optional,
       javax.jms*;version="[1.1,3)",
@@ -92,6 +96,9 @@
          org.fusesource.hawtbuf*,
          org.apache.qpid*,
          com.google.common*,
+         com.google.errorprone.annotations,
+         com.google.errorprone.annotations.concurrent,
+         com.google.j2objc.annotations,
          org.linkedin*,
          org.iq80*
     </activemq.osgi.private.pkg>
@@ -163,7 +170,8 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>activemq-web</artifactId>
       <exclusions>
-      <exclusion>
+        <exclusion>
+          <groupId>${project.groupId}</groupId>
           <artifactId>activemq-all</artifactId>
         </exclusion>
       </exclusions>
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index a89d5ee..b3deac4 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -22,7 +22,7 @@
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -116,8 +116,10 @@
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
     private final StompTransport stompTransport;
 
-    private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
-    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+    // Global Map shared with all subscriptions to allow finding the sub associated with an ACK Id
+    private final ConcurrentMap<String, StompAckEntry> pendingAcksTracker = new ConcurrentHashMap<>();
+    // Read-Only view used in this class to enforce the separation of read vs update of the global index.
+    private final Map<String, StompAckEntry> pendingAcks = Collections.unmodifiableMap(pendingAcksTracker);
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
@@ -131,34 +133,6 @@
     private float hbGracePeriodMultiplier = 1.0f;
     private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
 
-    private static class AckEntry {
-
-        private final String messageId;
-        private final StompSubscription subscription;
-
-        public AckEntry(String messageId, StompSubscription subscription) {
-            this.messageId = messageId;
-            this.subscription = subscription;
-        }
-
-        public MessageAck onMessageAck(TransactionId transactionId) {
-            return subscription.onStompMessageAck(messageId, transactionId);
-        }
-
-        public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
-            return subscription.onStompMessageNack(messageId, transactionId);
-        }
-
-        public String getMessageId() {
-            return this.messageId;
-        }
-
-        @SuppressWarnings("unused")
-        public StompSubscription getSubscription() {
-            return this.subscription;
-        }
-    }
-
     public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
         this.stompTransport = stompTransport;
         this.brokerContext = brokerContext;
@@ -297,7 +271,11 @@
         // Let the stomp client know about any protocol errors.
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
-        exception.printStackTrace(stream);
+        if (exception instanceof SecurityException || exception.getCause() instanceof SecurityException) {
+            stream.write(exception.getLocalizedMessage());
+        } else {
+            exception.printStackTrace(stream);
+        }
         stream.close();
 
         HashMap<String, String> headers = new HashMap<>();
@@ -383,9 +361,9 @@
         boolean nacked = false;
 
         if (ackId != null) {
-            AckEntry pendingAck = this.pedingAcks.remove(ackId);
+            StompAckEntry pendingAck = this.pendingAcks.get(ackId);
             if (pendingAck != null) {
-                messageId = pendingAck.getMessageId();
+                messageId = pendingAck.getMessageId().toString();
                 MessageAck ack = pendingAck.onMessageNack(activemqTx);
                 if (ack != null) {
                     sendToActiveMQ(ack, createResponseHandler(command));
@@ -439,9 +417,9 @@
         boolean acked = false;
 
         if (ackId != null) {
-            AckEntry pendingAck = this.pedingAcks.remove(ackId);
+            StompAckEntry pendingAck = this.pendingAcks.get(ackId);
             if (pendingAck != null) {
-                messageId = pendingAck.getMessageId();
+                messageId = pendingAck.getMessageId().toString();
                 MessageAck ack = pendingAck.onMessageAck(activemqTx);
                 if (ack != null) {
                     sendToActiveMQ(ack, createResponseHandler(command));
@@ -522,8 +500,6 @@
             sub.onStompCommit(activemqTx);
         }
 
-        pedingAcks.clear();
-
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
@@ -553,8 +529,6 @@
             }
         }
 
-        pedingAcks.clear();
-
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
@@ -620,9 +594,9 @@
 
         StompSubscription stompSubscription;
         if (!consumerInfo.isBrowser()) {
-            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
         } else {
-            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
         }
         stompSubscription.setDestination(actualDest);
 
@@ -841,6 +815,7 @@
 
     protected void onStompDisconnect(StompFrame command) throws ProtocolException {
         if (connected.get()) {
+            LOG.trace("Connection closed with {} pending ACKs still being tracked.", pendingAcks.size());
             sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
             sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
             connected.set(false);
@@ -876,19 +851,7 @@
             MessageDispatch md = (MessageDispatch)command;
             StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
-                String ackId = null;
-                if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
-                    AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
-                    ackId = this.ACK_ID_GENERATOR.generateId();
-                    this.pedingAcks.put(ackId, pendingAck);
-                }
-                try {
-                    sub.onMessageDispatch(md, ackId);
-                } catch (Exception ex) {
-                    if (ackId != null) {
-                        this.pedingAcks.remove(ackId);
-                    }
-                }
+                sub.onMessageDispatch(md);
             }
         } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
             stompTransport.sendToStomp(ping);
@@ -1048,26 +1011,15 @@
         return result;
     }
 
-    /**
-     * Remove all pending acknowledgement markers that are batched into the single
-     * client acknowledge operation.
-     *
-     * @param subscription
-     *      The STOMP Subscription that has performed a client acknowledge.
-     * @param msgIdsToRemove
-     *      List of message IDs that are bound to the subscription that has ack'd
-     */
-    protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
-        int count = 0;
+    boolean isStomp10() {
+        return version.equals(Stomp.V1_0);
+    }
 
-        for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
-            AckEntry actEntry = entry.getValue();
-            if (msgIdsToRemove.contains(actEntry.messageId)) {
-                this.pedingAcks.remove(entry.getKey());
-                count++;
-            }
-        }
+    boolean isStomp11() {
+        return version.equals(Stomp.V1_1);
+    }
 
-        LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
+    boolean isStomp12() {
+        return version.equals(Stomp.V1_2);
     }
 }
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
new file mode 100644
index 0000000..1edcb62
--- /dev/null
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Tracker object for Messages that carry STOMP v1.2 ACK IDs
+ */
+public class StompAckEntry {
+
+    private final String ackId;
+    private final MessageId messageId;
+    private final StompSubscription subscription;
+    private final MessageDispatch dispatch;
+
+    public StompAckEntry(MessageDispatch dispatch, String ackId, StompSubscription subscription) {
+        this.messageId = dispatch.getMessage().getMessageId();
+        this.subscription = subscription;
+        this.ackId = ackId;
+        this.dispatch = dispatch;
+    }
+
+    public MessageAck onMessageAck(TransactionId transactionId) {
+        return subscription.onStompMessageAck(messageId.toString(), transactionId);
+    }
+
+    public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
+        return subscription.onStompMessageNack(messageId.toString(), transactionId);
+    }
+
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    public MessageDispatch getMessageDispatch() {
+        return this.dispatch;
+    }
+
+    public String getAckId() {
+        return this.ackId;
+    }
+
+    public StompSubscription getSubscription() {
+        return this.subscription;
+    }
+
+    @Override
+    public String toString() {
+        return "AckEntry[ msgId:" + messageId + ", ackId:" + ackId + ", sub:" + subscription + " ]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((messageId == null) ? 0 : messageId.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        StompAckEntry other = (StompAckEntry) obj;
+        if (messageId == null) {
+            if (other.messageId != null) {
+                return false;
+            }
+        } else if (!messageId.equals(other.messageId)) {
+            return false;
+        }
+
+        return true;
+    }
+}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
index 9e267ac..1238572 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.IOException;
+import java.util.Map;
 
 import javax.jms.JMSException;
 
@@ -27,15 +28,14 @@
 
 public class StompQueueBrowserSubscription extends StompSubscription {
 
-    public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
-        super(stompTransport, subscriptionId, consumerInfo, transformation);
+    public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
+        super(stompTransport, subscriptionId, consumerInfo, transformation, pendingAcks);
     }
 
     @Override
-    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
-
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
         if (md.getMessage() != null) {
-            super.onMessageDispatch(md, ackId);
+            super.onMessageDispatch(md);
         } else {
             StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
             browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
@@ -52,5 +52,4 @@
     public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
         throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
     }
-
 }
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 0d8e308..95fe986 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -17,12 +17,10 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import javax.jms.JMSException;
 
@@ -34,6 +32,9 @@
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Keeps track of the STOMP subscription so that acking is correctly done.
@@ -42,6 +43,10 @@
  */
 public class StompSubscription {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StompSubscription.class);
+
+    private static final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+
     public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
     public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
     public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
@@ -50,27 +55,37 @@
     protected final String subscriptionId;
     protected final ConsumerInfo consumerInfo;
 
-    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
-    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
+    protected final Map<MessageId, StompAckEntry> dispatchedMessage = new LinkedHashMap<>();
+    protected final Map<String, StompAckEntry> pendingAcks; // STOMP v1.2 requires ACK ID tracking
+    protected final LinkedList<StompAckEntry> transactedMessages = new LinkedList<>();
 
     protected String ackMode = AUTO_ACK;
     protected ActiveMQDestination destination;
     protected String transformation;
 
-    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
         this.protocolConverter = stompTransport;
         this.subscriptionId = subscriptionId;
         this.consumerInfo = consumerInfo;
         this.transformation = transformation;
+        this.pendingAcks = pendingAcks;
     }
 
-    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-        if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
+
+        String ackId = null;
+        if (isClientAck() || isIndividualAck()) {
+            ackId = ACK_ID_GENERATOR.generateId();
+            StompAckEntry pendingAck = new StompAckEntry(md, ackId, this);
+
             synchronized (this) {
-                dispatchedMessage.put(message.getMessageId(), md);
+                dispatchedMessage.put(message.getMessageId(), pendingAck);
             }
-        } else if (ackMode.equals(AUTO_ACK)) {
+            if (protocolConverter.isStomp12()) {
+                this.pendingAcks.put(ackId, pendingAck);
+            }
+        } else if (isAutoAck()) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getStompTransport().sendToActiveMQ(ack);
         }
@@ -93,35 +108,48 @@
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
 
-        if (ackId != null) {
+        if (protocolConverter.isStomp12() && ackId != null) {
             command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
         }
 
-        protocolConverter.getStompTransport().sendToStomp(command);
+        try {
+            protocolConverter.getStompTransport().sendToStomp(command);
+        } catch (IOException ex) {
+            if (ackId != null) {
+                pendingAcks.remove(ackId);
+            }
+            throw ex;
+        }
     }
 
     synchronized void onStompAbort(TransactionId transactionId) {
-        unconsumedMessage.clear();
+        // Restore the pending ACKs so that their ACK IDs are again valid for a client
+        // to operate on.
+        LOG.trace("Transaction Abort restoring {} pending ACKs to valid state.", transactedMessages.size());
+        for (StompAckEntry ackEntry : transactedMessages) {
+            if (protocolConverter.isStomp12()) {
+                pendingAcks.put(ackEntry.getAckId(), ackEntry);
+            }
+        }
+        transactedMessages.clear();
     }
 
     void onStompCommit(TransactionId transactionId) {
         MessageAck ack = null;
         synchronized (this) {
-            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-                @SuppressWarnings("rawtypes")
-                Map.Entry entry = (Entry)iter.next();
-                MessageDispatch msg = (MessageDispatch)entry.getValue();
-                if (unconsumedMessage.contains(msg)) {
-                    iter.remove();
+            for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+                StompAckEntry ackEntry = iterator.next();
+                if (transactedMessages.contains(ackEntry)) {
+                    iterator.remove();
                 }
             }
 
             // For individual Ack we already sent an Ack that will be applied on commit
             // we don't send a second standard Ack as that would produce an error.
-            if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
-                ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+            if (!transactedMessages.isEmpty() && isClientAck()) {
+                ack = new MessageAck(transactedMessages.getLast().getMessageDispatch(), MessageAck.STANDARD_ACK_TYPE, transactedMessages.size());
                 ack.setTransactionId(transactionId);
-                unconsumedMessage.clear();
+                transactedMessages.clear();
             }
         }
         // avoid contention with onMessageDispatch
@@ -131,10 +159,10 @@
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
-
         MessageId msgId = new MessageId(messageId);
 
-        if (!dispatchedMessage.containsKey(msgId)) {
+        final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+        if (ackEntry == null) {
             return null;
         }
 
@@ -142,35 +170,33 @@
         ack.setDestination(consumerInfo.getDestination());
         ack.setConsumerId(consumerInfo.getConsumerId());
 
-        final ArrayList<String> acknowledgedMessages = new ArrayList<>();
-
-        if (ackMode == CLIENT_ACK) {
+        if (isClientAck()) {
             if (transactionId == null) {
                 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
             } else {
                 ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
             }
             int count = 0;
-            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+            for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+                StompAckEntry entry = iterator.next();
+                MessageId current = entry.getMessageId();
 
-                @SuppressWarnings("rawtypes")
-                Map.Entry entry = (Entry)iter.next();
-                MessageId id = (MessageId)entry.getKey();
-                MessageDispatch msg = (MessageDispatch)entry.getValue();
+                if (entry.getAckId() != null) {
+                    pendingAcks.remove(entry.getAckId());
+                }
 
                 if (transactionId != null) {
-                    if (!unconsumedMessage.contains(msg)) {
-                        unconsumedMessage.add(msg);
+                    if (!transactedMessages.contains(entry)) {
+                        transactedMessages.add(entry);
                         count++;
                     }
                 } else {
-                    acknowledgedMessages.add(id.toString());
-                    iter.remove();
+                    iterator.remove();
                     count++;
                 }
 
-                if (id.equals(msgId)) {
-                    ack.setLastMessageId(id);
+                if (current.equals(msgId)) {
+                    ack.setLastMessageId(current);
                     break;
                 }
             }
@@ -178,14 +204,15 @@
             if (transactionId != null) {
                 ack.setTransactionId(transactionId);
             }
-
-            this.protocolConverter.afterClientAck(this, acknowledgedMessages);
-        } else if (ackMode == INDIVIDUAL_ACK) {
+        } else if (isIndividualAck()) {
+            if (ackEntry.getAckId() != null) {
+                pendingAcks.remove(ackEntry.getAckId());
+            }
             ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
             ack.setMessageID(msgId);
             ack.setMessageCount(1);
             if (transactionId != null) {
-                unconsumedMessage.add(dispatchedMessage.get(msgId));
+                transactedMessages.add(dispatchedMessage.get(msgId));
                 ack.setTransactionId(transactionId);
             } else {
                 dispatchedMessage.remove(msgId);
@@ -196,23 +223,29 @@
     }
 
     public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
-
         MessageId msgId = new MessageId(messageId);
 
         if (!dispatchedMessage.containsKey(msgId)) {
             return null;
         }
 
+        final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+
+        if (ackEntry.getAckId() != null) {
+            pendingAcks.remove(ackEntry.getAckId());
+        }
+
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setConsumerId(consumerInfo.getConsumerId());
         ack.setAckType(MessageAck.POSION_ACK_TYPE);
         ack.setMessageID(msgId);
         if (transactionId != null) {
-            unconsumedMessage.add(dispatchedMessage.get(msgId));
+            transactedMessages.add(ackEntry);
             ack.setTransactionId(transactionId);
+        } else {
+            dispatchedMessage.remove(msgId);
         }
-        dispatchedMessage.remove(msgId);
 
         return ack;
     }
@@ -225,6 +258,18 @@
         this.ackMode = ackMode;
     }
 
+    public boolean isAutoAck() {
+        return ackMode.equals(AUTO_ACK);
+    }
+
+    public boolean isClientAck() {
+        return ackMode.equals(CLIENT_ACK);
+    }
+
+    public boolean isIndividualAck() {
+        return ackMode.equals(INDIVIDUAL_ACK);
+    }
+
     public String getSubscriptionId() {
         return subscriptionId;
     }
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
index b7560c7..3944c50 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
@@ -21,7 +21,9 @@
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -40,7 +42,6 @@
 
     @Override
     public void setUp() throws Exception {
-
         super.setUp();
 
         stompConnect();
@@ -70,7 +71,6 @@
 
     @Test(timeout = 60000)
     public void testTelnetStyleSends() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "CONNECT\r\n" +
@@ -107,7 +107,6 @@
 
     @Test(timeout = 60000)
     public void testClientAckWithoutAckId() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "STOMP\r\n" +
@@ -150,18 +149,40 @@
         assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
         assertEquals("1", received.getBody());
 
-        String frame = "ACK\n" + "message-id:" +
-                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        String ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+
+        // Put ACK ID in wrong header
+        String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         received = stompConnection.receive();
         assertTrue(received.getAction().equals("ERROR"));
         LOG.info("Broker sent: " + received);
+
+        // Now place it in the correct location and check it still works.
+        frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("2", receiptId);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
     }
 
     @Test(timeout = 60000)
     public void testClientAck() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "STOMP\r\n" +
@@ -255,11 +276,106 @@
         frame = "ACK\n" + "id:" +
                 received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    @Test(timeout = 60000)
+    public void testClientAckMultipleMessagesWithSingleAck() throws Exception {
+        final int MESSAGE_COUNT = 10;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+        assertEquals(MESSAGE_COUNT, getProxyToQueue(getQueueName()).getQueueSize());
+
+        String lastAckId = null;
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            lastAckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+        }
+
+        String frame = "ACK\n" + "id:" +  lastAckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("2", receiptId);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
     }
 
     @Test(timeout = 60000)
     public void testClientIndividualAck() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "STOMP\r\n" +
@@ -345,24 +461,117 @@
         assertTrue(received.getAction().equals("MESSAGE"));
         assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
         assertEquals("1", received.getBody());
+        String message1AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
 
-        frame = "ACK\n" + "id:" +
-                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        frame = "ACK\n" + "id:" + message1AckId + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         received = stompConnection.receive();
         assertTrue(received.getAction().equals("MESSAGE"));
         assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
         assertEquals("3", received.getBody());
+        String message3AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
 
-        frame = "ACK\n" + "id:" +
-                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        frame = "ACK\n" + "id:" + message3AckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("2", receiptId);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
 
     @Test(timeout = 60000)
-    public void testQueueBrowerSubscription() throws Exception {
+    public void testRepeatedClientIndividualAckInMultipleTransactions() throws Exception {
+        final int MESSAGE_COUNT = 50;
 
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:1\n" +
+                           "ack:client-individual\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        // Receive all messages, each in their own transaction
+        // Ensure we don't have any errors
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            // Ack & Commit the first message
+            String begin = "BEGIN\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(begin);
+
+            String frame = "ACK\n" + "transaction:tx" + String.format("%d", n) + "\n" + "id:" +
+                    received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(frame);
+
+            String commit = "COMMIT\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(commit);
+        }
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+    }
+
+    @Test(timeout = 60000)
+    public void testQueueBrowerSubscription() throws Exception {
         final int MSG_COUNT = 10;
 
         String connectFrame = "STOMP\n" +
@@ -523,7 +732,6 @@
 
     @Test(timeout = 60000)
     public void testSubscribeWithNoId() throws Exception {
-
         String connectFrame = "STOMP\n" +
                               "login:system\n" +
                               "passcode:manager\n" +
@@ -571,7 +779,7 @@
 
         long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
 
-        for(int i = 0; i < MSG_COUNT; ++i) {
+        for (int i = 0; i < MSG_COUNT; ++i) {
             String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
                              "receipt:0\n" +
                              "myXkProp:" + bigProp + "\n"+
@@ -593,11 +801,372 @@
                            "id:12345\n" + "browser:true\n\n" + Stomp.NULL;
         stompConnection.sendFrame(subscribe);
 
-        for(int i = 0; i < MSG_COUNT; ++i) {
+        for (int i = 0; i < MSG_COUNT; ++i) {
             StompFrame message = stompConnection.receive();
             assertEquals(Stomp.Responses.MESSAGE, message.getAction());
             assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
         }
+    }
 
+    @Test(timeout = 60000)
+    public void testAckMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testNackMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(true);
+    }
+
+    private void doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(boolean nack) throws Exception {
+        final int MESSAGE_COUNT = 10;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:1\n" +
+                           "ack:client-individual\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        // Start a TX that will later be aborted.
+        String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(begin);
+
+        List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+
+            String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" +
+                    received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(frame);
+        }
+
+        String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(commit);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        for (String ackId : ackIds) {
+            if (nack) {
+                String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            } else {
+                String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            }
+
+            receipt = stompConnection.receive();
+            LOG.info("Broker sent: " + receipt);
+            assertTrue(receipt.getAction().startsWith("RECEIPT"));
+            receiptId = receipt.getHeaders().get("receipt-id");
+            assertEquals("2", receiptId);
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+    }
+
+    @Test(timeout = 60000)
+    public void testAckMessagesAfterTransactionAbortClientAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientAckMode(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testNackMessagesAfterTransactionAbortClientAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientAckMode(true);
+    }
+
+    private void doTestMessagesRetirementAfterTransactionAbortClientAckMode(boolean nack) throws Exception {
+        final int MESSAGE_COUNT = 10;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        // Start a TX that will later be aborted.
+        String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(begin);
+
+        List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+        }
+
+        // Client ACK that enlists all messages in the TX
+        String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + ackIds.get(MESSAGE_COUNT - 1) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(commit);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        for (String ackId : ackIds) {
+            if (nack) {
+                frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            } else {
+                frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            }
+
+            receipt = stompConnection.receive();
+            LOG.info("Broker sent: " + receipt);
+            assertTrue(receipt.getAction().startsWith("RECEIPT"));
+            receiptId = receipt.getHeaders().get("receipt-id");
+            assertEquals("2", receiptId);
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+    }
+
+    @Test(timeout = 60000)
+    public void testMixedAckNackWithMessageAckIdsClientAck() throws Exception {
+        doTestMixedAckNackWithMessageAckIds(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testMixedAckNackWithMessageAckIdsClientIndividualAck() throws Exception {
+        doTestMixedAckNackWithMessageAckIds(true);
+    }
+
+    public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
+
+        final int MESSAGE_COUNT = 20;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+                           "ack:" + (individual ? "client-individual" : "client") + "\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        boolean nack = false;
+
+        for (String ackId : ackIds) {
+            if (nack) {
+                String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+                nack = !nack;
+            } else {
+                String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+                nack = !nack;
+            }
+
+            receipt = stompConnection.receive();
+            LOG.info("Broker sent: " + receipt);
+            assertTrue(receipt.getAction().startsWith("RECEIPT"));
+            receiptId = receipt.getHeaders().get("receipt-id");
+            assertEquals("2", receiptId);
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
     }
 }
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index 7ded503..5e96385 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -901,6 +901,7 @@
         try {
             String f = stompConnection.receiveFrame();
             assertTrue(f.startsWith("ERROR"));
+            assertFalse("no stack trace impl leak:" + f, f.contains("at "));
         } catch (IOException socketMayBeClosedFirstByBroker) {}
     }
 
@@ -913,6 +914,7 @@
         try {
             String f = stompConnection.receiveFrame();
             assertTrue(f.startsWith("ERROR"));
+            assertFalse("no stack trace impl leak:" + f, f.contains("at "));
         } catch (IOException socketMayBeClosedFirstByBroker) {}
     }
 
@@ -930,6 +932,7 @@
         stompConnection.sendFrame(frame);
         String f = stompConnection.receiveFrame();
         assertTrue(f.startsWith("ERROR"));
+        assertFalse("no stack trace impl leak:" + f, f.contains("at "));
     }
 
     @Test(timeout = 60000)
@@ -946,6 +949,7 @@
         stompConnection.sendFrame(frame);
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("ERROR"));
+        assertFalse("no stack trace impl leak:" + frame, frame.contains("at "));
     }
 
     @Test(timeout = 60000)
@@ -964,6 +968,7 @@
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("ERROR"));
         assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
+        assertFalse("no stack trace impl leak:" + frame, frame.contains("at "));
     }
 
     @Test(timeout = 60000)
diff --git a/activemq-tooling/activemq-maven-plugin/pom.xml b/activemq-tooling/activemq-maven-plugin/pom.xml
index 5588f8a..7f5cfee 100644
--- a/activemq-tooling/activemq-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-maven-plugin/pom.xml
@@ -31,13 +31,8 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.maven</groupId>
-      <artifactId>maven-plugin-api</artifactId>
-      <version>2.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.maven</groupId>
-      <artifactId>maven-project</artifactId>
-      <version>2.0</version>
+      <artifactId>maven-core</artifactId>
+      <version>${maven-core-version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
@@ -72,6 +67,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml
index e42b37b..346571b 100644
--- a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml
@@ -33,7 +33,7 @@
     <dependency>
       <groupId>org.apache.maven</groupId>
       <artifactId>maven-plugin-api</artifactId>
-      <version>2.0</version>
+      <version>${maven-core-version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
diff --git a/activemq-tooling/activemq-perf-maven-plugin/pom.xml b/activemq-tooling/activemq-perf-maven-plugin/pom.xml
index fde54a7..9490bde 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-perf-maven-plugin/pom.xml
@@ -32,7 +32,7 @@
     <dependency>
       <groupId>org.apache.maven</groupId>
       <artifactId>maven-plugin-api</artifactId>
-      <version>2.0</version>
+      <version>${maven-core-version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
index 8b84798..17ce87f 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
@@ -57,14 +57,14 @@
                     } else {
                         LOG.info("Adding extension dir: " + f.getAbsolutePath());
 
-                        urls.add(f.toURL());
+                        urls.add(f.toURI().toURL());
 
                         File[] files = f.listFiles();
                         if (files != null) {
                             for (int j = 0; j < files.length; j++) {
                                 if (files[j].getName().endsWith(".zip") || files[j].getName().endsWith(".jar")) {
                                     LOG.info("Adding extension dir: " + files[j].getAbsolutePath());
-                                    urls.add(files[j].toURL());
+                                    urls.add(files[j].toURI().toURL());
                                 }
                             }
                         }
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 6ef398d..d63fed8 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -274,18 +274,6 @@
       <artifactId>apacheds-core-integ</artifactId>
       <version>${directory-version}</version>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>bouncycastle</groupId>
-          <artifactId>bcprov-jdk15</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.bouncycastle</groupId>
-      <artifactId>bcprov-jdk15</artifactId>
-      <version>1.46</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.directory.server</groupId>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 80d45eb..89745a9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -1085,15 +1085,14 @@
             consumerInfos.add(consumerInfo);
         }
 
-        for (ConsumerInfo info : consumerInfos) {
-            connection.send(info);
-        }
-
         Message message = null;
         for (ConsumerInfo info : consumerInfos) {
+            // one by one registration to avoid ordering issue with concurrent dispatch from composite dests broker side
+            connection.request(info);
             for (int i = 0; i < numMessages; i++) {
                 message = receiveMessage(connection);
                 assertNotNull(message);
+                LOG.info("ORIG " + message.getMessageId());
                 connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE));
             }
             MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
@@ -1574,6 +1573,7 @@
     protected void configureBroker(BrokerService broker) throws Exception {
         super.configureBroker(broker);
         broker.setKeepDurableSubsActive(keepDurableSubsActive);
+        maxWait = 2000;
     }
 
     public static Test suite() {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 0739070..6a061c7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -35,8 +35,14 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.ProducerThread;
 import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -201,6 +207,69 @@
 
     @Test
     public void testScheduleRestart() throws Exception {
+        testScheduleRestart(RestartType.NORMAL);
+    }
+
+    @Test
+    public void testScheduleFullRecoveryRestart() throws Exception {
+        testScheduleRestart(RestartType.FULL_RECOVERY);
+    }
+
+    @Test
+    public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
+        final int NUMBER_OF_MESSAGES = 1000;
+        final AtomicInteger numberOfDiscardedJobs = new AtomicInteger();
+        final JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
+        Location middleLocation = null;
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) {
+                    numberOfDiscardedJobs.incrementAndGet();
+                }
+            }
+        };
+
+        registerLogAppender(appender);
+
+        // send a messages
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = session.createTextMessage("test msg");
+            long time = 5000;
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(message);
+
+            if (NUMBER_OF_MESSAGES / 2 == i) {
+                middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation();
+            }
+        }
+
+        producer.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        // Simulating the case here updates got applied on the index before the journal updates
+        jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation);
+        jobSchedulerStore.load();
+
+        assertEquals(numberOfDiscardedJobs.get(), NUMBER_OF_MESSAGES / 2);
+    }
+
+    private void registerLogAppender(final Appender appender) {
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
+        log4jLogger.addAppender(appender);
+        log4jLogger.setLevel(Level.TRACE);
+    }
+
+    private void testScheduleRestart(final RestartType restartType) throws Exception {
         // send a message
         Connection connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -213,12 +282,7 @@
         producer.close();
 
         //restart broker
-        broker.stop();
-        broker.waitUntilStopped();
-
-        broker = createBroker(false);
-        broker.start();
-        broker.waitUntilStarted();
+        restartBroker(restartType);
 
         // consume the message
         connection = createConnection();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index 6f6dc76..b1f3b2e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -23,6 +23,19 @@
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -44,13 +57,25 @@
 
     private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
 
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = createBroker(true);
+        if (isPersistent()) {
+            ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500);
+            ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100 * 1024);
+        }
+        return brokerService;
+    }
+
     @Test
     public void testRemoveAllScheduled() throws Exception {
-        final int COUNT = 5;
+        org.apache.log4j.Logger.getLogger(Transaction.class).setLevel(Level.DEBUG);
+        final int COUNT = 5000;
+        System.setProperty("maxKahaDBTxSize", "" + (500*1024));
         Connection connection = createConnection();
 
         // Setup the scheduled Message
-        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(180), COUNT);
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -80,6 +105,10 @@
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(latch.getCount(), COUNT);
 
+        if (isPersistent()) {
+            assertEquals(1, getNumberOfJournalFiles());
+        }
+
         connection.close();
     }
 
@@ -423,4 +452,15 @@
 
         producer.close();
     }
+
+    private int getNumberOfJournalFiles() throws IOException, InterruptedException {
+        Collection<DataFile> files = ((JobSchedulerStoreImpl) broker.getJobSchedulerStore()).getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+        return reality;
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
index 5bf8d8c..acfd1ba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -41,6 +41,11 @@
 
     @Rule public TestName name = new TestName();
 
+    enum RestartType {
+        NORMAL,
+        FULL_RECOVERY
+    }
+
     protected String connectionUri;
     protected BrokerService broker;
     protected JobScheduler jobScheduler;
@@ -113,4 +118,22 @@
         answer.setUseJmx(isUseJmx());
         return answer;
     }
+
+    protected void restartBroker(RestartType restartType) throws Exception {
+        tearDown();
+
+        if (restartType == RestartType.FULL_RECOVERY)  {
+            File dir = broker.getSchedulerDirectoryFile();
+
+            if (dir != null) {
+                IOHelper.deleteFile(new File(dir, "scheduleDB.data"));
+                IOHelper.deleteFile(new File(dir, "scheduleDB.redo"));
+            }
+        }
+
+        broker = createBroker(false);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java
new file mode 100644
index 0000000..c79a536
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class AMQ7270Test extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ7270Test.class);
+    final int messageCount = 150;
+    final int messageSize = 1024*1024;
+    final int maxPageSize = 50;
+    final ActiveMQQueue activeMQQueue = new ActiveMQQueue("BIG");
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+
+    protected void configureBroker() throws Exception {
+        broker.setPersistent(false);
+        broker.setAdvisorySupport(false);
+
+        PolicyMap pMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        // disable expriy processing as this will call browse in parallel
+        policy.setExpireMessagesPeriod(0);
+        policy.setMaxPageSize(maxPageSize);
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+    }
+
+    public void testConcurrentCopyMatchingPageSizeOk() throws Exception {
+
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(activeMQQueue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        BytesMessage bytesMessage = session.createBytesMessage();
+
+        for (int i = 0; i < messageCount; i++) {
+            bytesMessage.setIntProperty("id", i);
+            producer.send(activeMQQueue, bytesMessage);
+        }
+
+        final QueueViewMBean queueViewMBean = (QueueViewMBean)
+                broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+
+        LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount());
+
+        connection.close();
+
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        for (int i=0; i<20; i++) {
+
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+
+                    try {
+                        // only match the last to require pageIn
+                        queueViewMBean.copyMatchingMessagesTo("id=" + (messageCount - 1), "To");
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+        assertTrue("all work done", executor.awaitTermination(30, TimeUnit.SECONDS));
+
+        final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(activeMQQueue);
+        assertEquals("page Size as expected " + underTest, maxPageSize, underTest.getMaxPageSize());
+    }
+
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setBrokerName("thisOne");
+        configureBroker();
+        broker.start();
+        factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker = null;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
new file mode 100644
index 0000000..da96431
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class MKahaDBTxRecoveryTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
+    private final static int maxFileLength = 1024*1024*32;
+
+    private final static String PREFIX_DESTINATION_NAME = "queue";
+
+    private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
+    private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
+    private final static int CLEANUP_INTERVAL_MILLIS = 500;
+
+    BrokerService broker;
+    private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>();
+
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setBrokerName("localhost");
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    @Test
+    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
+
+        prepareBrokerWithMultiStore(true);
+        broker.start();
+        broker.waitUntilStarted();
+
+
+        // Ensure we have an Admin View.
+        assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (broker.getAdminView()) != null;
+            }
+        }));
+
+
+        final AtomicBoolean injectFailure = new AtomicBoolean(true);
+
+        final AtomicInteger reps = new AtomicInteger();
+        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
+
+        TransactionIdTransformer faultInjector  = new TransactionIdTransformer() {
+            @Override
+            public TransactionId transform(TransactionId txid) {
+                if (injectFailure.get() && reps.incrementAndGet() > 5) {
+                    throw new RuntimeException("Bla");
+                }
+                return delegate.get().transform(txid);
+            }
+        };
+        // set up kahadb to fail after N ops
+        for (KahaDBPersistenceAdapter pa : kahadbs) {
+            if (delegate.get() == null) {
+                delegate.set(pa.getStore().getTransactionIdTransformer());
+            }
+            pa.setTransactionIdTransformer(faultInjector);
+        }
+
+        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        c.start();
+        Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME  + "," + DESTINATION_NAME_2));
+        producer.send(s.createTextMessage("HI"));
+        try {
+            s.commit();
+        } catch (Exception expected) {
+            expected.printStackTrace();
+        }
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
+
+        final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+        final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+
+        assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
+            }
+        }));
+
+        // check completion on recovery
+        injectFailure.set(false);
+
+        // fire in many more local transactions to use N txStore journal files
+        for (int i=0; i<100; i++) {
+            producer.send(s.createTextMessage("HI"));
+            s.commit();
+        }
+
+        broker.stop();
+
+        // fail recovery processing on first attempt
+        prepareBrokerWithMultiStore(false);
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+
+            @Override
+            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+                // longer than CleanupInterval
+                TimeUnit.SECONDS.sleep( 2);
+                throw new RuntimeException("Sorry");
+            }
+        }});
+        broker.start();
+
+        // second recovery attempt should sort it
+        broker.stop();
+        prepareBrokerWithMultiStore(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        // verify commit completed
+        Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+
+        destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+    }
+
+
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        kahadbs.add(kaha);
+        return kaha;
+    }
+
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
+        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10);
+
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+
+	private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages)
+			throws IOException {
+		FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        if (destinationPrefix != null) {
+            template.setQueue(destinationPrefix + ".>");
+        }
+		return template;
+	}
+}
\ No newline at end of file
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 4b8942b..aade6d3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.network;
 
+import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,6 +32,8 @@
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationStatistics;
@@ -227,5 +231,25 @@
         }, 10000, 500));
     }
 
+    protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) {
+        assertNotNull(networkBridge);
+        DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) networkBridge;
+        assertEquals(count, bridge.subscriptionMapByLocalId.size());
+        assertEquals(count, bridge.subscriptionMapByRemoteId.size());
+    }
+
+    protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception {
+        assertNotNull(connector);
+
+        for (TransportConnection tc : connector.getConnections()) {
+            if (tc.getConnectionId().startsWith("networkConnector_")) {
+                final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
+                bridgeField.setAccessible(true);
+                return (DemandForwardingBridge) bridgeField.get(tc);
+            }
+        }
+
+        return null;
+    }
 
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
index c5899a0..800c103 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.network;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import java.lang.reflect.Field;
 import java.net.URI;
 
 import javax.jms.JMSException;
@@ -28,15 +30,18 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import scala.annotation.bridge;
 
 /**
  * This test is to show that if a durable subscription over a network bridge is deleted and
@@ -106,7 +111,7 @@
         });
     }
 
-    public void testReceive(BrokerService receiveBroker, Session receiveSession,
+    protected void testReceive(BrokerService receiveBroker, Session receiveSession,
             BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception {
 
         final DestinationStatistics destinationStatistics =
@@ -118,6 +123,17 @@
 
         waitForConsumerCount(destinationStatistics, 1);
 
+        final NetworkBridge bridge;
+        if (publishBroker.getNetworkConnectors().size() > 0) {
+            Wait.waitFor(() -> publishBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 500);
+            bridge = publishBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+        } else {
+            bridge = findDuplexBridge(publishBroker.getTransportConnectorByScheme("tcp"));
+        }
+
+        //Should be 2 - one for the durable destination and one for the advisory destinations
+        assertSubscriptionMapCounts(bridge, 2);
+
         //remove the durable
         final ConnectionContext context = new ConnectionContext();
         RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker);
@@ -126,6 +142,9 @@
         receiveBroker.getBroker().removeSubscription(context, info);
         waitForConsumerCount(destinationStatistics, 0);
 
+        //Should be 1 - 0 for the durable destination and one for the advisory destinations
+        assertSubscriptionMapCounts(bridge, 1);
+
         //re-create consumer
         MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer();
         waitForConsumerCount(destinationStatistics, 1);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
index 86165f5..fd4ffe7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
@@ -38,14 +38,14 @@
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.jaas.GroupPrincipal;
 import org.apache.activemq.util.Wait;
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.name.Rdn;
+import org.apache.directory.api.ldap.model.ldif.LdifEntry;
+import org.apache.directory.api.ldap.model.ldif.LdifReader;
+import org.apache.directory.api.ldap.model.message.ModifyRequest;
+import org.apache.directory.api.ldap.model.message.ModifyRequestImpl;
 import org.apache.directory.ldap.client.api.LdapConnection;
 import org.apache.directory.server.core.integ.AbstractLdapTestUnit;
-import org.apache.directory.shared.ldap.model.ldif.LdifEntry;
-import org.apache.directory.shared.ldap.model.ldif.LdifReader;
-import org.apache.directory.shared.ldap.model.message.ModifyRequest;
-import org.apache.directory.shared.ldap.model.message.ModifyRequestImpl;
-import org.apache.directory.shared.ldap.model.name.Dn;
-import org.apache.directory.shared.ldap.model.name.Rdn;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java
index 3f52ed3..b285571 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java
@@ -18,7 +18,7 @@
 
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.jaas.UserPrincipal;
-import org.apache.directory.shared.ldap.model.message.ModifyRequest;
+import org.apache.directory.api.ldap.model.message.ModifyRequest;
 import org.junit.Test;
 
 import java.util.Set;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java
index 5c2764a..d6c366d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java
@@ -19,10 +19,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.exception.LdapException;
 import org.apache.directory.ldap.client.api.LdapConnection;
 import org.apache.directory.ldap.client.api.LdapNetworkConnection;
-import org.apache.directory.shared.ldap.model.exception.LdapException;
-import org.apache.directory.shared.ldap.model.name.Dn;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java
index f696cb3..aa5f232 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java
@@ -16,14 +16,14 @@
  */
 package org.apache.activemq.security;
 
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.exception.LdapException;
 import org.apache.directory.ldap.client.api.LdapConnection;
 import org.apache.directory.ldap.client.api.LdapNetworkConnection;
 import org.apache.directory.server.annotations.CreateLdapServer;
 import org.apache.directory.server.annotations.CreateTransport;
 import org.apache.directory.server.core.annotations.ApplyLdifFiles;
 import org.apache.directory.server.core.integ.FrameworkRunner;
-import org.apache.directory.shared.ldap.model.exception.LdapException;
-import org.apache.directory.shared.ldap.model.name.Dn;
 import org.junit.runner.RunWith;
 
 import java.io.IOException;
@@ -41,6 +41,7 @@
     protected SimpleCachedLDAPAuthorizationMap createMap() {
         SimpleCachedLDAPAuthorizationMap map = super.createMap();
         map.setConnectionURL("ldap://localhost:" + getLdapServer().getPort());
+        map.setConnectionPassword("secret");
         return map;
     }
     
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java
index 6d0ca93..e1a0db2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.security;
 
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.exception.LdapException;
 import org.apache.directory.ldap.client.api.LdapConnection;
 import org.apache.directory.ldap.client.api.LdapNetworkConnection;
-import org.apache.directory.shared.ldap.model.exception.LdapException;
-import org.apache.directory.shared.ldap.model.name.Dn;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java
index 5d6f2e7..e2a280b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.security;
 
+import org.apache.directory.api.ldap.model.name.Dn;
 import org.apache.directory.ldap.client.api.LdapConnection;
 import org.apache.directory.ldap.client.api.LdapNetworkConnection;
 import org.apache.directory.server.annotations.CreateLdapServer;
 import org.apache.directory.server.annotations.CreateTransport;
 import org.apache.directory.server.core.annotations.ApplyLdifFiles;
 import org.apache.directory.server.core.integ.FrameworkRunner;
-import org.apache.directory.shared.ldap.model.name.Dn;
 import org.junit.runner.RunWith;
 
 import java.io.InputStream;
@@ -39,6 +39,7 @@
     protected SimpleCachedLDAPAuthorizationMap createMap() {
         SimpleCachedLDAPAuthorizationMap map = super.createMap();
         map.setConnectionURL("ldap://localhost:" + getLdapServer().getPort());
+        map.setConnectionPassword("secret");
         return map;
     }
     
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
index 130a0da..f4fa851 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
@@ -65,6 +65,7 @@
         authMap.setQueueSearchMatchingFormat(new MessageFormat("uid={0},ou=queues,ou=destinations,o=ActiveMQ,ou=system"));
         authMap.setAdvisorySearchBase("uid=ActiveMQ.Advisory,ou=topics,ou=destinations,o=ActiveMQ,ou=system");
         authMap.setTempSearchBase("uid=ActiveMQ.Temp,ou=topics,ou=destinations,o=ActiveMQ,ou=system");
+        authMap.setConnectionPassword("secret");
     }
 
     @Test
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java
index 76681c6..9ca43c9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java
@@ -38,6 +38,7 @@
 
     private static final String CERT_USERS_FILE_SMALL = "cert-users-SMALL.properties";
     private static final String CERT_USERS_FILE_LARGE = "cert-users-LARGE.properties";
+    private static final String CERT_USERS_FILE_REGEXP = "cert-users-REGEXP.properties";
     private static final String CERT_GROUPS_FILE = "cert-groups.properties";
 
     private static final Logger LOG = LoggerFactory.getLogger(TextFileCertificateLoginModuleTest.class);
@@ -76,6 +77,11 @@
         loginTest(CERT_USERS_FILE_LARGE, CERT_GROUPS_FILE);
     }
 
+    @Test
+    public void testLoginWithREGEXPUsersFile() throws Exception {
+        loginTest(CERT_USERS_FILE_REGEXP, CERT_GROUPS_FILE);
+    }
+
     private void loginTest(String usersFiles, String groupsFile) throws LoginException {
 
         HashMap options = new HashMap<String, String>();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/EmptyTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/EmptyTransactionTest.java
new file mode 100644
index 0000000..e4f6196
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/EmptyTransactionTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MessageDatabase;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.experimental.theories.Theories;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class EmptyTransactionTest extends TestCase {
+
+    private static final int CHECKPOINT_INTERVAL = 500;
+    private BrokerService broker;
+
+    public void testEmptyTransactionsCheckpoint() throws Exception {
+
+        AtomicBoolean hadRecovery = new AtomicBoolean(false);
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+               if (event.getMessage().toString().contains("Recovering from the journal @")) {
+                   hadRecovery.set(true);
+               }
+            }
+        };
+
+        org.apache.log4j.Logger.getLogger(MessageDatabase.class).addAppender(appender);
+
+        start(true);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("QueueName"));
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        sendMessage(session, producer);
+
+        // wait checkpoint
+        // When we create a new consumer a KahaProducerAuditCommand written to the journal files changing the lastUpdate pointer
+        Thread.sleep(CHECKPOINT_INTERVAL * 2);
+
+        for (int i = 0; i < 5; i++) {
+            sendMessage(session, producer);
+        }
+
+        restart();
+
+        assertFalse(hadRecovery.get());
+    }
+
+    private void sendMessage(final Session session, final MessageProducer producer) throws JMSException {
+        TextMessage m = session.createTextMessage("Hi");
+        producer.send(m);
+        session.commit();
+    }
+
+    private void restart() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        start(false);
+    }
+
+    private void start(final boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
+        kahaDB.setCheckpointInterval(CHECKPOINT_INTERVAL);
+        broker.setPersistenceAdapter(kahaDB);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(deleteMessages);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java
new file mode 100644
index 0000000..5c7de8e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java
@@ -0,0 +1,18 @@
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import java.io.File;
+import java.io.IOException;
+
+public class QueueBrowsingDisableConcurrentStoreAndDispatchTest extends QueueBrowsingTest {
+    @Override
+    public BrokerService createBroker() throws IOException {
+        BrokerService broker = super.createBroker();
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setConcurrentStoreAndDispatchQueues(false);
+        broker.setPersistenceAdapter(kahadb);
+        return broker;
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index 3e2d067..d2e2e67 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -201,6 +201,10 @@
             producer.send(session.createTextMessage(data));
         }
 
+        //Consume one message to free memory and allow the cursor to pageIn messages
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.receive(1000);
+
         QueueBrowser browser = session.createBrowser(queue);
         Enumeration<?> enumeration = browser.getEnumeration();
         int received = 0;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
index 76f54ff..9beb4d6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.usecases;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,9 +34,11 @@
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.Wait;
@@ -57,6 +60,8 @@
     private BrokerService broker;
 
     protected void setUp() throws Exception {
+        produced.set(0);
+        consumed.set(0);
         // Setup and start the broker
         broker = new BrokerService();
         broker.setBrokerName(brokerName);
@@ -202,6 +207,119 @@
         }
     }
 
+
+    public void testTransactedProducerBlockedAndClosedWillRelease() throws Exception {
+        doTestTransactedProducerBlockedAndClosedWillRelease(false);
+    }
+
+    public void testTransactedSyncSendProducerBlockedAndClosedWillRelease() throws Exception {
+        doTestTransactedProducerBlockedAndClosedWillRelease(true);
+    }
+
+    public void doTestTransactedProducerBlockedAndClosedWillRelease(final boolean alwaysSyncSend) throws Exception {
+
+        // Create the connection factory
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        connectionFactory.setWatchTopicAdvisories(false);
+        connectionFactory.setAlwaysSyncSend(alwaysSyncSend);
+        Connection c = connectionFactory.createConnection();
+        c.start();
+
+
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(5000);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+        // Start the test destination listener
+        Session listenerSession = c.createSession(false, 1);
+        Destination destination = createDestination(listenerSession);
+
+
+        final AtomicInteger warnings = new AtomicInteger();
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
+                    LOG.info("received  log message: " + event.getMessage());
+                    warnings.incrementAndGet();
+                }
+            }
+        };
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(Topic.class);
+        log4jLogger.addAppender(appender);
+        try {
+
+            // Start producing the test messages
+            final Session session = connectionFactory.createConnection().createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer producer = session.createProducer(destination);
+
+            Thread producingThread = new Thread("Producing Thread") {
+                public void run() {
+                    try {
+                        for (long i = 0; i < numMessagesToSend; i++) {
+                            producer.send(session.createTextMessage("test"));
+
+                            long count = produced.incrementAndGet();
+                            if (count % 10000 == 0) {
+                                LOG.info("Produced " + count + " messages");
+                            }
+                        }
+                    } catch (Throwable ex) {
+                        ex.printStackTrace();
+                    } finally {
+                        try {
+                            producer.close();
+                            session.close();
+                        } catch (Exception e) {
+                        }
+                    }
+                }
+            };
+
+            producingThread.start();
+
+
+            assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return warnings.get() > 0;
+                }
+            }, 5 * 1000));
+
+
+            LOG.info("Produced: " + produced.get() + ", Warnings:" + warnings.get());
+
+            assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return warnings.get() > 0;
+                }
+            }, 5 * 1000));
+
+
+            final long enqueueCountWhenBlocked = broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount();
+
+            // now whack the hung connection broker side (mimic jmx), and verify usage gone b/c of rollback
+            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
+                transportConnection.serviceException(new IOException("forcing close for hung connection"));
+            }
+
+            assertTrue("Usage gets released on close", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    LOG.info("Usage: " + broker.getSystemUsage().getMemoryUsage().getUsage());
+
+                    return broker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+                }
+            }, 5 * 1000));
+
+            c.close();
+
+            // verify no pending sends completed in rolledback tx
+            assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
+
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+    }
+
     protected Destination createDestination(Session listenerSession) throws Exception {
         return new ActiveMQTopic("test");
     }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java
new file mode 100644
index 0000000..48ff54d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java
@@ -0,0 +1,17 @@
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import java.io.IOException;
+
+public class UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest  extends UsageBlockedDispatchTest {
+    @Override
+    protected BrokerService createBroker() throws IOException {
+        BrokerService broker = new BrokerService();
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setConcurrentStoreAndDispatchQueues(false);
+        broker.setPersistenceAdapter(kahadb);
+        return broker;
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
index 7767672..dba73c1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
@@ -32,6 +32,7 @@
 
 import javax.jms.*;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -48,14 +49,13 @@
     @Override
     public void setUp() throws Exception {
 
-        broker = new BrokerService();
+        broker = createBroker();
         broker.setDataDirectory("target" + File.separator + "activemq-data");
         broker.setPersistent(true);
         broker.setUseJmx(true);
         broker.setAdvisorySupport(false);
         broker.setDeleteAllMessagesOnStartup(true);
 
-        setDefaultPersistenceAdapter(broker);
         SystemUsage sysUsage = broker.getSystemUsage();
         sysUsage.getMemoryUsage().setLimit(100*1024);
 
@@ -75,6 +75,12 @@
         connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
 
+    protected BrokerService createBroker() throws IOException {
+        BrokerService broker = new BrokerService();
+        setDefaultPersistenceAdapter(broker);
+        return broker;
+    }
+
     @Override
     public void tearDown() throws Exception {
         if (broker != null) {
@@ -125,8 +131,7 @@
         Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = consumerSession.createConsumer(willGetAPage);
 
-        Message m = consumer.receive(messageReceiveTimeout);
-        assertNotNull("got a message", m);
+        consumer.receive(messageReceiveTimeout);
 
         final AtomicBoolean gotExpectedLogEvent = new AtomicBoolean(false);
         Appender appender = new DefaultTestAppender() {
@@ -144,7 +149,7 @@
 
             MessageConsumer noDispatchConsumer = consumerSession.createConsumer(shouldBeStuckForDispatch);
 
-            m = noDispatchConsumer.receive(messageReceiveTimeout);
+            Message m = noDispatchConsumer.receive(messageReceiveTimeout);
             assertNull("did not get a message", m);
 
             assertTrue("Got the new warning about the blocked cursor", gotExpectedLogEvent.get());
diff --git a/activemq-unit-tests/src/test/resources/cert-users-REGEXP.properties b/activemq-unit-tests/src/test/resources/cert-users-REGEXP.properties
new file mode 100644
index 0000000..c422738
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/cert-users-REGEXP.properties
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+CNODD=/DN=TEST_USER_\\d*[13579]/
+CNEVEN=/DN=TEST_USER_\\d*[02468]/
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml
index 911acba..a2e1ea3 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml
@@ -39,7 +39,7 @@
 
           <authorizationPlugin>
               <map>
-                <cachedLDAPAuthorizationMap connectionURL="ldap://localhost:${ldapPort}"/>
+                <cachedLDAPAuthorizationMap connectionURL="ldap://localhost:${ldapPort}" connectionPassword="secret" />
               </map>
           </authorizationPlugin>
       </plugins>
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
index 67768c1..f684ee1 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
@@ -39,7 +39,7 @@
 
           <authorizationPlugin>
               <map>
-                <cachedLDAPAuthorizationMap legacyGroupMapping="false" connectionURL="ldap://localhost:${ldapPort}" groupClass="org.apache.activemq.jaas.GroupPrincipal"/>
+                <cachedLDAPAuthorizationMap legacyGroupMapping="false" connectionURL="ldap://localhost:${ldapPort}" groupClass="org.apache.activemq.jaas.GroupPrincipal" connectionPassword="secret" />
               </map>
           </authorizationPlugin>
       </plugins>
diff --git a/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java b/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java
index 276f3be..9b67be7 100644
--- a/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java
+++ b/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java
@@ -34,12 +34,12 @@
     private ServiceRegistration service;
 
     private String jmxUrl = "service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root";
-    private String jmxUser = "karaf";
-    private String jmxPassword = "karaf";
+    private String jmxUser;
+    private String jmxPassword;
 
     private String jmsUrl = "tcp://localhost:61616";
-    private String jmsUser = "karaf";
-    private String jmsPassword = "karaf";
+    private String jmsUser;
+    private String jmsPassword;
 
     public OsgiConfiguration() {
 
diff --git a/activemq-web-console/src/main/webapp/login.html b/activemq-web-console/src/main/webapp/login.html
index 393bcb8..2801e4a 100644
--- a/activemq-web-console/src/main/webapp/login.html
+++ b/activemq-web-console/src/main/webapp/login.html
@@ -105,7 +105,7 @@
             <div class="footer_l">
                 <div class="footer_r">
                     <div>
-                        Copyright 2005-2013 The Apache Software Foundation.
+                        Copyright 2005-2019 The Apache Software Foundation.
 
                         (<a href="?printable=true">printable version</a>)
                     </div>
diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml
index 30ea042..5380de8 100644
--- a/activemq-web/pom.xml
+++ b/activemq-web/pom.xml
@@ -90,16 +90,12 @@
 
     <!-- Rome RSS Reader -->
     <dependency>
-      <groupId>rome</groupId>
+      <groupId>com.rometools</groupId>
       <artifactId>rome</artifactId>
     </dependency>
 
     <!-- XML -->
     <dependency>
-      <groupId>jdom</groupId>
-      <artifactId>jdom</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-spring</artifactId>
     </dependency>
@@ -182,4 +178,4 @@
       </build>
     </profile>
   </profiles>
-</project>
\ No newline at end of file
+</project>
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
index 1b8897c..38546d7 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
@@ -66,7 +66,7 @@
     private long defaultReadTimeout = -1;
     private long maximumReadTimeout = 20000;
     private long requestTimeout = 1000;
-    private String defaultContentType = "application/xml";
+    private String defaultContentType;
 
     private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
     private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
@@ -285,15 +285,16 @@
             response.setHeader("Pragma", "no-cache"); // HTTP 1.0
             response.setDateHeader("Expires", 0);
 
-
             // Set content type as in request. This should be done before calling getWriter by specification
-            String type = request.getContentType();
+            String type = getContentType(request);
 
             if (type != null) {
                 response.setContentType(type);
             } else {
-                if (isXmlContent(message)) {
+                if (defaultContentType != null) {
                     response.setContentType(defaultContentType);
+                } else if (isXmlContent(message)) {
+                    response.setContentType("application/xml");
                 } else {
                     response.setContentType("text/plain");
                 }
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java b/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java
index d677e93..21b7a2e 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java
@@ -29,14 +29,14 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import com.sun.syndication.feed.synd.SyndContent;
-import com.sun.syndication.feed.synd.SyndContentImpl;
-import com.sun.syndication.feed.synd.SyndEntry;
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.feed.synd.SyndFeedImpl;
-import com.sun.syndication.io.FeedException;
-import com.sun.syndication.io.SyndFeedOutput;
+import com.rometools.rome.feed.synd.SyndContent;
+import com.rometools.rome.feed.synd.SyndContentImpl;
+import com.rometools.rome.feed.synd.SyndEntry;
+import com.rometools.rome.feed.synd.SyndEntryImpl;
+import com.rometools.rome.feed.synd.SyndFeed;
+import com.rometools.rome.feed.synd.SyndFeedImpl;
+import com.rometools.rome.io.FeedException;
+import com.rometools.rome.io.SyndFeedOutput;
 
 /**
  * This renderer uses XStream to render messages on a queue as full XML elements
@@ -53,7 +53,6 @@
 
     public void renderMessage(PrintWriter writer, HttpServletRequest request, HttpServletResponse response, QueueBrowser browser, Message message) throws JMSException {
         SyndFeed feed = getFeed(browser, request);
-
         List<SyndEntry> entries = feed.getEntries();
         SyndEntry entry = createEntry(browser, message, request);
         SyndContent description = createEntryContent(browser, message, request);
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index 8c707c8..d9bf1de 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -283,8 +283,9 @@
         <include>org.eclipse.jetty.orbit:org.eclipse.jdt.core</include>
 
         <!-- Atom/RSS support -->
-        <include>rome:rome</include>
-        <include>jdom:jdom</include>
+        <include>com.rometools:rome</include>
+        <include>com.rometools:rome-utils</include>
+        <include>org.jdom:jdom2</include>
 
         <!-- REST API -->
         <include>org.jolokia:jolokia-core</include>
diff --git a/assembly/src/release/conf/jetty.xml b/assembly/src/release/conf/jetty.xml
index 95f83d1..0e44442 100644
--- a/assembly/src/release/conf/jetty.xml
+++ b/assembly/src/release/conf/jetty.xml
@@ -54,6 +54,16 @@
                   <property name="name" value="X-FRAME-OPTIONS"/>
                   <property name="value" value="SAMEORIGIN"/>
                 </bean>
+                <bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
+                  <property name="pattern" value="*"/>
+                  <property name="name" value="X-XSS-Protection"/>
+                  <property name="value" value="1; mode=block"/>
+                </bean>
+                <bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
+                  <property name="pattern" value="*"/>
+                  <property name="name" value="X-Content-Type-Options"/>
+                  <property name="value" value="nosniff"/>
+                </bean>
             </list>
         </property>
     </bean>
@@ -172,4 +182,4 @@
     </bean>
     
     
-</beans>
\ No newline at end of file
+</beans>
diff --git a/pom.xml b/pom.xml
index 4bdff14..7a3e65b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,40 +47,40 @@
     <aries-version>1.1.0</aries-version>
     <aries-transaction-version>1.1.1</aries-transaction-version>
     <axion-version>1.0-M3-dev</axion-version>
-    <camel-version>2.22.0</camel-version>
+    <camel-version>2.24.1</camel-version>
     <camel-version-range>[2.20,3)</camel-version-range>
     <cglib-version>2.2</cglib-version>
-    <commons-beanutils-version>1.9.3</commons-beanutils-version>
+    <commons-beanutils-version>1.9.4</commons-beanutils-version>
     <commons-collections-version>3.2.2</commons-collections-version>
     <commons-daemon-version>1.0.15</commons-daemon-version>
-    <commons-dbcp2-version>2.1.1</commons-dbcp2-version>
+    <commons-dbcp2-version>2.6.0</commons-dbcp2-version>
     <commons-io-version>2.6</commons-io-version>
     <commons-lang-version>2.6</commons-lang-version>
     <commons-logging-version>1.2</commons-logging-version>
-    <commons-pool2-version>2.4.2</commons-pool2-version>
+    <commons-pool2-version>2.6.2</commons-pool2-version>
     <commons-primitives-version>1.0</commons-primitives-version>
     <commons-net-version>3.6</commons-net-version>
-    <directory-version>2.0.0-M6</directory-version>
-    <ftpserver-version>1.0.6</ftpserver-version>
+    <directory-version>2.0.0.AM25</directory-version>
+    <ftpserver-version>1.1.1</ftpserver-version>
     <geronimo-version>1.0</geronimo-version>
-    <guava-version>18.0</guava-version>
+    <guava-version>28.0-jre</guava-version>
     <hadoop-version>1.0.4</hadoop-version>
     <hawtbuf-version>1.11</hawtbuf-version>
     <hawtdispatch-version>1.22</hawtdispatch-version>
     <howl-version>0.1.8</howl-version>
     <hsqldb-version>1.8.0.12</hsqldb-version>
-    <httpclient-version>4.5.6</httpclient-version>
-    <httpcore-version>4.4.10</httpcore-version>
+    <httpclient-version>4.5.9</httpclient-version>
+    <httpcore-version>4.4.11</httpcore-version>
     <insight-version>1.2.0.Beta4</insight-version>
-    <jackson-version>2.9.8</jackson-version>
+    <jackson-version>2.9.9</jackson-version>
+    <jackson-databind-version>2.9.9.3</jackson-databind-version>
     <jasypt-version>1.9.2</jasypt-version>
     <jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
-    <jdom-version>1.0</jdom-version>
     <jetty9-version>9.2.26.v20180806</jetty9-version>
     <jetty-version>${jetty9-version}</jetty-version>
     <jmdns-version>3.4.1</jmdns-version>
     <tomcat-api-version>8.0.53</tomcat-api-version>
-    <jettison-version>1.3.8</jettison-version>
+    <jettison-version>1.4.0</jettison-version>
     <jmock-version>2.5.1</jmock-version>
     <jolokia-version>1.6.0</jolokia-version>
     <josql-version>1.5_5</josql-version>
@@ -89,13 +89,13 @@
     <junit-version>4.12</junit-version>
     <hamcrest-version>1.3</hamcrest-version>
     <jxta-version>2.0</jxta-version>
-    <karaf-version>4.2.3</karaf-version>
+    <karaf-version>4.2.6</karaf-version>
     <leveldb-api-version>0.9</leveldb-api-version>
     <leveldb-version>0.9</leveldb-version>
     <leveldbjni-version>1.8</leveldbjni-version>
     <log4j-version>1.2.17</log4j-version>
     <mockito-version>1.10.19</mockito-version>
-    <owasp-dependency-check-version>3.3.0</owasp-dependency-check-version>
+    <owasp-dependency-check-version>4.0.2</owasp-dependency-check-version>
     <powermock-version>1.6.5</powermock-version>
     <mqtt-client-version>1.15</mqtt-client-version>
     <openjpa-version>1.2.0</openjpa-version>
@@ -103,23 +103,23 @@
     <org.osgi.core-version>4.3.1</org.osgi.core-version>
     <p2psockets-version>1.1.2</p2psockets-version>
     <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
-    <zookeeper-version>3.4.6</zookeeper-version>
-    <qpid-proton-version>0.32.0</qpid-proton-version>
-    <qpid-jms-version>0.41.0</qpid-jms-version>
-    <qpid-jms-netty-version>4.1.34.Final</qpid-jms-netty-version>
-    <qpid-jms-proton-version>0.32.0</qpid-jms-proton-version>
-    <netty-all-version>4.1.34.Final</netty-all-version>
+    <zookeeper-version>3.4.14</zookeeper-version>
+    <qpid-proton-version>0.33.2</qpid-proton-version>
+    <qpid-jms-version>0.44.0</qpid-jms-version>
+    <qpid-jms-netty-version>4.1.37.Final</qpid-jms-netty-version>
+    <qpid-jms-proton-version>0.33.2</qpid-jms-proton-version>
+    <netty-all-version>4.1.37.Final</netty-all-version>
     <regexp-version>1.3</regexp-version>
-    <rome-version>1.0</rome-version>
+    <rome-version>1.12.1</rome-version>
     <saxon-version>9.5.1-5</saxon-version>
     <saxon-bundle-version>9.5.1-5_1</saxon-bundle-version>
     <scala-plugin-version>3.1.0</scala-plugin-version>
     <scala-version>2.11.0</scala-version>
-    <shiro-version>1.2.6</shiro-version>
+    <shiro-version>1.4.1</shiro-version>
     <scalatest-version>2.1.5</scalatest-version>
     <slf4j-version>1.7.25</slf4j-version>
     <snappy-version>1.1.2</snappy-version>
-    <spring-version>4.3.18.RELEASE</spring-version>
+    <spring-version>4.3.24.RELEASE</spring-version>
     <stax2-api-version>3.0.2</stax2-api-version>
     <taglibs-version>1.2.5</taglibs-version>
     <velocity-version>1.7</velocity-version>
@@ -129,8 +129,8 @@
     <xmlbeans-bundle-version>2.6.0_2</xmlbeans-bundle-version>
     <xmlresolver-bundle-version>1.2_5</xmlresolver-bundle-version>
     <xpp3-version>1.1.4c</xpp3-version>
-    <xstream-version>1.4.10</xstream-version>
-    <xbean-version>4.2</xbean-version>
+    <xstream-version>1.4.11.1</xstream-version>
+    <xbean-version>4.14</xbean-version>
     <xerces-version>2.12.0</xerces-version>
     <jaxb-basics-version>0.6.4</jaxb-basics-version>
     <stompjms-version>1.19</stompjms-version>
@@ -138,7 +138,7 @@
     <pax-exam-version>4.9.1</pax-exam-version>
     <paxexam-karaf-container-version>1.0.0</paxexam-karaf-container-version>
     <pax-runner-version>1.8.6</pax-runner-version>
-    <pax-url-version>2.4.3</pax-url-version>
+    <pax-url-version>2.6.1</pax-url-version>
     <felix-configadmin-version>1.8.0</felix-configadmin-version>
     <felix-framework-version>5.0.1</felix-framework-version>
 
@@ -151,7 +151,7 @@
 
     <!-- Maven Plugin Version for this Project -->
     <maven-bundle-plugin-version>2.3.7</maven-bundle-plugin-version>
-    <maven-surefire-plugin-version>2.16</maven-surefire-plugin-version>
+    <maven-surefire-plugin-version>2.22.1</maven-surefire-plugin-version>
     <maven-antrun-plugin-version>1.3</maven-antrun-plugin-version>
     <maven-assembly-plugin-version>2.4</maven-assembly-plugin-version>
     <maven-release-plugin-version>2.4.1</maven-release-plugin-version>
@@ -175,6 +175,8 @@
     <maven-dependency-plugin-version>2.8</maven-dependency-plugin-version>
     <maven-project-info-reports-plugin-version>2.7</maven-project-info-reports-plugin-version>
     <maven-graph-plugin-version>1.30</maven-graph-plugin-version>
+    <maven-plugin-plugin-version>3.6.0</maven-plugin-plugin-version>
+    <maven-core-version>3.6.1</maven-core-version>
     <!-- OSGi bundles properties -->
     <activemq.osgi.import.pkg>*</activemq.osgi.import.pkg>
     <activemq.osgi.export.pkg>org.apache.activemq*</activemq.osgi.export.pkg>
@@ -672,7 +674,7 @@
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
-        <version>${jackson-version}</version>
+        <version>${jackson-databind-version}</version>
       </dependency>
 
       <!-- Used to configure the activemq logs -->
@@ -1082,16 +1084,11 @@
 
       <!-- ACTIVEMQ-WEB Specific Dependencies -->
       <dependency>
-        <groupId>rome</groupId>
+        <groupId>com.rometools</groupId>
         <artifactId>rome</artifactId>
         <version>${rome-version}</version>
       </dependency>
       <dependency>
-        <groupId>jdom</groupId>
-        <artifactId>jdom</artifactId>
-        <version>${jdom-version}</version>
-      </dependency>
-      <dependency>
         <groupId>org.fusesource.mqtt-client</groupId>
         <artifactId>mqtt-client</artifactId>
         <version>${mqtt-client-version}</version>
@@ -1287,6 +1284,11 @@
           <artifactId>maven-archiver</artifactId>
           <version>${maven-archiver-version}</version>
         </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-plugin-plugin</artifactId>
+          <version>${maven-plugin-plugin-version}</version>
+        </plugin>
         <!--This plugin's configuration is used to store Eclipse m2e settings only.
             It has no influence on the Maven build itself.-->
         <plugin>