Doing release candidate 4

git-svn-id: https://svn.apache.org/repos/asf/activemq/tags/activemq-5.0.0@602137 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml
index d787adb..c031ed6 100755
--- a/activemq-core/pom.xml
+++ b/activemq-core/pom.xml
@@ -37,6 +37,7 @@
     <activemq.osgi.import.pkg>
       javax.jmdns*;resolution:=optional,
       org.apache.activeio*;resolution:=optional,
+      org.apache.camel*;resolution:=optional,
       org.apache.commons.pool*;resolution:=optional,
       org.apache.derby*;resolution:=optional,
       org.apache.tools.ant*;resolution:=optional,
@@ -403,6 +404,11 @@
             <exclude>**/nio/**</exclude>
              <!-- A test used for memory profiling only. -->
              <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
+             
+             <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
+             
+             <exclude>**/amq1490/*</exclude>
+             <exclude>**/archive/*</exclude>
 
             <exclude>**/AMQDeadlockTest3.*</exclude>
           </excludes>
diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
index faf39bc..eba6818 100755
--- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
+++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
@@ -33,8 +33,8 @@
  */
 public class ActiveMQMessageAudit {
 
-    private static final int DEFAULT_WINDOW_SIZE = 1024;
-    private static final int MAXIMUM_PRODUCER_COUNT = 128;
+    private static final int DEFAULT_WINDOW_SIZE = 2048;
+    private static final int MAXIMUM_PRODUCER_COUNT = 64;
     private int auditDepth;
     private int maximumNumberOfProducersToTrack;
     private LRUCache<Object, BitArrayBin> map;
@@ -220,23 +220,33 @@
     
     /**
      * Check the MessageId is in order
+     * @param message 
+     * @return
+     */
+    public synchronized boolean isInOrder(final MessageReference message) {
+        return isInOrder(message.getMessageId());
+    }
+    
+    /**
+     * Check the MessageId is in order
      * @param id
      * @return
      */
     public synchronized boolean isInOrder(final MessageId id) {
-        boolean answer = true;
-        
+        boolean answer = false;
+
         if (id != null) {
             ProducerId pid = id.getProducerId();
             if (pid != null) {
                 BitArrayBin bab = map.get(pid);
-                if (bab != null) {
-                    answer = bab.isInOrder(id.getProducerSequenceId());
+                if (bab == null) {
+                    bab = new BitArrayBin(auditDepth);
+                    map.put(pid, bab);
                 }
-               
+                answer = bab.isInOrder(id.getProducerSequenceId());
+
             }
         }
         return answer;
     }
-
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0e4477a..83b5a65 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -113,7 +113,7 @@
     private TaskRunnerFactory persistenceTaskRunnerFactory;
     private SystemUsage systemUsage;
     private SystemUsage producerSystemUsage;
-    private SystemUsage consumerSystemUsage;
+    private SystemUsage consumerSystemUsaage;
     private PersistenceAdapter persistenceAdapter;
     private PersistenceAdapterFactory persistenceFactory;
     private DestinationFactory destinationFactory;
@@ -668,23 +668,23 @@
      * @throws IOException 
      */
     public SystemUsage getConsumerSystemUsage() throws IOException {
-        if (consumerSystemUsage == null) {
-            consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
-            consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
-            addService(consumerSystemUsage);
+        if (this.consumerSystemUsaage == null) {
+            this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
+            this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(0.5f);
+            addService(this.consumerSystemUsaage);
         }
-        return consumerSystemUsage;
+        return this.consumerSystemUsaage;
     }
 
     /**
-     * @param consumerUsageManager the consumerUsageManager to set
+     * @param consumerSystemUsaage the storeSystemUsage to set
      */
-    public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
-        if (this.consumerSystemUsage != null) {
-            removeService(this.consumerSystemUsage);
+    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
+        if (this.consumerSystemUsaage != null) {
+            removeService(this.consumerSystemUsaage);
         }
-        this.consumerSystemUsage = consumerUsageManager;
-        addService(this.producerSystemUsage);
+        this.consumerSystemUsaage = consumerSystemUsaage;
+        addService(this.consumerSystemUsaage);
     }
 
     /**
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 539d9b4..3917e47 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -56,7 +56,7 @@
     protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     protected final DestinationMap destinationMap = new DestinationMap();
     protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
-    protected final SystemUsage memoryManager;
+    protected final SystemUsage usageManager;
     protected final DestinationFactory destinationFactory;
     protected final DestinationStatistics destinationStatistics;
     protected final RegionBroker broker;
@@ -73,7 +73,7 @@
         }
         this.broker = broker;
         this.destinationStatistics = destinationStatistics;
-        this.memoryManager = memoryManager;
+        this.usageManager = memoryManager;
         this.taskRunnerFactory = taskRunnerFactory;
         if (broker == null) {
             throw new IllegalArgumentException("null destinationFactory");
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 673aad1..a9ac0b7 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.InvalidSelectorException;
 import org.apache.activemq.broker.Broker;
@@ -38,19 +40,18 @@
 
 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
 
-    private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
+    private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class);
     private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
     private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
-    private final SystemUsage usageManager;
     private boolean active;
 
     public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws InvalidSelectorException {
-        super(broker, context, info);
+        super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
-        this.usageManager = usageManager;
+        this.pending.setSystemUsage(usageManager);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
     }
@@ -119,9 +120,8 @@
                 topic.deactivate(context, this);
             }
         }
-        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+        for (final MessageReference node : dispatched) {
             // Mark the dispatched messages as redelivered for next time.
-            MessageReference node = (MessageReference)iter.next();
             Integer count = redeliveredMessages.get(node.getMessageId());
             if (count != null) {
                 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
@@ -135,8 +135,8 @@
             } else {
                 node.decrementReferenceCount();
             }
-            iter.remove();
         }
+        dispatched.clear();
         if (!keepDurableSubsActive && pending.isTransient()) {
             synchronized (pending) {
                 try {
@@ -191,7 +191,7 @@
         return active;
     }
 
-    protected synchronized void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
+    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
         node.getRegionDestination().acknowledge(context, this, ack, node);
         redeliveredMessages.remove(node.getMessageId());
         node.decrementReferenceCount();
@@ -238,7 +238,7 @@
     }
 
     /**
-     * @param memoryManager
+     * @param usageManager
      * @param oldPercentUsage
      * @param newPercentUsage
      * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8cc2761..8fcd668 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -17,12 +17,15 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -38,6 +41,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -50,19 +54,25 @@
 
     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
     protected PendingMessageCursor pending;
-    protected final LinkedList<MessageReference> dispatched = new LinkedList<MessageReference>();
+    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
     protected int prefetchExtension;
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
+    protected boolean optimizedDispatch=true;
+    private int maxProducersToAudit=32;
+    private int maxAuditDepth=2048;
+    protected final SystemUsage usageManager;
+    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
 
-    public PrefetchSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
+    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker, context, info);
+        this.usageManager=usageManager;
         pending = cursor;
     }
 
-    public PrefetchSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        this(broker, context, info, new VMPendingMessageCursor());
+    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        this(broker,usageManager,context, info, new VMPendingMessageCursor());
     }
 
     /**
@@ -118,8 +128,7 @@
         boolean pendingEmpty = false;
         pendingEmpty = pending.isEmpty();
         enqueueCounter++;
-
-        if (!isFull() && pendingEmpty && !isSlave()) {
+        if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
             dispatch(node);
         } else {
             optimizePrefetch();
@@ -128,6 +137,7 @@
                     LOG.debug("Prefetch limit.");
                 }
                 pending.addMessageLast(node);
+                dispatchMatched();
             }
         }
     }
@@ -140,7 +150,7 @@
                 if (node.getMessageId().equals(mdn.getMessageId())) {
                     pending.remove();
                     createMessageDispatch(node, node.getMessage());
-                    dispatched.addLast(node);
+                    dispatched.add(node);
                     return;
                 }
             }
@@ -150,7 +160,8 @@
         throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list");
     }
 
-    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
+    public synchronized void acknowledge(final ConnectionContext context,
+            final MessageAck ack) throws Exception {
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
         if (ack.isStandardAck()) {
@@ -158,36 +169,42 @@
             // acknowledgment.
             int index = 0;
             boolean inAckRange = false;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = iter.next();
+            List<MessageReference> removeList = new ArrayList<MessageReference>();
+            for (final MessageReference node : dispatched) {
                 MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
                 }
                 if (inAckRange) {
                     // Don't remove the nodes until we are committed.
                     if (!context.isInTransaction()) {
                         dequeueCounter++;
-                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                        iter.remove();
+                        node.getRegionDestination().getDestinationStatistics()
+                                .getDequeues().increment();
+                        removeList.add(node);
                     } else {
                         // setup a Synchronization to remove nodes from the
                         // dispatched list.
-                        context.getTransaction().addSynchronization(new Synchronization() {
+                        context.getTransaction().addSynchronization(
+                                new Synchronization() {
 
-                            public void afterCommit() throws Exception {
-                                synchronized (PrefetchSubscription.this) {
-                                    dequeueCounter++;
-                                    dispatched.remove(node);
-                                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                                    prefetchExtension--;
-                                }
-                            }
+                                    public void afterCommit() throws Exception {
+                                        synchronized (PrefetchSubscription.this) {
+                                            dequeueCounter++;
+                                            dispatched.remove(node);
+                                            node.getRegionDestination()
+                                                    .getDestinationStatistics()
+                                                    .getDequeues().increment();
+                                            prefetchExtension--;
+                                        }
+                                    }
 
-                            public void afterRollback() throws Exception {
-                                super.afterRollback();
-                            }
-                        });
+                                    public void afterRollback()
+                                            throws Exception {
+                                        super.afterRollback();
+                                    }
+                                });
                     }
                     index++;
                     acknowledge(context, ack, node);
@@ -196,21 +213,28 @@
                             // extend prefetch window only if not a pulling
                             // consumer
                             if (getPrefetchSize() != 0) {
-                                prefetchExtension = Math.max(prefetchExtension, index + 1);
+                                prefetchExtension = Math.max(prefetchExtension,
+                                        index + 1);
                             }
                         } else {
-                            prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
+                            prefetchExtension = Math.max(0, prefetchExtension
+                                    - (index + 1));
                         }
                         callDispatchMatched = true;
                         break;
                     }
                 }
             }
+            for (final MessageReference node : removeList) {
+                dispatched.remove(node);
+            }
             // this only happens after a reconnect - get an ack which is not
             // valid
             if (!callDispatchMatched) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not correlate acknowledgment with dispatched message: " + ack);
+                    LOG
+                            .debug("Could not correlate acknowledgment with dispatched message: "
+                                    + ack);
                 }
             }
         } else if (ack.isDeliveredAck()) {
@@ -219,7 +243,8 @@
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             int index = 0;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter
+                    .hasNext(); index++) {
                 final MessageReference node = iter.next();
                 if (ack.getLastMessageId().equals(node.getMessageId())) {
                     prefetchExtension = Math.max(prefetchExtension, index + 1);
@@ -228,17 +253,20 @@
                 }
             }
             if (!callDispatchMatched) {
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+                throw new JMSException(
+                        "Could not correlate acknowledgment with dispatched message: "
+                                + ack);
             }
-        } else if (ack.isRedeliveredAck() ) {
-            // Message was re-delivered but it was not yet considered to be a DLQ message.
+        } else if (ack.isRedeliveredAck()) {
+            // Message was re-delivered but it was not yet considered to be a
+            // DLQ message.
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             boolean inAckRange = false;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = iter.next();
+            for (final MessageReference node : dispatched) {
                 MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
                 }
                 if (inAckRange) {
@@ -250,49 +278,65 @@
                 }
             }
             if (!callDispatchMatched) {
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+                throw new JMSException(
+                        "Could not correlate acknowledgment with dispatched message: "
+                                + ack);
             }
         } else if (ack.isPoisonAck()) {
             // TODO: what if the message is already in a DLQ???
             // Handle the poison ACK case: we need to send the message to a DLQ
             if (ack.isInTransaction()) {
-                throw new JMSException("Poison ack cannot be transacted: " + ack);
+                throw new JMSException("Poison ack cannot be transacted: "
+                        + ack);
             }
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             int index = 0;
             boolean inAckRange = false;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = iter.next();
+            List<MessageReference> removeList = new ArrayList<MessageReference>();
+            for (final MessageReference node : dispatched) {
                 MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
                 }
                 if (inAckRange) {
                     sendToDLQ(context, node);
-                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                    iter.remove();
+                    node.getRegionDestination().getDestinationStatistics()
+                            .getDequeues().increment();
+                    removeList.add(node);
                     dequeueCounter++;
                     index++;
                     acknowledge(context, ack, node);
                     if (ack.getLastMessageId().equals(messageId)) {
-                        prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
+                        prefetchExtension = Math.max(0, prefetchExtension
+                                - (index + 1));
                         callDispatchMatched = true;
                         break;
                     }
                 }
             }
+            for (final MessageReference node : removeList) {
+                dispatched.remove(node);
+            }
             if (!callDispatchMatched) {
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+                throw new JMSException(
+                        "Could not correlate acknowledgment with dispatched message: "
+                                + ack);
             }
         }
         if (callDispatchMatched) {
             dispatchMatched();
         } else {
             if (isSlave()) {
-                throw new JMSException("Slave broker out of sync with master: Acknowledgment (" + ack + ") was not in the dispatch list: " + dispatched);
+                throw new JMSException(
+                        "Slave broker out of sync with master: Acknowledgment ("
+                                + ack + ") was not in the dispatch list: "
+                                + dispatched);
             } else {
-                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " + ack);
+                LOG
+                        .debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+                                + ack);
             }
         }
     }
@@ -364,6 +408,9 @@
 
     public synchronized void setPending(PendingMessageCursor pending) {
         this.pending = pending;
+        if (this.pending!=null) {
+            this.pending.setSystemUsage(usageManager);
+        }
     }
 
     /**
@@ -439,7 +486,10 @@
             // NULL messages don't count... they don't get Acked.
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 dispatchCounter++;
-                dispatched.addLast(node);
+                dispatched.add(node);
+                if(pending != null) {
+                    pending.dispatched(message);
+                }
             } else {
                 prefetchExtension = Math.max(0, prefetchExtension - 1);
             }
@@ -459,8 +509,6 @@
                 context.getConnection().dispatchSync(md);
                 onDispatch(node, message);
             }
-            // System.err.println(broker.getBrokerName() + " " + this + " (" +
-            // enqueueCounter + ", " + dispatchCounter +") " + node);
             return true;
         } else {
             return false;
@@ -536,4 +584,28 @@
     protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException {
     }
 
+    public boolean isOptimizedDispatch() {
+        return optimizedDispatch;
+    }
+
+    public void setOptimizedDispatch(boolean optimizedDispatch) {
+        this.optimizedDispatch = optimizedDispatch;
+    }
+
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+    }
+
+    public int getMaxAuditDepth() {
+        return maxAuditDepth;
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2d4e2c7..c7b3bbe 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -96,7 +96,7 @@
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
     private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
     private final Object exclusiveLockMutex = new Object();
-    private TaskRunner taskRunner;
+    private final TaskRunner taskRunner;
     
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@@ -449,7 +449,11 @@
         final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
         if (store != null && message.isPersistent()) {
-            systemUsage.getStoreUsage().waitForSpace();
+            while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
+                if (context.getStopping().get()) {
+                    throw new IOException("Connection closed, send aborted.");
+                }
+            }
             store.addMessage(context, message);
         }
         if (context.isInTransaction()) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
index b0bf3d9..9dc76c5 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
@@ -25,14 +25,15 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.usage.SystemUsage;
 
 public class QueueBrowserSubscription extends QueueSubscription {
 
     boolean browseDone;
 
-    public QueueBrowserSubscription(Broker broker, ConnectionContext context, ConsumerInfo info)
+    public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
         throws InvalidSelectorException {
-        super(broker, context, info);
+        super(broker,usageManager, context, info);
     }
 
     protected boolean canDispatch(MessageReference node) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
index a555f7b..41143b1 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
@@ -41,15 +41,15 @@
 
     public String toString() {
         return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
-               + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+               + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
         throws InvalidSelectorException {
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker, context, info);
+            return new QueueBrowserSubscription(broker,usageManager, context, info);
         } else {
-            return new QueueSubscription(broker, context, info);
+            return new QueueSubscription(broker, usageManager,context, info);
         }
     }
 
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 8f22505..425a8db 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -35,8 +36,8 @@
 
     private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
 
-    public QueueSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(broker, context, info);
+    public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        super(broker,usageManager, context, info);
     }
 
     /**
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
index 8ff5a45..1374930 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
@@ -41,7 +41,7 @@
 
     protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-        return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
+        return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) {
 
             public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
 
@@ -58,14 +58,14 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker, context, info);
+            return new QueueBrowserSubscription(broker,usageManager,context, info);
         } else {
-            return new QueueSubscription(broker, context, info);
+            return new QueueSubscription(broker, usageManager,context, info);
         }
     }
 
     public String toString() {
-        return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+        return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
index 3758233..b077ff4 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
@@ -47,13 +47,13 @@
             throw new JMSException("A durable subscription cannot be created for a temporary topic.");
         }
         try {
-            TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
+            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
             // lets configure the subscription depending on the destination
             ActiveMQDestination destination = info.getDestination();
             if (destination != null && broker.getDestinationPolicy() != null) {
                 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                 if (entry != null) {
-                    entry.configure(broker, memoryManager, answer);
+                    entry.configure(broker, usageManager, answer);
                 }
             }
             answer.init();
@@ -67,7 +67,7 @@
     }
 
     public String toString() {
-        return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+        return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
index 1c7082f..f4b1611 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -50,6 +50,8 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
@@ -65,7 +67,7 @@
  * 
  * @version $Revision: 1.21 $
  */
-public class Topic  extends BaseDestination {
+public class Topic  extends BaseDestination  implements Task{
     private static final Log LOG = LogFactory.getLog(Topic.class);
     protected final ActiveMQDestination destination;
     protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
@@ -81,28 +83,20 @@
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
-    
+    private final TaskRunner taskRunner;
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
-
-            // We may need to do this in async thread since this is run for
-            // within a synchronization
-            // that the UsageManager is holding.
-
-            synchronized (messagesWaitingForSpace) {
-                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
-                    Runnable op = messagesWaitingForSpace.removeFirst();
-                    op.run();
+                try {
+                    Topic.this.taskRunner.wakeup();
+                } catch (InterruptedException e) {
                 }
-            }
-
         };
     };
     private final Broker broker;
 
     public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
-                 TaskRunnerFactory taskFactory) {
+                 TaskRunnerFactory taskFactory) throws Exception {
         this.broker = broker;
         this.destination = destination;
         this.store = store; // this could be NULL! (If an advisory)
@@ -115,7 +109,8 @@
         }else{
         	//set the default
         	subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
-        }
+        } 
+        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
         // Let the store know what usage manager we are using so that he can
         // flush messages to disk
         // when usage gets high.
@@ -388,41 +383,53 @@
      * @throws IOException
      * @throws Exception
      */
-    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
-        final ConnectionContext context = producerExchange.getConnectionContext();
+    synchronized void doMessageSend(
+            final ProducerBrokerExchange producerExchange, final Message message)
+            throws IOException, Exception {
+        final ConnectionContext context = producerExchange
+                .getConnectionContext();
         message.setRegionDestination(this);
 
-        if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
-            systemUsage.getStoreUsage().waitForSpace();
+        if (store != null && message.isPersistent()
+                && !canOptimizeOutPersistence()) {
+            while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
+                if (context.getStopping().get()) {
+                    throw new IOException("Connection closed, send aborted.");
+                }
+            }
             store.addMessage(context, message);
         }
 
         message.incrementReferenceCount();
-        try {
 
-            if (context.isInTransaction()) {
-                context.getTransaction().addSynchronization(new Synchronization() {
-                    public void afterCommit() throws Exception {
-                        // It could take while before we receive the commit
-                        // operration.. by that time the message could have
-                        // expired..
-                        if (broker.isExpired(message)) {
-                            broker.messageExpired(context, message);
-                            message.decrementReferenceCount();
-                            destinationStatistics.getMessages().decrement();
-                            return;
-                        }
-                        dispatch(context, message);
+        if (context.isInTransaction()) {
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    // It could take while before we receive the commit
+                    // operration.. by that time the message could have
+                    // expired..
+                    if (broker.isExpired(message)) {
+                        broker.messageExpired(context, message);
+                        message.decrementReferenceCount();
+                        destinationStatistics.getMessages().decrement();
+                        return;
                     }
-                });
+                    try {
+                        dispatch(context, message);
+                    } finally {
+                        message.decrementReferenceCount();
+                    }
+                }
+            });
 
-            } else {
+        } else {
+            try {
                 dispatch(context, message);
+            } finally {
+                message.decrementReferenceCount();
             }
-
-        } finally {
-            message.decrementReferenceCount();
         }
+
     }
 
     private boolean canOptimizeOutPersistence() {
@@ -463,6 +470,9 @@
     }
 
     public void stop() throws Exception {
+        if (taskRunner != null) {
+            taskRunner.shutdown();
+        }
         this.subscriptionRecoveryPolicy.stop();
         if (memoryUsage != null) {
             memoryUsage.stop();
@@ -499,6 +509,15 @@
         }
         return result.toArray(new Message[result.size()]);
     }
+    
+    public boolean iterate() {
+        while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
+            Runnable op = messagesWaitingForSpace.removeFirst();
+            op.run();
+        }
+        return false;
+    }
+
 
     // Properties
     // -------------------------------------------------------------------------
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index f8f5cc0..59b8813 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -100,7 +100,7 @@
                                            + " subscriberName: " + key.getSubscriptionName());
                 }
             }
-            sub.activate(memoryManager, context, info);
+            sub.activate(usageManager, context, info);
             return sub;
         } else {
             return super.addConsumer(context, info);
@@ -140,7 +140,7 @@
     }
 
     public String toString() {
-        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     @Override
@@ -230,12 +230,12 @@
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
             DurableTopicSubscription sub = durableSubscriptions.get(key);
             if (sub == null) {
-                sub = new DurableTopicSubscription(broker, memoryManager, context, info, keepDurableSubsActive);
+                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
                 ActiveMQDestination destination = info.getDestination();
                 if (destination != null && broker.getDestinationPolicy() != null) {
                     PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                     if (entry != null) {
-                        entry.configure(broker, memoryManager, sub);
+                        entry.configure(broker, usageManager, sub);
                     }
                 }
                 durableSubscriptions.put(key, sub);
@@ -245,13 +245,13 @@
             return sub;
         }
         try {
-            TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
+            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
             // lets configure the subscription depending on the destination
             ActiveMQDestination destination = info.getDestination();
             if (destination != null && broker.getDestinationPolicy() != null) {
                 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                 if (entry != null) {
-                    entry.configure(broker, memoryManager, answer);
+                    entry.configure(broker, usageManager, answer);
                 }
             }
             answer.init();
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
index 80c4976..7395439 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
@@ -50,6 +50,7 @@
 
     public synchronized void stop() throws Exception  {
         started=false;
+        audit=null;
         gc();
     }
 
@@ -238,6 +239,13 @@
     public boolean isTransient() {
         return false;
     }
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */
+    public void dispatched(MessageReference message) {   
+    }
 
 
     protected synchronized boolean  isDuplicate(MessageId messageId) {
@@ -246,7 +254,12 @@
         }
         return this.audit.isDuplicate(messageId);
     }
-
-   
+    
+    protected synchronized void rollback(MessageId id) {
+        if (this.audit != null) {
+            audit.rollback(id);
+        }
+    }
+  
    
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 0921071..7bcb9c1 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -142,6 +142,7 @@
             for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
                 Message message = (Message)i.next();
                 message.setRegionDestination(regionDestination);
+                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                 message.incrementReferenceCount();
                 result.add(message);
                 count++;
@@ -210,6 +211,7 @@
         if (!isDiskListEmpty()) {
             // got from disk
             message.setRegionDestination(regionDestination);
+            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
             message.incrementReferenceCount();
         }
         return message;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 8ff3f84..13e3fad 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -247,6 +247,12 @@
      * disappears when the broker shuts down
      */
     public boolean isTransient();
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */
+    public void dispatched(MessageReference message);
 
 
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 4f66292..56f597b 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -119,6 +119,7 @@
         Message result = batchList.removeFirst();
         result.decrementReferenceCount();
         result.setRegionDestination(regionDestination);
+        result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
         return result;
     }
 
@@ -133,6 +134,7 @@
             throws Exception {
         if (!isDuplicate(message.getMessageId())) {
             message.setRegionDestination(regionDestination);
+            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
             message.incrementReferenceCount();
             batchList.addLast(message);
         } else {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 13396ba..f5d38d6 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -288,6 +288,20 @@
             nonPersistent.setEnableAudit(enableAudit);
         }
     }
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */
+    public void dispatched(MessageReference message) {
+        super.dispatched(message);
+        for (PendingMessageCursor cursor : storePrefetches) {
+            cursor.dispatched(message);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.dispatched(message);
+        }
+    }
 
     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
         if (currentCursor == null || currentCursor.isEmpty()) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index a281c12..495b8ae 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -18,7 +18,9 @@
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -28,6 +30,9 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -37,21 +42,19 @@
  * 
  * @version $Revision$
  */
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
 
     private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
     private TopicMessageStore store;
-    private final LinkedList<Message> batchList = new LinkedList<Message>();
+    private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
     private String clientId;
     private String subscriberName;
     private Destination regionDestination;
-    private MessageId firstMessageId;
-    private MessageId lastMessageId;
     private boolean batchResetNeeded = true;
     private boolean storeMayHaveMoreMessages = true;
     private boolean started;
     private final Subscription subscription;
-
+   
     /**
      * @param topic
      * @param clientId
@@ -63,12 +66,15 @@
         this.store = (TopicMessageStore)topic.getMessageStore();
         this.clientId = clientId;
         this.subscriberName = subscriberName;
+        this.maxProducersToAudit=32;
+        this.maxAuditDepth=10000;
     }
 
     public synchronized void start() throws Exception {
         if (!started) {
             started = true;
             super.start();
+            getSystemUsage().getMemoryUsage().addUsageListener(this);
             safeFillBatch();
         }
     }
@@ -76,6 +82,7 @@
     public synchronized void stop() throws Exception {
         if (started) {
             started = false;
+            getSystemUsage().getMemoryUsage().removeUsageListener(this);
             super.stop();
             store.resetBatching(clientId, subscriberName);
             gc();
@@ -97,22 +104,16 @@
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
-            if (isEmpty() && started) {
-                firstMessageId = node.getMessageId();
-            }
-            lastMessageId = node.getMessageId();
-            node.decrementReferenceCount();
             storeMayHaveMoreMessages=true;
+            node.decrementReferenceCount();
         }
     }
 
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
-            if (started) {
-                firstMessageId = node.getMessageId();
-            }
-            node.decrementReferenceCount();
             storeMayHaveMoreMessages=true;
+            node.decrementReferenceCount();
+            rollback(node.getMessageId());
         }
     }
 
@@ -127,7 +128,8 @@
     }
 
     public synchronized boolean hasNext() {
-        return !isEmpty();
+        boolean result =  !isEmpty();
+        return result;
     }
 
     public synchronized MessageReference next() {
@@ -136,13 +138,11 @@
         if (batchList.isEmpty()) {
             return null;
         } else {
-            result = batchList.removeFirst();
-            if (lastMessageId != null) {
-                if (result.getMessageId().equals(lastMessageId)) {
-                    // pendingCount=0;
-                }
-            }
+            Iterator i = batchList.entrySet().iterator();
+            result = (Message) ((Map.Entry)i.next()).getValue();
+            i.remove();
             result.setRegionDestination(regionDestination);
+            result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
         }
         return result;
     }
@@ -154,16 +154,23 @@
     public void finished() {
     }
 
-    public synchronized boolean recoverMessage(Message message) throws Exception {
+    public synchronized boolean recoverMessage(Message message)
+            throws Exception {
         MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
-        if( subscription.matches(message, messageEvaluationContext) ) {
+        if (subscription.matches(message, messageEvaluationContext)) {
             message.setRegionDestination(regionDestination);
-            // only increment if count is zero (could have been cached)
-            if (message.getReferenceCount() == 0) {
-                message.incrementReferenceCount();
+            if (!isDuplicate(message.getMessageId())) {
+                // only increment if count is zero (could have been cached)
+                if (message.getReferenceCount() == 0) {
+                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                    message.incrementReferenceCount();
+                   
+                }
+                batchList.put(message.getMessageId(), message);
+            }else {
+                this.storeMayHaveMoreMessages=true;
             }
-            batchList.addLast(message);
         }
         return true;
     }
@@ -172,9 +179,23 @@
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */  
+    public synchronized void dispatched(MessageReference message) {
+        if (this.audit != null) {
+            isDuplicate(message.getMessageId());
+            Message removed = this.batchList.remove(message.getMessageId());
+            if (removed != null) {
+                removed.decrementReferenceCount();
+            }
+        }
+    }
 
     // implementation
-    protected void safeFillBatch() {
+    protected synchronized void safeFillBatch() {
         try {
             fillBatch();
         } catch (Exception e) {
@@ -184,29 +205,17 @@
     }
 
     protected synchronized void fillBatch() throws Exception {
-        if( batchResetNeeded ) {
-            store.resetBatching(clientId, subscriberName);
-            batchResetNeeded=false;
-            storeMayHaveMoreMessages=true;
+        if (batchResetNeeded) {
+            this.store.resetBatching(clientId, subscriberName);
+            this.batchResetNeeded = false;
+            this.storeMayHaveMoreMessages = true;
         }
-        
-        while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
-            store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
-            if( batchList.isEmpty() ) {
-                storeMayHaveMoreMessages = false;
-            } else {
-                if (firstMessageId != null) {
-                    int pos = 0;
-                    for (Iterator<Message> iter = batchList.iterator(); iter.hasNext();) {
-                        Message msg = iter.next();
-                        if (msg.getMessageId().equals(firstMessageId)) {
-                            firstMessageId = null;
-                            break;
-                        } else {
-                            iter.remove();
-                        }
-                    }
-                }
+        while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
+            this.storeMayHaveMoreMessages = false;
+            this.store.recoverNextMessages(clientId, subscriberName,
+                    maxBatchSize, this);
+            if (!this.batchList.isEmpty()) {
+                this.storeMayHaveMoreMessages=true;
             }
         }
     }
@@ -221,12 +230,23 @@
     }
 
     public synchronized void gc() {
-        for (Message msg : batchList) {
+        for (Message msg : batchList.values()) {
             msg.decrementReferenceCount();
         }
         batchList.clear();
         batchResetNeeded = true;
     }
+    
+    public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
+        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
+            storeMayHaveMoreMessages = true;
+            try {
+                fillBatch();
+            } catch (Exception e) {
+                LOG.error("Failed to fill batch ", e);
+            }
+        }
+    }
 
     public String toString() {
         return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 0697c3a..c20cfaa 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -51,11 +51,13 @@
     private PendingQueueMessageStoragePolicy pendingQueuePolicy;
     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
     private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
-    private int maxProducersToAudit=1024;
-    private int maxAuditDepth=1;
+    private int maxProducersToAudit=32;
+    private int maxAuditDepth=1024;
+    private int maxQueueAuditDepth=1;
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
-
+    private boolean optimizedDispatch=false;
+   
     public void configure(Queue queue, Store tmpStore) {
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
@@ -73,7 +75,7 @@
         }
         queue.setProducerFlowControl(isProducerFlowControl());
         queue.setEnableAudit(isEnableAudit());
-        queue.setMaxAuditDepth(getMaxAuditDepth());
+        queue.setMaxAuditDepth(getMaxQueueAuditDepth());
         queue.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
@@ -132,6 +134,8 @@
             cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }
+        sub.setMaxAuditDepth(getMaxAuditDepth());
+        sub.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
     // Properties
@@ -331,4 +335,20 @@
         this.enableAudit = enableAudit;
     }
 
+    public int getMaxQueueAuditDepth() {
+        return maxQueueAuditDepth;
+    }
+
+    public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
+        this.maxQueueAuditDepth = maxQueueAuditDepth;
+    }
+
+    public boolean isOptimizedDispatch() {
+        return optimizedDispatch;
+    }
+
+    public void setOptimizedDispatch(boolean optimizedDispatch) {
+        this.optimizedDispatch = optimizedDispatch;
+    }
+
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java
index ba61fc1..03150cd 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java
@@ -26,6 +26,7 @@
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -81,6 +82,7 @@
     private transient short referenceCount;
     private transient ActiveMQConnection connection;
     private transient org.apache.activemq.broker.region.Destination regionDestination;
+    private transient MemoryUsage memoryUsage;
 
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
@@ -127,6 +129,7 @@
         copy.regionDestination = regionDestination;
         copy.brokerInTime = brokerInTime;
         copy.brokerOutTime = brokerOutTime;
+        copy.memoryUsage=this.memoryUsage;
         // copying the broker path breaks networks - if a consumer re-uses a
         // consumed
         // message and forwards it on
@@ -567,6 +570,17 @@
 
     public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
         this.regionDestination = destination;
+        if(this.memoryUsage==null) {
+            this.memoryUsage=regionDestination.getBrokerMemoryUsage();
+        }
+    }
+    
+    public MemoryUsage getMemoryUsage() {
+        return this.memoryUsage;
+    }
+    
+    public void setMemoryUsage(MemoryUsage usage) {
+        this.memoryUsage=usage;
     }
 
     public boolean isMarshallAware() {
@@ -581,16 +595,15 @@
             size = getSize();
         }
 
-        if (rc == 1 && regionDestination != null) {
-            regionDestination.getBrokerMemoryUsage().increaseUsage(size);
+        if (rc == 1 && getMemoryUsage() != null) {
+            getMemoryUsage().increaseUsage(size);
         }
 
-        // System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
-        // "+rc);
+        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
         return rc;
     }
 
-    public synchronized int decrementReferenceCount() {
+    public int decrementReferenceCount() {
         int rc;
         int size;
         synchronized (this) {
@@ -598,11 +611,10 @@
             size = getSize();
         }
 
-        if (rc == 0 && regionDestination != null) {
-            regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
+        if (rc == 0 && getMemoryUsage() != null) {
+            getMemoryUsage().decreaseUsage(size);
         }
-        // System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
-        // "+rc);
+        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
 
         return rc;
     }
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
index c073dc6..c7872f9 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
@@ -49,6 +49,11 @@
      * Command Marshaller
      */
     Marshaller COMMAND_MARSHALLER = new CommandMarshaller();
+    
+    /**
+     * MessageId marshaller
+     */
+    Marshaller MESSAGEID_MARSHALLER = new MessageIdMarshaller();
 
     /**
      * close the store
@@ -270,4 +275,5 @@
     public boolean isPersistentIndex();
     
 	public void setPersistentIndex(boolean persistentIndex);
+	
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
index 5745417..d26bf52 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
@@ -178,15 +178,7 @@
             }
         }
         if (directory != null && directory.isDirectory()) {
-            File[] files = directory.listFiles();
-            if (files != null) {
-                for (int i = 0; i < files.length; i++) {
-                    File file = files[i];
-                    if (!file.isDirectory()) {
-                        result &= file.delete();
-                    }
-                }
-            }
+            result =IOHelper.deleteChildren(directory);
             String str = result ? "successfully deleted" : "failed to delete";
             LOG.info("Kaha Store " + str + " data directory " + directory);
         }
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
index 1d68e2e..bebff6c 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,10 +40,11 @@
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
+
 /**
  * Manages DataFiles
  * 
@@ -87,6 +89,7 @@
     private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
 
     private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
     private DataFile currentWriteFile;
 
     private Location mark;
@@ -131,7 +134,7 @@
                 return dir.equals(directory) && n.startsWith(filePrefix);
             }
         });
-
+       
         if (files != null) {
             for (int i = 0; i < files.length; i++) {
                 try {
@@ -157,6 +160,7 @@
                     currentWriteFile.linkAfter(df);
                 }
                 currentWriteFile = df;
+                fileByFileMap.put(df.getFile(), df);
             }
         }
 
@@ -254,8 +258,10 @@
             int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
 
             String fileName = filePrefix + nextNum;
-            DataFile nextWriteFile = new DataFile(new File(directory, fileName), nextNum, preferedFileLength);
+            File file = new File(directory, fileName);
+            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
             fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+            fileByFileMap.put(file, nextWriteFile);
             if (currentWriteFile != null) {
                 currentWriteFile.linkAfter(nextWriteFile);
                 if (currentWriteFile.isUnused()) {
@@ -289,6 +295,16 @@
         }
         return dataFile;
     }
+    
+    File getFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
+        }
+        return dataFile.getFile();
+    }
 
     private DataFile getNextDataFile(DataFile dataFile) {
         return (DataFile)dataFile.getNext();
@@ -303,6 +319,7 @@
         storeState(false);
         appender.close();
         fileMap.clear();
+        fileByFileMap.clear();
         controlFile.unlock();
         controlFile.dispose();
         started = false;
@@ -327,6 +344,7 @@
             result &= dataFile.delete();
         }
         fileMap.clear();
+        fileByFileMap.clear();
         lastAppendLocation.set(null);
         mark = null;
         currentWriteFile = null;
@@ -415,6 +433,7 @@
     private synchronized void forceRemoveDataFile(DataFile dataFile)
             throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
+        fileByFileMap.remove(dataFile.getFile());
         DataFile removed = fileMap.remove(dataFile.getDataFileId());
         storeSize.addAndGet(-dataFile.getLength());
         dataFile.unlink();
@@ -461,16 +480,6 @@
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
                     cur.setOffset(0);
-
-                    // DataFileAccessor reader =
-                    // accessorPool.openDataFileAccessor(head);
-                    // try {
-                    // if( !reader.readLocationDetailsAndValidate(cur) ) {
-                    // return null;
-                    // }
-                    // } finally {
-                    // accessorPool.closeDataFileAccessor(reader);
-                    // }
                 } else {
                     // Set to the next offset..
                     cur = new Location(location);
@@ -509,6 +518,64 @@
             }
         }
     }
+    
+    public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+        DataFile df = fileByFileMap.get(file);
+        return getNextLocation(df, lastLocation,thisFileOnly);
+    }
+    
+    public synchronized Location getNextLocation(DataFile dataFile,
+            Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (lastLocation == null) {
+                    DataFile head = (DataFile)dataFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    cur = new Location(lastLocation);
+                    cur.setOffset(cur.getOffset() + cur.getSize());
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                if (thisFileOnly) {
+                    return null;
+                }else {
+                dataFile = getNextDataFile(dataFile);
+                if (dataFile == null) {
+                    return null;
+                } else {
+                    cur.setDataFileId(dataFile.getDataFileId().intValue());
+                    cur.setOffset(0);
+                }
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
 
     public ByteSequence read(Location location) throws IOException, IllegalStateException {
         DataFile dataFile = getDataFile(location);
@@ -611,4 +678,12 @@
             return null;
         return currentWriteFile.getDataFileId();
     }
+    
+    /**
+     * Get a set of files - only valid after start()
+     * @return files currently being used
+     */
+    public Set<File> getFiles(){
+        return fileByFileMap.keySet();
+    }
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
index e6672f9..43e9c83 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
@@ -43,6 +43,10 @@
         this.dataFileId = Integer.valueOf(number);
         length = (int)(file.exists() ? file.length() : 0);
     }
+    
+    File getFile() {
+        return file;
+    }
 
     public Integer getDataFileId() {
         return dataFileId;
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
index e1f1b76..1bbab91 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
@@ -187,8 +187,9 @@
     protected final void delete(final IndexItem keyItem, final IndexItem prevItem, final IndexItem nextItem) {
         if (keyItem != null) {
             try {
+                root = indexList.getRoot();
                 IndexItem prev = prevItem == null ? root : prevItem;
-                IndexItem next = nextItem != root ? nextItem : null;
+                IndexItem next = (nextItem == null || !nextItem.equals(root)) ? nextItem : null;
                 dataManager.removeInterestInFile(keyItem.getKeyFile());
                 dataManager.removeInterestInFile(keyItem.getValueFile());
                 if (next != null) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
index 9aeda1f..fcaa268 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
@@ -821,6 +821,9 @@
                     next = indexList.getNextEntry(root);
                 } else if (insertPos >= indexList.size()) {
                     prev = indexList.getLast();
+                    if (prev==null) {
+                        prev=root;
+                    }
                     next = null;
                 } else {
                     prev = indexList.get(insertPos);
@@ -836,6 +839,7 @@
                     updateIndexes(next);
                 }
                 storeIndex(index);
+                indexList.setRoot(root);
             }
         } catch (IOException e) {
             LOG.error("Failed to insert " + value, e);
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
index 626c84e..c34bf37 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
@@ -37,6 +37,7 @@
 import org.apache.activemq.kaha.impl.index.IndexManager;
 import org.apache.activemq.kaha.impl.index.VMIndex;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
+import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -67,7 +68,7 @@
         if (index == null) {
             if (persistentIndex) {
                 String name = containerId.getDataContainerName() + "_" + containerId.getKey();
-                name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
+                name=IOHelper.toFileSystemSafeName(name);
                 try {
                     HashIndex hashIndex = new HashIndex(directory, name, indexManager);
                     hashIndex.setNumberOfBins(getIndexBinSize());
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
index 57a9242..b6b601d 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
@@ -42,7 +42,7 @@
         return root;
     }
 
-    void setRoot(IndexItem e) {
+    public void setRoot(IndexItem e) {
         this.root = e;
     }
 
@@ -186,7 +186,7 @@
      * @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index &lt; 0 || index &gt; size()</tt>).
      */
     public synchronized void add(int index, IndexItem element) {
-        if (index == size - 1) {
+        if (index == size) {
             last = element;
         }
         size++;
@@ -297,7 +297,7 @@
 		}
 		// essential root get's updated consistently
 		if (result != null && root != null && root.equals(result)) {
-			return root;
+			return null;
 		}
 		return result;
 	}
@@ -340,7 +340,7 @@
     }
 
     public synchronized void remove(IndexItem e) {
-        if (e == root || e.equals(root)) {
+        if (e==null || e == root || e.equals(root)) {
             return;
         }
         if (e == last || e.equals(last)) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
index 8c58316..1ea1a89 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
@@ -24,6 +24,12 @@
  * @version $Revision$
  */
 public interface IndexLinkedList {
+    
+    /**
+     * Set the new Root
+     * @param newRoot
+     */
+    void setRoot(IndexItem newRoot);
 
     /**
      * @return the root used by the List
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
index 8b93236..99c5062 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
@@ -35,6 +35,10 @@
         this.root = header;
         this.root.next=this.root.prev=this.root;
     }
+    
+    public  void setRoot(IndexItem newRoot) {
+        this.root=newRoot;
+    }
 
     public synchronized IndexItem getRoot() {
         return root;
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
index b14785a..2d40489 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
@@ -59,13 +59,15 @@
         topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
     }
 
-    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception {
+    public void recoverNextMessages(String clientId, String subscriptionName,
+            int maxReturned, final MessageRecoveryListener listener)
+            throws Exception {
         RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
-        topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
-        if (recoveryListener.size() == 0) {
-            flush();
-            topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
-        }
+            topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
+            if (recoveryListener.size() == 0) {
+                flush();
+                topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
+            }
     }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
@@ -145,14 +147,18 @@
      * @param key
      * @throws IOException 
      */
-    protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
+    protected void acknowledge(ConnectionContext context, MessageId messageId,
+            Location location, String clientId, String subscriptionName)
+            throws IOException {
         synchronized (this) {
             lastLocation = location;
-            if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
-                MessageAck ack = new MessageAck();
-                ack.setLastMessageId(messageId);
-                removeMessage(context, ack);
-            }
+        }
+        if (topicReferenceStore.acknowledgeReference(context, clientId,
+                subscriptionName, messageId)) {
+            MessageAck ack = new MessageAck();
+            ack.setLastMessageId(messageId);
+            removeMessage(context, ack);
+
         }
         try {
             asyncWriteTask.wakeup();
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
index 9fddf34..ef43ed2 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
@@ -63,10 +63,14 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
-        throws Exception {
-        listener.recoverMessageReference(new MessageId(record.getMessageId()));
-        return listener.hasSpace();
+    protected final boolean recoverReference(MessageRecoveryListener listener,
+            ReferenceRecord record) throws Exception {
+        MessageId id = new MessageId(record.getMessageId());
+        if (listener.hasSpace()) {
+            listener.recoverMessageReference(id);
+            return true;
+        }
+        return false;
     }
 
     public synchronized void recover(MessageRecoveryListener listener) throws Exception {
@@ -90,14 +94,15 @@
                 entry = messageContainer.getNext(entry);
             }
         }
-        if (entry != null) {
+        if (entry != null) {      
             int count = 0;
             do {
                 ReferenceRecord msg = messageContainer.getValue(entry);
-                if (msg != null) {
-                    recoverReference(listener, msg);
-                    count++;
-                    lastBatchId = msg.getMessageId();
+                if (msg != null ) {
+                    if ( recoverReference(listener, msg)) {
+                        count++;
+                        lastBatchId = msg.getMessageId();
+                    }
                 } else {
                     lastBatchId = null;
                 }
@@ -134,7 +139,7 @@
         removeMessage(ack.getLastMessageId());
     }
 
-    public synchronized void removeMessage(MessageId msgId) throws IOException {
+    public synchronized void removeMessage(MessageId msgId) throws IOException {    
         StoreEntry entry = messageContainer.getEntry(msgId);
         if (entry != null) {
             ReferenceRecord rr = messageContainer.remove(msgId);
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
index 4744256..4eed8eb 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
@@ -56,7 +56,7 @@
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
     private static final String STORE_STATE = "store-state";
     private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
-    private static final Integer INDEX_VERSION = new Integer(2);
+    private static final Integer INDEX_VERSION = new Integer(3);
     private static final String RECORD_REFERENCES = "record-references";
     private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;
@@ -91,11 +91,13 @@
         boolean empty = store.getMapContainerIds().isEmpty();
         stateMap = store.getMapContainer("state", STORE_STATE);
         stateMap.load();
+        storeValid=true;
         if (!empty) {
             AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
             if (status != null) {
                 storeValid = status.get();
             }
+           
             if (storeValid) {
                 //check what version the indexes are at
                 Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
@@ -236,7 +238,9 @@
      * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
      */
     public void clearMessages() throws IOException {
-        deleteAllMessages();
+        //don't delete messages as it will clear state - call base
+        //class method to clear out the data instead
+        super.deleteAllMessages();
     }
 
     /**
@@ -247,6 +251,7 @@
     public void recoverState() throws IOException {
         for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
             SubscriptionInfo info = i.next();
+            LOG.info("Recovering subscriber state for durable subscriber: " + info);
             TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
             ts.addSubsciption(info, false);
         }
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
index 61a45af..f58689f 100755
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
@@ -117,7 +117,7 @@
             subscriberContainer.put(key, info);
         }
         // add the subscriber
-        ListContainer container = addSubscriberMessageContainer(key);
+        addSubscriberMessageContainer(key);
         /*
          * if(retroactive){ for(StoreEntry
          * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
@@ -200,33 +200,39 @@
         return result;
     }
 
-    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException {
-        ListContainer container = store.getListContainer(key, "topic-subs");
+    protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
+        MapContainer container = store.getMapContainer(key, "topic-subs");
+        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
         Marshaller marshaller = new ConsumerMessageRefMarshaller();
-        container.setMarshaller(marshaller);
+        container.setValueMarshaller(marshaller);
         TopicSubContainer tsc = new TopicSubContainer(container);
         subscriberMessages.put(key, tsc);
         return container;
     }
 
-    protected void removeSubscriberMessageContainer(Object key) throws IOException {
+    protected void removeSubscriberMessageContainer(Object key)
+            throws IOException {
         subscriberContainer.remove(key);
         TopicSubContainer container = subscriberMessages.remove(key);
-        for (Iterator i = container.iterator(); i.hasNext();) {
-            ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
-            if (ref != null) {
-                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
-                if (tsa != null) {
-                    if (tsa.decrementCount() <= 0) {
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
-                    } else {
-                        ackContainer.update(ref.getAckEntry(), tsa);
+        if (container != null) {
+            for (Iterator i = container.iterator(); i.hasNext();) {
+                ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
+                if (ref != null) {
+                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+                    if (tsa != null) {
+                        if (tsa.decrementCount() <= 0) {
+                            ackContainer.remove(ref.getAckEntry());
+                            messageContainer.remove(tsa.getMessageEntry());
+                        } else {
+                            ackContainer.update(ref.getAckEntry(), tsa);
+                        }
                     }
                 }
             }
+            container.clear();
         }
         store.deleteListContainer(key, "topic-subs");
+
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
index 2da1537..607a316 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
@@ -33,6 +33,7 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.util.SubscriptionKey;
 
 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
 
@@ -40,6 +41,7 @@
     protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
     private Map<String, SubscriptionInfo> subscriberContainer;
     private Store store;
+    private static final String TOPIC_SUB_NAME = "tsn";
 
     public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
                                    MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
@@ -108,10 +110,12 @@
         }
     }
 
-    protected ListContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
-        ListContainer container = store.getListContainer(clientId+":"+subscriptionName+":"+destination.getQualifiedName(), "topic-subs-references");
+    
+    protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
+        MapContainer container = store.getMapContainer(getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName)));
+        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
         Marshaller marshaller = new ConsumerMessageRefMarshaller();
-        container.setMarshaller(marshaller);
+        container.setValueMarshaller(marshaller);
         TopicSubContainer tsc = new TopicSubContainer(container);
         subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
         return container;
@@ -192,7 +196,7 @@
             adapter.addSubscriberState(info);
         }
         // add the subscriber
-        ListContainer container = addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
+        addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
         if (retroactive) {
             /*
              * for(StoreEntry
@@ -210,13 +214,13 @@
         if (info != null) {
             adapter.removeSubscriberState(info);
         }
-        String key = getSubscriptionKey(clientId, subscriptionName);
-        removeSubscriberMessageContainer(key);
+        removeSubscriberMessageContainer(clientId,subscriptionName);
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return subscriberContainer.values()
+        SubscriptionInfo[] result = subscriberContainer.values()
             .toArray(new SubscriptionInfo[subscriberContainer.size()]);
+        return result;
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
@@ -244,15 +248,19 @@
                     entry = container.getNextEntry(entry);
                 }
             }
-
+           
             if (entry != null) {
                 do {
                     ConsumerMessageRef consumerRef = container.get(entry);
-                    ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
+                    ReferenceRecord msg = messageContainer.getValue(consumerRef
+                            .getMessageEntry());
                     if (msg != null) {
-                        recoverReference(listener, msg);
-                        count++;
-                        container.setBatchEntry(msg.getMessageId(), entry);
+                        if (recoverReference(listener, msg)) {
+                            count++;
+                            container.setBatchEntry(msg.getMessageId(), entry);
+                        } else {
+                            break;
+                        }
                     } else {
                         container.reset();
                     }
@@ -288,9 +296,11 @@
         }
     }
 
-    protected void removeSubscriberMessageContainer(String key) throws IOException {
-        subscriberContainer.remove(key);
-        TopicSubContainer container = subscriberMessages.remove(key);
+    protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
+        String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
+        String containerName = getSubscriptionContainerName(subscriberKey);
+        subscriberContainer.remove(subscriberKey);
+        TopicSubContainer container = subscriberMessages.remove(subscriberKey);
         for (Iterator i = container.iterator(); i.hasNext();) {
             ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
             if (ref != null) {
@@ -305,12 +315,18 @@
                 }
             }
         }
-        store.deleteListContainer(destination, "topic-subs-references-" + key);
+        store.deleteMapContainer(containerName);
     }
 
     protected String getSubscriptionKey(String clientId, String subscriberName) {
-        String result = clientId + ":";
-        result += subscriberName != null ? subscriberName : "NOT_SET";
-        return result;
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(clientId).append(":");  
+        String name = subscriberName != null ? subscriberName : "NOT_SET";
+        return buffer.append(name).toString();
+    }
+    
+    private String getSubscriptionContainerName(String subscriptionKey) {
+        StringBuffer buffer = new StringBuffer(subscriptionKey);
+        return buffer.append(":").append(destination.getQualifiedName()).append(TOPIC_SUB_NAME).toString();
     }
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
index 8497fea..5e4b576 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
@@ -19,7 +19,7 @@
 import java.util.Iterator;
 
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.StoreEntry;
 
 /**
@@ -28,11 +28,11 @@
  * @version $Revision: 1.10 $
  */
 public class TopicSubContainer {
-    private transient ListContainer listContainer;
+    private transient MapContainer mapContainer;
     private transient StoreEntry batchEntry;
 
-    public TopicSubContainer(ListContainer container) {
-        this.listContainer = container;
+    public TopicSubContainer(MapContainer container) {
+        this.mapContainer = container;
     }
 
     /**
@@ -55,75 +55,56 @@
     }
 
     public boolean isEmpty() {
-        return listContainer.isEmpty();
+        return mapContainer.isEmpty();
     }
 
     public StoreEntry add(ConsumerMessageRef ref) {
-        return listContainer.placeLast(ref);
+        return mapContainer.place(ref.getMessageId(),ref);
     }
 
     public ConsumerMessageRef remove(MessageId id) {
         ConsumerMessageRef result = null;
-        if (!listContainer.isEmpty()) {
-            StoreEntry entry = listContainer.getFirst();
-            while (entry != null) {
-                ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
-                listContainer.remove(entry);
-                if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
-                    reset();
-                }
-                if (ref != null && ref.getMessageId().equals(id)) {
-                    result = ref;
-                    break;
-                }
-                entry = listContainer.getFirst();
+        StoreEntry entry = mapContainer.getEntry(id);
+        if (entry != null) {
+            result = (ConsumerMessageRef) mapContainer.getValue(entry);
+            mapContainer.remove(entry);
+            if (batchEntry != null && batchEntry.equals(entry)) {
+                reset();
             }
         }
+        if(mapContainer.isEmpty()) {
+            reset();
+        }
         return result;
     }
     
-    public ConsumerMessageRef removeFirst() {
-		ConsumerMessageRef result = null;
-		if (!listContainer.isEmpty()) {
-			StoreEntry entry = listContainer.getFirst();
-
-			result = (ConsumerMessageRef) listContainer.get(entry);
-			listContainer.remove(entry);
-			if (listContainer != null && batchEntry != null
-					&& (listContainer.isEmpty() || batchEntry.equals(entry))) {
-				reset();
-			}
-
-		}
-		return result;
-	}
-
+    
     public ConsumerMessageRef get(StoreEntry entry) {
-        return (ConsumerMessageRef)listContainer.get(entry);
+        return (ConsumerMessageRef)mapContainer.getValue(entry);
     }
 
     public StoreEntry getEntry() {
-        return listContainer.getFirst();
+        return mapContainer.getFirst();
     }
 
     public StoreEntry refreshEntry(StoreEntry entry) {
-        return listContainer.refresh(entry);
+        return mapContainer.refresh(entry);
     }
 
     public StoreEntry getNextEntry(StoreEntry entry) {
-        return listContainer.getNext(entry);
+        return mapContainer.getNext(entry);
     }
 
     public Iterator iterator() {
-        return listContainer.iterator();
+        return mapContainer.values().iterator();
     }
 
     public int size() {
-        return listContainer.size();
+        return mapContainer.size();
     }
 
     public void clear() {
         reset();
-        listContainer.clear();
+        mapContainer.clear();
     }
 }
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
index dfb2f55..6953415 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
@@ -74,13 +74,18 @@
 
         if (!commandSent.get()) {
             LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
-            try {
-                synchronized (writeChecker) {
-                    next.oneway(new KeepAliveInfo());
-                }
-            } catch (IOException e) {
-                onException(e);
-            }
+            // TODO: use a thread pool for this..
+            Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) {
+                public void run() {
+                    try {
+                        oneway(new KeepAliveInfo());
+                    } catch (IOException e) {
+                        onException(e);
+                    }
+                };
+            };
+            thread.setDaemon(true);
+            thread.start();
         } else {
             LOG.trace("Message sent since last write check, resetting flag");
         }
@@ -96,9 +101,16 @@
 
         if (!commandReceived.get()) {
             LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
-            synchronized (readChecker) {
-                onException(new InactivityIOException("Channel was inactive for too long."));
-            }
+
+            // TODO: use a thread pool for this..
+            Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) {
+                public void run() {
+                        onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+                };
+            };
+            thread.setDaemon(true);
+            thread.start();
+
         } else {
             LOG.trace("Message received since last read check, resetting flag: ");
         }
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
index 281dce0..183d5fd 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
@@ -104,6 +104,7 @@
     private Map<String, Object> socketOptions;
     private Boolean keepAlive;
     private Boolean tcpNoDelay;
+    private Thread runnerThread;
 
     /**
      * Connect to a remote Node - e.g. a Broker
@@ -165,6 +166,7 @@
      */
     public void run() {
         LOG.trace("TCP consumer thread starting");
+        this.runnerThread=Thread.currentThread();
         try {
             while (!isStopped()) {
                 doRun();
@@ -436,7 +438,7 @@
     public void stop() throws Exception {
         super.stop();
         CountDownLatch countDownLatch = stoppedLatch.get();
-        if (countDownLatch != null) {
+        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
             countDownLatch.await();
         }
     }
diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
index 9c9e2d7..8c2ce06 100755
--- a/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
+++ b/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
@@ -118,15 +118,15 @@
         if (value == 0) {
             return;
         }
-        if (parent != null) {
-            ((MemoryUsage)parent).increaseUsage(value);
-        }
         int percentUsage;
         synchronized (usageMutex) {
             usage += value;
             percentUsage = caclPercentUsage();
         }
         setPercentUsage(percentUsage);
+        if (parent != null) {
+            ((MemoryUsage)parent).increaseUsage(value);
+        }
     }
 
     /**
@@ -138,15 +138,15 @@
         if (value == 0) {
             return;
         }
-        if (parent != null) {
-            parent.decreaseUsage(value);
-        }
         int percentUsage;
         synchronized (usageMutex) {
             usage -= value;
             percentUsage = caclPercentUsage();
         }
         setPercentUsage(percentUsage);
+        if (parent != null) {
+            parent.decreaseUsage(value);
+        }
     }
 
     protected long retrieveUsage() {
diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
index 30e245f..9d88f35 100755
--- a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
+++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
@@ -30,6 +30,7 @@
     private int firstIndex = -1;
     private int firstBin = -1;
     private long lastBitSet=-1;
+    private long lastInOrderBit=-1;
 
     /**
      * Create a BitArrayBin to a certain window size (number of messages to
@@ -76,10 +77,15 @@
      * @return true if next message is in order
      */
     public boolean isInOrder(long index) {
-        if (lastBitSet== -1) {
-            return true;
+        boolean result = false;
+        if (lastInOrderBit == -1) {
+            result = true;
+        } else {
+            result = lastInOrderBit + 1 == index;
         }
-        return lastBitSet+1==index;
+        lastInOrderBit = index;
+        return result;
+
     }
 
     /**
diff --git a/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
index 374d356..c2257c1 100644
--- a/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -23,7 +23,7 @@
  * @version $Revision$
  */
 public final class IOHelper {
-
+    protected static final int MAX_FILE_NAME_LENGTH;
     private IOHelper() {
     }
 
@@ -74,6 +74,10 @@
                 rc.append(HexSupport.toHexFromInt(c, true));
             }
         }
+        String result = rc.toString();
+        if (result.length() > MAX_FILE_NAME_LENGTH) {
+            result = result.substring(0,MAX_FILE_NAME_LENGTH);
+        }
         return rc.toString();
     }
 
@@ -120,6 +124,10 @@
             throw new IOException("Failed to move " + src + " to " + targetDirectory);
         }
     }
+    
+    static {
+        MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","200")).intValue();             
+    }
 
    
 }
diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
index 1157d31..5ac7d0c 100755
--- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
@@ -105,6 +105,7 @@
             String id = idGen.generateId();
             if (i==0) {
                 assertFalse(audit.isDuplicate(id));
+                assertTrue(audit.isInOrder(id));
             }
             if (i > 1 && i%2 != 0) {
                 list.add(id);
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
index 34f6f0d..2c4dbcf 100755
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
@@ -63,6 +63,8 @@
         MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
         consumerConnection.start();
         consumerConnection.close();
+        broker.stop();
+        broker =createBroker(false);
         
         Connection producerConnection = factory.createConnection();
        
@@ -79,7 +81,8 @@
             }
         }
         producerConnection.close();
-        
+        broker.stop();
+        broker =createBroker(false);
         
         consumerConnection = factory.createConnection();
         consumerConnection.setClientID(CONSUMER_NAME);
@@ -102,7 +105,7 @@
     
     protected void setUp() throws Exception {
         if (broker == null) {
-            broker = createBroker();
+            broker = createBroker(true);
         }
         
        
@@ -128,19 +131,19 @@
      * 
      * @throws Exception
      */
-    protected BrokerService createBroker() throws Exception {
+    protected BrokerService createBroker(boolean deleteStore) throws Exception {
         BrokerService answer = new BrokerService();
-        configureBroker(answer);
+        configureBroker(answer,deleteStore);
         answer.start();
         return answer;
     }
 
     
 
-    protected void configureBroker(BrokerService answer) throws Exception {
+    protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
+        answer.setDeleteAllMessagesOnStartup(deleteStore);
         answer.addConnector(bindAddress);
-        answer.setDeleteAllMessagesOnStartup(true);
-        answer.setUseShutdownHook(false);
+        answer.setUseShutdownHook(true);
     }
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
index 39a739e..99aa98a 100644
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
@@ -123,10 +123,7 @@
                 WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject();
                 
                 long c = counter.incrementAndGet();
-                if (c % 1 == 0) {
-                    System.out.println("Worker now has message count of: " + c);
-                }
-
+                
                 // Don't create a new work item for every BATCH_SIZE message. */
                 if (c % BATCH_SIZE != 0)
                 {
diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedListTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedListTest.java
new file mode 100644
index 0000000..033c35c
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedListTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.kaha.impl.DataManager;
+import org.apache.activemq.kaha.impl.data.DataManagerImpl;
+import org.apache.activemq.kaha.impl.data.Item;
+import org.apache.activemq.util.IOHelper;
+
+
+public class DiskIndexLinkedListTest extends VMIndexLinkedListTest {
+
+    private IndexManager im;
+    protected IndexLinkedList createList(IndexItem root) throws IOException {
+        String dirName = System.getProperty("basedir", ".") + "/target/activemq-data/testIndex";
+        File file = new File(dirName);
+        file.mkdirs();
+        IOHelper.deleteChildren(file);
+        DataManager dm = new DataManagerImpl(file,"test",new AtomicLong());
+        im = new IndexManager(file,"test","rw",dm,new AtomicLong());
+        root = im.createNewIndex();
+        im.storeIndex(root);
+        return new DiskIndexLinkedList(im,root);
+    }
+    
+    IndexItem createIndex(IndexLinkedList indexList,int offset) throws IOException {
+        IndexItem result =  im.createNewIndex();
+        im.storeIndex(result);
+        return result;
+    }
+    
+    protected void addToList(IndexLinkedList list,IndexItem item) throws IOException {
+        IndexItem root = list.getRoot();
+        IndexItem prev = list.getLast();
+        prev = prev != null ? prev : root;
+        IndexItem next = list.getNextEntry(prev);
+        prev.setNextItem(item.getOffset());
+        item.setPreviousItem(prev.getOffset());
+        im.updateIndexes(prev);
+        if (next != null) {
+            next.setPreviousItem(item.getOffset());
+            item.setNextItem(next.getOffset());
+            im.updateIndexes(next);
+        }
+        im.storeIndex(item);
+        list.add(item);
+    }
+    
+    protected void insertToList(IndexLinkedList list,int pos,IndexItem item) throws IOException {
+        IndexItem root = list.getRoot();
+        IndexItem prev = null;
+        IndexItem next = null;
+        if (pos <= 0) {
+            prev = root;
+            next = list.getNextEntry(root);
+        } else if (pos >= list.size()) {
+            prev = list.getLast();
+            if (prev==null) {
+                prev=root;
+            }
+            next = null;
+        } else {
+            prev = list.get(pos);
+            prev = prev != null ? prev : root;
+            next = list.getNextEntry(prev);
+        }
+        prev.setNextItem(item.getOffset());
+        item.setPreviousItem(prev.getOffset());
+        im.updateIndexes(prev);
+        if (next != null) {
+            next.setPreviousItem(item.getOffset());
+            item.setNextItem(next.getOffset());
+            im.updateIndexes(next);
+        }
+        im.storeIndex(item);
+        list.setRoot(root);
+        list.add(pos,item);
+    }
+    
+    protected void insertFirst(IndexLinkedList list,IndexItem item) throws IOException {
+        IndexItem root = list.getRoot();
+        IndexItem prev = root;
+        IndexItem next = list.getNextEntry(prev);
+        prev.setNextItem(item.getOffset());
+        item.setPreviousItem(prev.getOffset());
+        im.updateIndexes(prev);
+        if (next != null) {
+            next.setPreviousItem(item.getOffset());
+            item.setNextItem(next.getOffset());
+            im.updateIndexes(next);
+        }
+        im.storeIndex(item);
+        list.addFirst(item);
+    }
+    
+    protected synchronized void remove(IndexLinkedList list,IndexItem item) throws IOException {
+        IndexItem root = list.getRoot();
+        IndexItem prev = list.getPrevEntry(item);
+        IndexItem next = list.getNextEntry(item);
+        list.remove(item);
+
+        prev = prev == null ? root : prev;
+        next = (next == null || !next.equals(root)) ? next : null;
+       
+        if (next != null) {
+            prev.setNextItem(next.getOffset());
+            next.setPreviousItem(prev.getOffset());
+            im.updateIndexes(next);
+        } else {
+            prev.setNextItem(Item.POSITION_NOT_SET);
+        }
+        im.updateIndexes(prev);
+    }
+}
diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
index 2d0040b..63dff4b 100644
--- a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
@@ -16,30 +16,34 @@
  */
 package org.apache.activemq.kaha.impl.index;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import junit.framework.TestCase;
 import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.data.Item;
 
 /**
  * @version $Revision: 1.2 $
  */
 public class VMIndexLinkedListTest extends TestCase {
-    static final int NUMBER = 10;
+    static final int NUMBER = 30;
     private IndexItem root;
-    private List testData = new ArrayList();
+    private List<IndexItem> testData = new ArrayList<IndexItem>();
     private IndexLinkedList list;
 
     protected void setUp() throws Exception {
         super.setUp();
+        
+        IndexItem item = new IndexItem();
+        list = createList(item);
+        this.root = list.getRoot();
+        
         for (int i = 0; i < NUMBER; i++) {
-            IndexItem item = new IndexItem();
-            item.setOffset(i);
+            item = createIndex(list,i);
             testData.add(item);
         }
-        root = new IndexItem();
-        list = new VMIndexLinkedList(root);
     }
 
     protected void tearDown() throws Exception {
@@ -47,95 +51,125 @@
         testData.clear();
         list = null;
     }
+    
+    IndexItem createIndex(IndexLinkedList list,int offset) throws IOException {
+        IndexItem result =  new IndexItem();
+        result.setOffset(offset);
+        return result;
+    }
+    protected IndexLinkedList createList(IndexItem root) throws IOException {
+        return new VMIndexLinkedList(root);
+    }
+    
+    protected void addToList(IndexLinkedList list,IndexItem item) throws IOException {
+        list.add(item);
+    }
+    
+    protected void insertToList(IndexLinkedList list,int pos,IndexItem item) throws IOException {
+        list.add(pos, item);
+    }
+    
+    protected void insertFirst(IndexLinkedList list,IndexItem item) throws IOException {
+        list.addFirst(item);
+    }
+    
+    protected synchronized void remove(IndexLinkedList list,IndexItem item) throws IOException {
+        IndexItem root = list.getRoot();
+        IndexItem prev = list.getPrevEntry(item);
+        IndexItem next = list.getNextEntry(item);
+        list.remove(item);
+    }
+    
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getFirst()'
      */
-    public void testGetFirst() {
+    public void testGetFirst() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
-        assertTrue(list.getFirst() == testData.get(0));
+        assertNotNull(list.getFirst());
+        assertTrue(list.getFirst().equals(testData.get(0)));
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getLast()'
      */
-    public void testGetLast() {
+    public void testGetLast() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         assertTrue(list.getLast() == testData.get(testData.size() - 1));
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.removeFirst()'
      */
-    public void testRemoveFirst() {
+    public void testRemoveFirst() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
-        assertTrue(list.removeFirst() == testData.get(0));
+        assertTrue(list.removeFirst().equals(testData.get(0)));
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.removeLast()'
      */
-    public void testRemoveLast() {
+    public void testRemoveLast() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
-        assertTrue(list.removeLast() == testData.get(testData.size() - 1));
+        assertTrue(list.removeLast().equals(testData.get(testData.size() - 1)));
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.addFirst(IndexItem)'
      */
-    public void testAddFirst() {
+    public void testAddFirst() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.addFirst((IndexItem)testData.get(i));
+            insertFirst(list, testData.get(i));
         }
         int count = 0;
         for (int i = testData.size() - 1; i >= 0; i--) {
-            assertTrue(testData.get(i) == list.get(count++));
+            assertTrue(testData.get(i).equals(list.get(count++)));
         }
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.addLast(IndexItem)'
      */
-    public void testAddLast() {
+    public void testAddLast() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.addLast((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
-            assertTrue(testData.get(i) == list.get(i));
+            assertTrue(testData.get(i).equals(list.get(i)));
         }
     }
 
     /*
-     * Test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.size()'
+     * test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.size()'
      */
-    public void testSize() {
+    public void testSize() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.addLast((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
             assertTrue(list.size() == i + 1);
         }
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.isEmpty()'
      */
-    public void testIsEmpty() {
+    public void testIsEmpty() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.addLast((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
             assertTrue(list.size() == i + 1);
         }
         list.clear();
@@ -143,24 +177,24 @@
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.add(IndexItem)'
      */
-    public void testAddIndexItem() {
+    public void testAddIndexItem() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
-            assertTrue(testData.get(i) == list.get(i));
+            assertTrue(testData.get(i).equals(list.get(i)));
         }
     }
 
     /*
-     * Test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.clear()'
+     * test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.clear()'
      */
-    public void testClear() {
+    public void testClear() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.addLast((IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
             assertTrue(list.size() == i + 1);
         }
         list.clear();
@@ -168,32 +202,32 @@
     }
 
     /*
-     * Test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.add(int,
+     * test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.add(int,
      * IndexItem)'
      */
-    public void testAddIntIndexItem() {
-        for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+    public void testAddIntIndexItem() throws IOException {
+        for (int i = 0; i < this.testData.size(); i++) {
+            insertToList(list, i, testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
-            assertTrue(testData.get(i) == list.get(i));
+            assertTrue(testData.get(i).equals(list.get(i)));
         }
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.remove(int)'
      */
-    public void testRemoveInt() {
+    public void testRemoveInt() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+            insertToList(list, i, testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
             list.remove(0);
         }
         assertTrue(list.isEmpty());
         for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+            insertToList(list, i, testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
             list.remove(list.size() - 1);
@@ -202,63 +236,81 @@
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.indexOf(IndexItem)'
      */
-    public void testIndexOf() {
+    public void testIndexOf() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
-            assertTrue(list.indexOf((StoreEntry)testData.get(i)) == i);
+            assertTrue(list.indexOf(testData.get(i)) == i);
         }
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getNextEntry(IndexItem)'
      */
-    public void testGetNextEntry() {
+    public void testGetNextEntry() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         IndexItem next = list.getFirst();
         int count = 0;
         while (next != null) {
-            assertTrue(next == testData.get(count++));
+            assertTrue(next.equals(testData.get(count++)));
             next = list.getNextEntry(next);
-            assertTrue(next != root);
+            assertTrue(next == null || !next.equals(root));
         }
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getPrevEntry(IndexItem)'
      */
-    public void testGetPrevEntry() {
+    public void testGetPrevEntry() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         IndexItem next = list.getLast();
         int count = testData.size() - 1;
         while (next != null) {
-            assertTrue(next == testData.get(count--));
+            assertTrue(next.equals(testData.get(count--)));
             next = list.getPrevEntry(next);
-            assertTrue(next != root);
+            assertTrue(next == null || !next.equals(root));
         }
     }
 
     /*
-     * Test method for
+     * test method for
      * 'org.apache.activemq.kaha.impl.VMIndexLinkedList.remove(IndexItem)'
      */
-    public void testRemoveIndexItem() {
+    public void testRemoveIndexItem() throws IOException {
         for (int i = 0; i < testData.size(); i++) {
-            list.add(i, (IndexItem)testData.get(i));
+            addToList(list,testData.get(i));
         }
         for (int i = 0; i < testData.size(); i++) {
-            list.remove((IndexItem)testData.get(i));
+            list.remove(testData.get(i));
             assertTrue(list.size() == testData.size() - i - 1);
         }
     }
+    
+    public void testAddRemove() throws IOException {
+        IndexItem a = createIndex(list,0);
+        addToList(list, a);
+        IndexItem b = createIndex(list,1);
+        addToList(list, b);
+        IndexItem c = createIndex(list,2);
+        addToList(list, c);
+        IndexItem d = createIndex(list,3);
+        addToList(list, d);
+        remove(list, d);
+        assertTrue(list.getLast().equals(c));
+        assertTrue(list.getNextEntry(b).equals(c));
+        remove(list, b);
+        assertTrue(list.getNextEntry(a).equals(c));
+        assertTrue(list.getLast().equals(c));
+        
+    }
 }
diff --git a/activemq-web-demo/src/main/webapp/index.html b/activemq-web-demo/src/main/webapp/index.html
index 1aa5e9a..726bbd7 100755
--- a/activemq-web-demo/src/main/webapp/index.html
+++ b/activemq-web-demo/src/main/webapp/index.html
@@ -38,7 +38,7 @@
 </p>
 
 <p>
-<a href="portfolio/portfolio.html">Porfolio</a> example shows how you could make an interactive trading portfolio which
+<a href="portfolio/portfolio.html">Portfolio</a> example shows how you could make an interactive trading portfolio which
 updates in real time as the market prices change
 </p>