Merge from trunk

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-store-changes@1584926 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index aae0a56..a58bc27 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -29,15 +29,15 @@
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.MessageStoreRecoverer;
 import org.apache.qpid.server.virtualhost.State;
-import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import com.sleepycat.je.rep.StateChangeEvent;
@@ -98,17 +98,12 @@
         {
             _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
 
-            DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
-
-            DurableConfigurationRecoverer configRecoverer =
-                    new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
-                            upgraderProvider, getEventLogger());
-            _messageStore.recoverConfigurationStore(configRecoverer);
+            ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+            _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
 
             initialiseModel();
 
-            VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
-            _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+            new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
 
             attainActivation();
         }
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index c64bc43..652e4c1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -27,8 +27,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
@@ -38,29 +36,23 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.store.EventManager;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMemoryMessage;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
 import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
 import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
 import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
@@ -70,6 +62,10 @@
 import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.util.FileUtils;
 
@@ -129,7 +125,6 @@
     private long _persistentSizeHighThreshold;
 
     private final EventManager _eventManager = new EventManager();
-    private final String _type;
 
     private final EnvironmentFacadeFactory _environmentFacadeFactory;
 
@@ -143,7 +138,6 @@
 
     public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
     {
-        _type = environmentFacadeFactory.getType();
         _environmentFacadeFactory = environmentFacadeFactory;
     }
 
@@ -160,18 +154,19 @@
         {
             if (_environmentFacade == null)
             {
-                String[] databaseNames = null;
+                EnvironmentFacadeTask[] initialisationTasks = null;
                 if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
                 {
-                    databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+                    String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
                     System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
                     System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+                    initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
                 }
                 else
                 {
-                    databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+                    initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
                 }
-                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
             }
             else
             {
@@ -181,11 +176,88 @@
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
     {
         checkConfigurationStoreOpen();
 
-        recoverConfig(recoveryHandler);
+        try
+        {
+            int configVersion = getConfigVersion();
+
+            handler.begin(configVersion);
+            doVisitAllConfiguredObjectRecords(handler);
+
+            int newConfigVersion = handler.end();
+            if(newConfigVersion != configVersion)
+            {
+                updateConfigVersion(newConfigVersion);
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
+        }
+
+    }
+
+    private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+    {
+        Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
+        Cursor objectsCursor = null;
+        Cursor hierarchyCursor = null;
+        try
+        {
+            objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+
+
+            while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+                BDBConfiguredObjectRecord configuredObject =
+                        (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+                configuredObjects.put(configuredObject.getId(), configuredObject);
+            }
+
+            // set parents
+            hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+            while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+                BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+                if(child != null)
+                {
+                    ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+                    if(parent != null)
+                    {
+                        child.addParent(hk.getParentType(), parent);
+                    }
+                    else if(hk.getParentType().equals("Exchange"))
+                    {
+                        // TODO - remove this hack for the pre-defined exchanges
+                        child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+                    }
+                }
+            }
+        }
+        finally
+        {
+            closeCursorSafely(objectsCursor);
+            closeCursorSafely(hierarchyCursor);
+        }
+
+        for (ConfiguredObjectRecord record : configuredObjects.values())
+        {
+            boolean shoudlContinue = handler.handle(record);
+            if (!shoudlContinue)
+            {
+                break;
+            }
+        }
+
     }
 
     @Override
@@ -209,7 +281,8 @@
 
             if (_environmentFacade == null)
             {
-                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
+                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
+                        new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
             }
 
             _committer = _environmentFacade.createCommitter(parent.getName());
@@ -218,21 +291,6 @@
     }
 
     @Override
-    public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
-    {
-        checkMessageStoreOpen();
-
-        if(messageRecoveryHandler != null)
-        {
-            recoverMessages(messageRecoveryHandler);
-        }
-        if(transactionLogRecoveryHandler != null)
-        {
-            recoverQueueEntries(transactionLogRecoveryHandler);
-        }
-    }
-
-    @Override
     public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
     {
         checkMessageStoreOpen();
@@ -314,27 +372,6 @@
         }
     }
 
-    private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
-    {
-        try
-        {
-            final int configVersion = getConfigVersion();
-            recoveryHandler.beginConfigurationRecovery(this, configVersion);
-            loadConfiguredObjects(recoveryHandler);
-
-            final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
-            if(newConfigVersion != configVersion)
-            {
-                updateConfigVersion(newConfigVersion);
-            }
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
-        }
-
-    }
-
     @SuppressWarnings("resource")
     private void updateConfigVersion(int newConfigVersion) throws StoreException
     {
@@ -399,62 +436,6 @@
         }
     }
 
-    private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
-    {
-        Cursor objectsCursor = null;
-        Cursor hierarchyCursor = null;
-        try
-        {
-            objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-
-            Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
-                    new HashMap<UUID, BDBConfiguredObjectRecord>();
-
-            while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
-                BDBConfiguredObjectRecord configuredObject =
-                        (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
-                configuredObjects.put(configuredObject.getId(), configuredObject);
-            }
-
-            // set parents
-            hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
-            while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
-                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
-                BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
-                if(child != null)
-                {
-                    ConfiguredObjectRecord parent = configuredObjects.get(parentId);
-                    if(parent != null)
-                    {
-                        child.addParent(hk.getParentType(), parent);
-                    }
-                    else if(hk.getParentType().equals("Exchange"))
-                    {
-                        // TODO - remove this hack for the pre-defined exchanges
-                        child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
-                    }
-                }
-            }
-
-            for (ConfiguredObjectRecord record : configuredObjects.values())
-            {
-                crh.configuredObject(record);
-            }
-        }
-        finally
-        {
-            closeCursorSafely(objectsCursor);
-            closeCursorSafely(hierarchyCursor);
-        }
-    }
-
     private void closeCursorSafely(Cursor cursor) throws StoreException
     {
         if (cursor != null)
@@ -470,124 +451,6 @@
         }
     }
 
-    private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
-    {
-        StoredMessageRecoveryHandler mrh = msrh.begin();
-
-        Cursor cursor = null;
-        try
-        {
-            cursor = getMessageMetaDataDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
-            long maxId = 0;
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                long messageId = LongBinding.entryToLong(key);
-                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
-                mrh.message(message);
-
-                maxId = Math.max(maxId, messageId);
-            }
-
-            _messageId.set(maxId);
-            mrh.completeMessageRecovery();
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-    }
-
-    private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
-    throws StoreException
-    {
-        QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
-        ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
-        Cursor cursor = null;
-        try
-        {
-            cursor = getDeliveryDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
-            DatabaseEntry value = new DatabaseEntry();
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                QueueEntryKey qek = keyBinding.entryToObject(key);
-
-                entries.add(qek);
-            }
-
-            try
-            {
-                cursor.close();
-            }
-            finally
-            {
-                cursor = null;
-            }
-
-            for(QueueEntryKey entry : entries)
-            {
-                UUID queueId = entry.getQueueId();
-                long messageId = entry.getMessageId();
-                qerh.queueEntry(queueId, messageId);
-            }
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-
-        TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
-        cursor = null;
-        try
-        {
-            cursor = getXidDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            XidBinding keyBinding = XidBinding.getInstance();
-            PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
-            DatabaseEntry value = new DatabaseEntry();
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                Xid xid = keyBinding.entryToObject(key);
-                PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
-                dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
-                                preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
-            }
-
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-
-
-        dtxrh.completeDtxRecordRecovery();
-    }
 
     void removeMessage(long messageId, boolean sync) throws StoreException
     {
@@ -738,6 +601,12 @@
     public void create(ConfiguredObjectRecord configuredObject) throws StoreException
     {
         checkConfigurationStoreOpen();
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Create " + configuredObject);
+        }
+
         com.sleepycat.je.Transaction txn = null;
         try
         {
@@ -831,7 +700,7 @@
     {
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
+            LOGGER.debug("Updating, creating " + createIfNecessary + " : "  + record);
         }
 
         DatabaseEntry key = new DatabaseEntry();
@@ -889,8 +758,7 @@
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Enqueuing message " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
-                        + " in transaction " + tx);
+                        + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
             }
             getDeliveryDb().put(tx, key, value);
         }
@@ -898,8 +766,7 @@
         {
             LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
             throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
-                    + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
-                    + " to database", e);
+                    + queue.getName() + " with id " + queue.getId() + " to database", e);
         }
     }
 
@@ -924,7 +791,7 @@
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Dequeue message id " + messageId + " from queue "
-                    + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                    + queue.getName() + " with id " + id);
         }
 
         try
@@ -934,19 +801,18 @@
             if (status == OperationStatus.NOTFOUND)
             {
                 throw new StoreException("Unable to find message with id " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                        + queue.getName() + " with id "  + id);
             }
             else if (status != OperationStatus.SUCCESS)
             {
                 throw new StoreException("Unable to remove message with id " + messageId + " on queue"
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                        + queue.getName() + " with id " + id);
             }
 
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Removed message " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
-                        + " from delivery db");
+                        + queue.getName() + " with id " + id);
 
             }
         }
@@ -1072,57 +938,6 @@
     }
 
     /**
-     * Primarily for testing purposes.
-     *
-     * @param queueId
-     *
-     * @return a list of message ids for messages enqueued for a particular queue
-     */
-    List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
-    {
-        Cursor cursor = null;
-        try
-        {
-            cursor = getDeliveryDb().openCursor(null, null);
-
-            DatabaseEntry key = new DatabaseEntry();
-
-            QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
-            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-            keyBinding.objectToEntry(dd, key);
-
-            DatabaseEntry value = new DatabaseEntry();
-
-            LinkedList<Long> messageIds = new LinkedList<Long>();
-
-            OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
-            dd = keyBinding.entryToObject(key);
-
-            while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
-            {
-
-                messageIds.add(dd.getMessageId());
-                status = cursor.getNext(key, value, LockMode.DEFAULT);
-                if (status == OperationStatus.SUCCESS)
-                {
-                    dd = keyBinding.entryToObject(key);
-                }
-            }
-
-            return messageIds;
-        }
-        catch (DatabaseException e)
-        {
-            throw new StoreException("Database error: " + e.getMessage(), e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-    }
-
-    /**
      * Return a valid, currently unused message id.
      *
      * @return A fresh message id.
@@ -1792,12 +1607,6 @@
         }
     }
 
-    @Override
-    public String getStoreType()
-    {
-        return _type;
-    }
-
     private Database getConfiguredObjectsDb()
     {
         return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
@@ -1901,4 +1710,147 @@
         }
 
     }
+
+    public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+    {
+        private long _maxId;
+
+        @Override
+        public void execute(EnvironmentFacade facade)
+        {
+            visitMessagesInternal(this, facade);
+            _messageId.set(_maxId);
+        }
+
+        @Override
+        public boolean handle(StoredMessage<?> storedMessage)
+        {
+            long id = storedMessage.getMessageNumber();
+            if (_maxId<id)
+            {
+                _maxId = id;
+            }
+            return true;
+        }
+
+    }
+
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+        visitMessagesInternal(handler, _environmentFacade);
+    }
+
+    private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+    {
+        Cursor cursor = null;
+        try
+        {
+            cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                long messageId = LongBinding.entryToLong(key);
+                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+                if (!handler.handle(message))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch(DatabaseException e)
+                {
+                    throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getDeliveryDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                QueueEntryKey entry = keyBinding.entryToObject(key);
+                UUID queueId = entry.getQueueId();
+                long messageId = entry.getMessageId();
+                if (!handler.handle(queueId, messageId))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getXidDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            XidBinding keyBinding = XidBinding.getInstance();
+            PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+            DatabaseEntry value = new DatabaseEntry();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                Xid xid = keyBinding.entryToObject(key);
+                PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+                if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+                                preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
+                {
+                    break;
+                }
+            }
+
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
 }
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
deleted file mode 100644
index bed7575..0000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store.berkeleydb.entry;
-
-public class Xid
-{
-
-    private final long _format;
-    private final byte[] _globalId;
-    private final byte[] _branchId;
-
-    public Xid(long format, byte[] globalId, byte[] branchId)
-    {
-        _format = format;
-        _globalId = globalId;
-        _branchId = branchId;
-    }
-
-    public long getFormat()
-    {
-        return _format;
-    }
-
-    public byte[] getGlobalId()
-    {
-        return _globalId;
-    }
-
-    public byte[] getBranchId()
-    {
-        return _branchId;
-    }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
index 01a5b75..5918e5a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
@@ -25,7 +25,7 @@
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
 
 public class XidBinding extends TupleBinding<Xid>
 {
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
similarity index 64%
rename from qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
rename to qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 465c49e..6fba1b2 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,14 +20,11 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -35,25 +32,15 @@
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreTest;
+import org.apache.qpid.server.store.MessageStoreTestCase;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
@@ -62,15 +49,31 @@
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.util.FileUtils;
 
 /**
- * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
  * the BDB Store as well as additional tests specific to the BDB store-implementation.
  */
-public class BDBMessageStoreTest extends MessageStoreTest
+public class BDBMessageStoreTest extends MessageStoreTestCase
 {
     private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
 
+    private String _storeLocation;
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally // run the cleanup below even if super.tearDown() throws
+        {
+            deleteStoreIfExists(); // deletes whatever exists at _storeLocation; no-op if unset or absent
+        }
+    }
+
     /**
      * Tests that message metadata and content are successfully read back from a
      * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
@@ -78,9 +81,7 @@
      */
     public void testBDBMessagePersistence() throws Exception
     {
-        MessageStore store = getVirtualHost().getMessageStore();
-
-        BDBMessageStore bdbStore = assertBDBStore(store);
+        BDBMessageStore bdbStore = (BDBMessageStore)getStore();
 
         // Create content ByteBuffers.
         // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -133,12 +134,13 @@
         /*
          * reload the store only (read-only)
          */
-        BDBMessageStore readOnlyStore = reloadStore(bdbStore);
+        reopenStore();
 
         /*
          * Read back and validate the 0-8 message metadata and content
          */
-        StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8);
+        BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore();
+        StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8);
 
         assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
         assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
@@ -162,7 +164,7 @@
         assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
 
         ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
-        long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+        long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
         assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
         String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
         assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
@@ -170,7 +172,7 @@
         /*
          * Read back and validate the 0-10 message metadata and content
          */
-        StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10);
+        StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10);
 
         assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
         assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
@@ -193,13 +195,13 @@
         assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
 
         ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
-        long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent);
+        long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent);
         assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
 
         String returnedPayloadString_0_10 = new String(recoveredContent.array());
         assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
 
-        readOnlyStore.closeMessageStore();
+        reopenedBdbStore.closeMessageStore();
     }
 
     private DeliveryProperties createDeliveryProperties_0_10()
@@ -226,28 +228,6 @@
         return msgProps_0_10;
     }
 
-    /**
-     * Close the provided store and create a new (read-only) store to read back the data.
-     *
-     * Use this method instead of reloading the virtual host like other tests in order
-     * to avoid the recovery handler deleting the message for not being on a queue.
-     */
-    private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
-    {
-        messageStore.closeMessageStore();
-
-
-        BDBMessageStore newStore = new BDBMessageStore();
-
-        MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
-        when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
-        VirtualHost<?> virtualHost = getVirtualHostModel();
-        newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
-
-        newStore.recoverMessageStore(recoveryHandler, null);
-
-        return newStore;
-    }
 
     private MessagePublishInfo createPublishInfoBody_0_8()
     {
@@ -258,20 +238,24 @@
                 return new AMQShortString("exchange12345");
             }
 
+            @Override
             public void setExchange(AMQShortString exchange)
             {
             }
 
+            @Override
             public boolean isImmediate()
             {
                 return false;
             }
 
+            @Override
             public boolean isMandatory()
             {
                 return true;
             }
 
+            @Override
             public AMQShortString getRoutingKey()
             {
                 return new AMQShortString("routingKey12345");
@@ -298,9 +282,8 @@
 
     public void testGetContentWithOffset() throws Exception
     {
-        MessageStore store = getVirtualHost().getMessageStore();
-        BDBMessageStore bdbStore = assertBDBStore(store);
-        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+        BDBMessageStore bdbStore = (BDBMessageStore) getStore();
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
 
         // normal case: offset is 0
@@ -350,6 +333,7 @@
         System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
         assertTrue("Unexpected content", Arrays.equals(expected, array));
     }
+
     /**
      * Tests that messages which are added to the store and then removed using the
      * public MessageStore interfaces are actually removed from the store by then
@@ -358,10 +342,9 @@
      */
     public void testMessageCreationAndRemoval() throws Exception
     {
-        MessageStore store = getVirtualHost().getMessageStore();
-        BDBMessageStore bdbStore = assertBDBStore(store);
+        BDBMessageStore bdbStore = (BDBMessageStore)getStore();
 
-        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
 
         bdbStore.removeMessage(messageid_0_8, true);
@@ -384,13 +367,6 @@
         assertEquals("Retrieved content when none was expected",
                         0, bdbStore.getContent(messageid_0_8, 0, dst));
     }
-    private BDBMessageStore assertBDBStore(MessageStore store)
-    {
-
-        assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
-
-        return (BDBMessageStore) store;
-    }
 
     private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
     {
@@ -413,254 +389,48 @@
         return storedMessage_0_8;
     }
 
-    /**
-     * Tests transaction commit by utilising the enqueue and dequeue methods available
-     * in the TransactionLog interface implemented by the store, and verifying the
-     * behaviour using BDB implementation methods.
-     */
-    public void testTranCommit() throws Exception
-    {
-        MessageStore log = getVirtualHost().getMessageStore();
-
-        BDBMessageStore bdbStore = assertBDBStore(log);
-
-        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
-        TransactionLogResource mockQueue = new TransactionLogResource()
-        {
-            @Override
-            public String getName()
-            {
-                return getId().toString();
-            }
-
-            @Override
-            public UUID getId()
-            {
-                return mockQueueId;
-            }
-
-            @Override
-            public boolean isDurable()
-            {
-                return true;
-            }
-        };
-
-        Transaction txn = log.newTransaction();
-
-        txn.enqueueMessage(mockQueue, new MockMessage(1L));
-        txn.enqueueMessage(mockQueue, new MockMessage(5L));
-        txn.commitTran();
-
-        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
-        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
-        Long val = enqueuedIds.get(0);
-        assertEquals("First Message is incorrect", 1L, val.longValue());
-        val = enqueuedIds.get(1);
-        assertEquals("Second Message is incorrect", 5L, val.longValue());
-    }
-
-
-    /**
-     * Tests transaction rollback before a commit has occurred by utilising the
-     * enqueue and dequeue methods available in the TransactionLog interface
-     * implemented by the store, and verifying the behaviour using BDB
-     * implementation methods.
-     */
-    public void testTranRollbackBeforeCommit() throws Exception
-    {
-        MessageStore log = getVirtualHost().getMessageStore();
-
-        BDBMessageStore bdbStore = assertBDBStore(log);
-
-        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
-        TransactionLogResource mockQueue = new TransactionLogResource()
-        {
-            @Override
-            public String getName()
-            {
-                return getId().toString();
-            }
-
-            @Override
-            public UUID getId()
-            {
-                return mockQueueId;
-            }
-
-            @Override
-            public boolean isDurable()
-            {
-                return true;
-            }
-        };
-
-        Transaction txn = log.newTransaction();
-
-        txn.enqueueMessage(mockQueue, new MockMessage(21L));
-        txn.abortTran();
-
-        txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, new MockMessage(22L));
-        txn.enqueueMessage(mockQueue, new MockMessage(23L));
-        txn.commitTran();
-
-        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
-        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
-        Long val = enqueuedIds.get(0);
-        assertEquals("First Message is incorrect", 22L, val.longValue());
-        val = enqueuedIds.get(1);
-        assertEquals("Second Message is incorrect", 23L, val.longValue());
-    }
-
     public void testOnDelete() throws Exception
     {
-        MessageStore log = getVirtualHost().getMessageStore();
-        BDBMessageStore bdbStore = assertBDBStore(log);
-        String storeLocation = bdbStore.getStoreLocation();
+        String storeLocation = getStore().getStoreLocation();
 
         File location = new File(storeLocation);
         assertTrue("Store does not exist at " + storeLocation, location.exists());
 
-        bdbStore.closeMessageStore();
+        getStore().closeMessageStore();
         assertTrue("Store does not exist at " + storeLocation, location.exists());
 
-        bdbStore.onDelete();
+        getStore().onDelete();
         assertFalse("Store exists at " + storeLocation, location.exists());
     }
 
-    /**
-     * Tests transaction rollback after a commit has occurred by utilising the
-     * enqueue and dequeue methods available in the TransactionLog interface
-     * implemented by the store, and verifying the behaviour using BDB
-     * implementation methods.
-     */
-    public void testTranRollbackAfterCommit() throws Exception
+
+    @Override
+    protected Map<String, Object> getStoreSettings() throws Exception
     {
-        MessageStore log = getVirtualHost().getMessageStore();
+        _storeLocation = TMP_FOLDER + File.separator + getTestName();
+        deleteStoreIfExists();
+        Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+        messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation);
+        return messageStoreSettings;
 
-        BDBMessageStore bdbStore = assertBDBStore(log);
-
-        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
-        TransactionLogResource mockQueue = new TransactionLogResource()
-        {
-            @Override
-            public String getName()
-            {
-                return getId().toString();
-            }
-
-            @Override
-            public UUID getId()
-            {
-                return mockQueueId;
-            }
-
-            @Override
-            public boolean isDurable()
-            {
-                return true;
-            }
-        };
-
-        Transaction txn = log.newTransaction();
-
-        txn.enqueueMessage(mockQueue, new MockMessage(30L));
-        txn.commitTran();
-
-        txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, new MockMessage(31L));
-        txn.abortTran();
-
-        txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, new MockMessage(32L));
-        txn.commitTran();
-
-        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
-        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
-        Long val = enqueuedIds.get(0);
-        assertEquals("First Message is incorrect", 30L, val.longValue());
-        val = enqueuedIds.get(1);
-        assertEquals("Second Message is incorrect", 32L, val.longValue());
     }
 
-    @SuppressWarnings("rawtypes")
-    private static class MockMessage implements ServerMessage, EnqueueableMessage
+    private void deleteStoreIfExists()
     {
-        private long _messageId;
-
-        public MockMessage(long messageId)
+        if (_storeLocation != null)
         {
-            _messageId = messageId;
-        }
-
-        public String getInitialRoutingAddress()
-        {
-            return null;
-        }
-
-        public AMQMessageHeader getMessageHeader()
-        {
-            return null;
-        }
-
-        public StoredMessage getStoredMessage()
-        {
-            return null;
-        }
-
-        public boolean isPersistent()
-        {
-            return true;
-        }
-
-        public long getSize()
-        {
-            return 0;
-        }
-
-        public boolean isImmediate()
-        {
-            return false;
-        }
-
-        public long getExpiration()
-        {
-            return 0;
-        }
-
-        public MessageReference newReference()
-        {
-            return null;
-        }
-
-        public long getMessageNumber()
-        {
-            return _messageId;
-        }
-
-        public long getArrivalTime()
-        {
-            return 0;
-        }
-
-        public int getContent(ByteBuffer buf, int offset)
-        {
-            return 0;
-        }
-
-        public ByteBuffer getContent(int offset, int length)
-        {
-            return null;
-        }
-
-        @Override
-        public Object getConnectionReference()
-        {
-            return null;
+            File location = new File(_storeLocation);
+            if (location.exists())
+            {
+                FileUtils.delete(location, true);
+            }
         }
     }
+
+    @Override
+    protected MessageStore createMessageStore()
+    {
+        return new BDBMessageStore(); // only constructs the store; presumably MessageStoreTestCase opens it - TODO confirm
+    }
+
 }
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index 0460b1c..717534a 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -48,7 +48,7 @@
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
 import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
index 1446cca..b7b672f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
@@ -35,12 +35,12 @@
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.SystemContext;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
 import org.apache.qpid.server.store.NonNullUpgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 
 public class BrokerStoreUpgrader
 {
@@ -583,17 +583,17 @@
 
     public Broker upgrade(DurableConfigurationStore store)
     {
-        final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext);
+        final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store);
         store.openConfigurationStore(_systemContext, Collections.<String,Object>emptyMap());
-        store.recoverConfigurationStore(recoveryHandler);
+        store.visitConfiguredObjectRecords(recoveryHandler);
 
         return recoveryHandler.getBroker();
     }
 
 
-    private static class BrokerStoreRecoveryHandler implements ConfigurationRecoveryHandler
+    private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler
     {
-        private static Logger LOGGER = Logger.getLogger(ConfigurationRecoveryHandler.class);
+        private static final Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class); // final: constant logger reference
 
         private DurableConfigurationStoreUpgrader _upgrader;
         private DurableConfigurationStore _store;
@@ -601,27 +601,28 @@
         private int _version;
         private final SystemContext _systemContext;
 
-        private BrokerStoreRecoveryHandler(final SystemContext systemContext)
+        private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store)
         {
             _systemContext = systemContext;
+            _store = store;
         }
 
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-            _store = store;
             _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             _records.put(object.getId(), object);
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
             String version = getCurrentVersion();
 
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
index 7024068..59f248c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
@@ -20,14 +20,6 @@
  */
 package org.apache.qpid.server.configuration.store;
 
-import org.apache.qpid.server.configuration.ConfigurationEntry;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.StoreException;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,6 +27,14 @@
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
 public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore
 {
     public static final String STORE_TYPE = "json";
@@ -124,30 +124,31 @@
         else
         {
             final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>();
-            final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler()
+            final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler()
             {
                 private int _configVersion;
                 @Override
-                public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+                public void begin(final int configVersion)
                 {
                     _configVersion = configVersion;
                 }
 
                 @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                public boolean handle(ConfiguredObjectRecord record)
                 {
                     records.add(record);
+                    return true;
                 }
 
                 @Override
-                public int completeConfigurationRecovery()
+                public int end()
                 {
                     return _configVersion;
                 }
             };
 
             initialStore.openConfigurationStore(_parentObject, Collections.<String,Object>emptyMap());
-            initialStore.recoverConfigurationStore(replayHandler);
+            initialStore.visitConfiguredObjectRecords(replayHandler);
 
             update(true, records.toArray(new ConfiguredObjectRecord[records.size()]));
         }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
index 21fffea..cdf4482 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
@@ -38,11 +38,11 @@
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.util.MapValueConverter;
 
 public class ManagementModeStoreHandler implements DurableConfigurationStore
@@ -80,20 +80,21 @@
 
 
         _records = new HashMap<UUID, ConfiguredObjectRecord>();
-        final ConfigurationRecoveryHandler localRecoveryHandler = new ConfigurationRecoveryHandler()
+        final ConfiguredObjectRecordHandler localRecoveryHandler = new ConfiguredObjectRecordHandler()
         {
             private int _version;
             private boolean _quiesceRmiPort = _options.getManagementModeRmiPortOverride() > 0;
             private boolean _quiesceJmxPort = _options.getManagementModeJmxPortOverride() > 0;
             private boolean _quiesceHttpPort = _options.getManagementModeHttpPortOverride() > 0;
+
             @Override
-            public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+            public void begin(final int configVersion)
             {
                 _version = configVersion;
             }
 
             @Override
-            public void configuredObject(final ConfiguredObjectRecord object)
+            public boolean handle(final ConfiguredObjectRecord object)
             {
                 String entryType = object.getType();
                 Map<String, Object> attributes = object.getAttributes();
@@ -153,11 +154,12 @@
                 {
                     _records.put(object.getId(), object);
                 }
+                return true;
             }
 
 
             @Override
-            public int completeConfigurationRecovery()
+            public int end()
             {
                 return _version;
             }
@@ -166,7 +168,7 @@
 
 
 
-        _store.recoverConfigurationStore(localRecoveryHandler);
+        _store.visitConfiguredObjectRecords(localRecoveryHandler);
 
         _cliEntries = createPortsFromCommandLineOptions(_options);
 
@@ -179,17 +181,20 @@
     }
 
     @Override
-    public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+    public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
     {
 
 
-        recoveryHandler.beginConfigurationRecovery(this,0);
+        recoveryHandler.begin(0); // 0 is passed as the handler's configVersion argument
 
         for(ConfiguredObjectRecord record : _records.values())
         {
-            recoveryHandler.configuredObject(record);
+            if(!recoveryHandler.handle(record)) // a false return from the handler stops the visit early
+            {
+                break;
+            }
         }
-        recoveryHandler.completeConfigurationRecovery();
+        recoveryHandler.end(); // called even after an early break; the int it returns is not used here
     }
 
 
@@ -357,16 +362,16 @@
         final int managementModeJmxPortOverride = options.getManagementModeJmxPortOverride();
         final int managementModeHttpPortOverride = options.getManagementModeHttpPortOverride();
 
-        _store.recoverConfigurationStore(new ConfigurationRecoveryHandler()
+        _store.visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
         {
             @Override
-            public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+            public void begin(final int configVersion)
             {
 
             }
 
             @Override
-            public void configuredObject(final ConfiguredObjectRecord entry)
+            public boolean handle(final ConfiguredObjectRecord entry)
             {
                 String entryType = entry.getType();
                 Map<String, Object> attributes = entry.getAttributes();
@@ -417,11 +422,12 @@
                     // save original state
                     quiescedEntries.put(entry.getId(), attributes.get(ATTRIBUTE_STATE));
                 }
+                return true;
             }
 
 
             @Override
-            public int completeConfigurationRecovery()
+            public int end()
             {
                 return 0;
             }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index b4f095b..d534814 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -58,10 +58,9 @@
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.SystemContext;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.util.Strings;
 import org.apache.qpid.util.Strings.ChainedResolver;
 
@@ -128,30 +127,31 @@
                 _storeLocation = initialStore.getStoreLocation();
             }
             final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>();
-            final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler()
+            final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler() // handler that copies every visited record into 'records'
             {
                 private int _configVersion;
                 @Override
-                public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+                public void begin(final int configVersion) // remembers the version announced by the visited store
                 {
                     _configVersion = configVersion;
                 }
 
                 @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                public boolean handle(ConfiguredObjectRecord record) // accumulates the record for the later update(...) call
                 {
                     records.add(record);
+                    return true; // never asks the store to stop: every record is collected
                 }
 
                 @Override
-                public int completeConfigurationRecovery()
+                public int end() // hands back the version received in begin(), unchanged
                 {
                     return _configVersion;
                 }
             };
 
             initialStore.openConfigurationStore(parentObject, Collections.<String,Object>emptyMap());
-            initialStore.recoverConfigurationStore(replayHandler);
+            initialStore.visitConfiguredObjectRecords(replayHandler);
 
             update(true, records.toArray(new ConfiguredObjectRecord[records.size()]));
 
@@ -365,10 +365,10 @@
     }
 
     @Override
-    public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+    public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
     {
 
-        recoveryHandler.beginConfigurationRecovery(this,0);
+        recoveryHandler.begin(0);
 
         final Map<UUID,Map<String,UUID>> parentMap = new HashMap<UUID, Map<String, UUID>>();
 
@@ -435,9 +435,12 @@
         }
         for(ConfiguredObjectRecord record : records.values())
         {
-            recoveryHandler.configuredObject(record);
+            if(!recoveryHandler.handle(record))
+            {
+                break;
+            }
         }
-        recoveryHandler.completeConfigurationRecovery();
+        recoveryHandler.end();
 
     }
 
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e7b6ada..6be5460 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -49,7 +49,10 @@
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParseException;
@@ -221,19 +224,125 @@
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) // begin(version), visit all records, end(); persists the version only if the handler changed it
     {
         checkConfigurationStoreOpen();
 
         try
         {
-            recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
-            loadConfiguredObjects(recoveryHandler);
-            setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+            int configVersion = getConfigVersion(); // version currently held by the store, announced to the handler below
+
+            handler.begin(configVersion);
+            doVisitAllConfiguredObjectRecords(handler);
+
+            int newConfigVersion = handler.end(); // the handler reports the version the store should now carry
+            if(newConfigVersion != configVersion) // skip the write-back when nothing changed
+            {
+                setConfigVersion(newConfigVersion);
+            }
         }
         catch (SQLException e)
         {
-            throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+            throw new StoreException("Cannot visit configured object records", e); // the SQLException is kept as the cause
+        }
+
+    }
+
+    private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException // loads all records, links parents, then feeds each record to the handler
+    {
+        Connection conn = newAutoCommitConnection(); // closed in the outer finally, before any record reaches the handler
+        Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); // keyed by record id so the hierarchy pass can look records up
+        final ObjectMapper objectMapper = new ObjectMapper();
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); // pass 1: read every record (id, type, attributes)
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        String id = rs.getString(1);
+                        String objectType = rs.getString(2);
+                        String attributes = getBlobAsString(rs, 3);
+                        final ConfiguredObjectRecordImpl configuredObjectRecord =
+                                new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+                                                               objectMapper.readValue(attributes, Map.class)); // the attribute text is parsed by the ObjectMapper into a Map
+                        configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+                    }
+                }
+                catch (JsonMappingException e) // parse/IO failures surface as StoreException rather than SQLException
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                catch (JsonParseException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                catch (IOException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+            stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); // pass 2: read (child id, parent type, parent id) rows and attach parents
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        UUID childId = UUID.fromString(rs.getString(1));
+                        String parentType = rs.getString(2);
+                        UUID parentId = UUID.fromString(rs.getString(3));
+
+                        ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+                        ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+                        if(child != null && parent != null) // link only when both ends were loaded in pass 1
+                        {
+                            child.addParent(parentType, parent);
+                        }
+                        else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) // binding whose exchange parent was not loaded in pass 1
+                        {
+                            // TODO - remove this hack for amq. exchanges
+                            child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap())); // placeholder parent: id and type only, no attributes
+                        }
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            conn.close();
+        }
+
+        for(ConfiguredObjectRecord record : configuredObjects.values()) // HashMap iteration: records are delivered in no particular order
+        {
+            boolean shoudlContinue = handler.handle(record); // NOTE(review): local name is a typo for 'shouldContinue'
+            if (!shoudlContinue) // handler returned false: stop delivering records
+            {
+                break;
+            }
         }
     }
 
@@ -282,6 +391,20 @@
             {
                 createOrOpenMessageStoreDatabase();
                 upgradeIfNecessary(parent);
+
+                visitMessages(new MessageHandler()
+                {
+                    @Override
+                    public boolean handle(StoredMessage<?> storedMessage)
+                    {
+                        long id = storedMessage.getMessageNumber();
+                        if (_messageId.get() < id)
+                        {
+                            _messageId.set(id);
+                        }
+                        return true;
+                    }
+                });
             }
             catch (SQLException e)
             {
@@ -290,39 +413,6 @@
         }
     }
 
-    @Override
-    public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
-    {
-        checkMessageStoreOpen();
-
-        if(messageRecoveryHandler != null)
-        {
-            try
-            {
-                recoverMessages(messageRecoveryHandler);
-            }
-            catch (SQLException e)
-            {
-                throw new StoreException("Error encountered when restoring message data from " +
-                                                       "persistent store ", e);
-            }
-        }
-        if(transactionLogRecoveryHandler != null)
-        {
-            try
-            {
-                TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
-                recoverXids(dtxrh);
-            }
-            catch (SQLException e)
-            {
-                throw new StoreException("Error encountered when restoring distributed transaction " +
-                                                       "data from persistent store ", e);
-            }
-
-        }
-    }
-
     protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
     {
         Connection conn = newAutoCommitConnection();
@@ -1043,11 +1133,9 @@
                 getLogger().debug("Enqueuing message "
                                    + messageId
                                    + " on queue "
-                                   + (queue instanceof AMQQueue
-                                      ? ((AMQQueue) queue).getName()
-                                      : "")
-                                   + queue.getId()
-                                   + "[Connection"
+                                   + queue.getName()
+                                   + " with id " + queue.getId()
+                                   + " [Connection"
                                    + conn
                                    + "]");
             }
@@ -1068,7 +1156,7 @@
         catch (SQLException e)
         {
             getLogger().error("Failed to enqueue: " + e.getMessage(), e);
-            throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+            throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
                 + " to database", e);
         }
 
@@ -1093,15 +1181,13 @@
 
                 if(results != 1)
                 {
-                    throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+                    throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
                            + " with id " + queue.getId());
                 }
 
                 if (getLogger().isDebugEnabled())
                 {
-                    getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
-                                                                                          ? ((AMQQueue) queue).getName()
-                                                                                          : "")
+                    getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
                                        + " with id " + queue.getId());
                 }
             }
@@ -1114,7 +1200,7 @@
         catch (SQLException e)
         {
             getLogger().error("Failed to dequeue: " + e.getMessage(), e);
-            throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+            throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
                     + " with id " + queue.getId() + " from database", e);
         }
 
@@ -1363,131 +1449,6 @@
 
     }
 
-    private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
-    {
-        Connection conn = newAutoCommitConnection();
-        try
-        {
-            MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
-
-            Statement stmt = conn.createStatement();
-            try
-            {
-                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
-                try
-                {
-
-                    long maxId = 0;
-
-                    while(rs.next())
-                    {
-
-                        long messageId = rs.getLong(1);
-                        if(messageId > maxId)
-                        {
-                            maxId = messageId;
-                        }
-
-                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-
-                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-                        buf.position(1);
-                        buf = buf.slice();
-                        MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
-                        StorableMessageMetaData metaData = type.createMetaData(buf);
-                        StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
-                        messageHandler.message(message);
-                    }
-
-                    _messageId.set(maxId);
-
-                    messageHandler.completeMessageRecovery();
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-        }
-        finally
-        {
-            conn.close();
-        }
-    }
-
-
-    private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
-    {
-        Connection conn = newAutoCommitConnection();
-        try
-        {
-            TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
-
-            Statement stmt = conn.createStatement();
-            try
-            {
-                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
-                try
-                {
-                    while(rs.next())
-                    {
-
-                        String id = rs.getString(1);
-                        long messageId = rs.getLong(2);
-                        queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
-                    }
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-
-            return queueEntryHandler.completeQueueEntryRecovery();
-        }
-        finally
-        {
-            conn.close();
-        }
-    }
-
-    private static final class Xid
-    {
-
-        private final long _format;
-        private final byte[] _globalId;
-        private final byte[] _branchId;
-
-        public Xid(long format, byte[] globalId, byte[] branchId)
-        {
-            _format = format;
-            _globalId = globalId;
-            _branchId = branchId;
-        }
-
-        public long getFormat()
-        {
-            return _format;
-        }
-
-        public byte[] getGlobalId()
-        {
-            return _globalId;
-        }
-
-        public byte[] getBranchId()
-        {
-            return _branchId;
-        }
-    }
 
     private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
     {
@@ -1550,93 +1511,6 @@
         }
     }
 
-    private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
-    {
-        Connection conn = newAutoCommitConnection();
-        try
-        {
-            List<Xid> xids = new ArrayList<Xid>();
-
-            Statement stmt = conn.createStatement();
-            try
-            {
-                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
-                try
-                {
-                    while(rs.next())
-                    {
-
-                        long format = rs.getLong(1);
-                        byte[] globalId = rs.getBytes(2);
-                        byte[] branchId = rs.getBytes(3);
-                        xids.add(new Xid(format, globalId, branchId));
-                    }
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-
-
-
-            for(Xid xid : xids)
-            {
-                List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
-                List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
-                PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
-                try
-                {
-                    pstmt.setLong(1, xid.getFormat());
-                    pstmt.setBytes(2, xid.getGlobalId());
-                    pstmt.setBytes(3, xid.getBranchId());
-
-                    ResultSet rs = pstmt.executeQuery();
-                    try
-                    {
-                        while(rs.next())
-                        {
-
-                            String actionType = rs.getString(1);
-                            UUID queueId = UUID.fromString(rs.getString(2));
-                            long messageId = rs.getLong(3);
-
-                            RecordImpl record = new RecordImpl(queueId, messageId);
-                            List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
-                            records.add(record);
-                        }
-                    }
-                    finally
-                    {
-                        rs.close();
-                    }
-                }
-                finally
-                {
-                    pstmt.close();
-                }
-
-                dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
-                                enqueues.toArray(new RecordImpl[enqueues.size()]),
-                                dequeues.toArray(new RecordImpl[dequeues.size()]));
-            }
-
-
-            dtxrh.completeDtxRecordRecovery();
-        }
-        finally
-        {
-            conn.close();
-        }
-
-    }
-
     private StorableMessageMetaData getMetaData(long messageId) throws SQLException
     {
 
@@ -2357,44 +2231,82 @@
         }
     }
 
-    private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException,
-                                                                                            StoreException
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException // passes each stored message's metadata to the handler until it returns false
     {
-        Connection conn = newAutoCommitConnection();
-        Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
-        final ObjectMapper objectMapper = new ObjectMapper();
+        checkMessageStoreOpen();
+
+        Connection conn = null; // still null in the finally block if opening the connection throws
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+            conn = newAutoCommitConnection();
+            Statement stmt = conn.createStatement();
             try
             {
-                ResultSet rs = stmt.executeQuery();
+                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
                 try
                 {
                     while (rs.next())
                     {
+                        long messageId = rs.getLong(1);
+                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+                        buf.position(1); // byte 0 holds the metadata type ordinal (read below); the slice is everything after it
+                        buf = buf.slice();
+                        MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+                        StorableMessageMetaData metaData = type.createMetaData(buf);
+                        StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+                        if (!handler.handle(message)) // a false return from the handler ends the scan early
+                        {
+                            break;
+                        }
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error encountered when visiting messages", e); // the SQLException is kept as the cause
+        }
+        finally
+        {
+            closeConnection(conn); // NOTE(review): conn may be null here; presumably closeConnection tolerates that — confirm
+        }
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            Statement stmt = conn.createStatement();
+            try
+            {
+                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+                try
+                {
+                    while(rs.next())
+                    {
                         String id = rs.getString(1);
-                        String objectType = rs.getString(2);
-                        String attributes = getBlobAsString(rs, 3);
-                        final ConfiguredObjectRecordImpl configuredObjectRecord =
-                                new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
-                                                               objectMapper.readValue(attributes, Map.class));
-                        configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
+                        long messageId = rs.getLong(2);
+                        if (!handler.handle(UUID.fromString(id), messageId))
+                        {
+                            break;
+                        }
                     }
                 }
-                catch (JsonMappingException e)
-                {
-                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
-                }
-                catch (JsonParseException e)
-                {
-                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
-                }
-                catch (IOException e)
-                {
-                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
-                }
                 finally
                 {
                     rs.close();
@@ -2404,31 +2316,41 @@
             {
                 stmt.close();
             }
-            stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+        }
+        catch(SQLException e)
+        {
+            throw new StoreException("Error encountered when visiting message instances", e);
+        }
+        finally
+        {
+            closeConnection(conn);
+        }
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            List<Xid> xids = new ArrayList<Xid>();
+
+            Statement stmt = conn.createStatement();
             try
             {
-                ResultSet rs = stmt.executeQuery();
+                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
                 try
                 {
-                    while (rs.next())
+                    while(rs.next())
                     {
-                        UUID childId = UUID.fromString(rs.getString(1));
-                        String parentType = rs.getString(2);
-                        UUID parentId = UUID.fromString(rs.getString(3));
 
-                        ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
-                        ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
-                        if(child != null && parent != null)
-                        {
-                            child.addParent(parentType, parent);
-                        }
-                        else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
-                        {
-                            // TODO - remove this hack for amq. exchanges
-                            child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
-                        }
-
+                        long format = rs.getLong(1);
+                        byte[] globalId = rs.getBytes(2);
+                        byte[] branchId = rs.getBytes(3);
+                        xids.add(new Xid(format, globalId, branchId));
                     }
                 }
                 finally
@@ -2441,18 +2363,67 @@
                 stmt.close();
             }
 
+
+
+            for(Xid xid : xids)
+            {
+                List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+                List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+                PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+                try
+                {
+                    pstmt.setLong(1, xid.getFormat());
+                    pstmt.setBytes(2, xid.getGlobalId());
+                    pstmt.setBytes(3, xid.getBranchId());
+
+                    ResultSet rs = pstmt.executeQuery();
+                    try
+                    {
+                        while(rs.next())
+                        {
+
+                            String actionType = rs.getString(1);
+                            UUID queueId = UUID.fromString(rs.getString(2));
+                            long messageId = rs.getLong(3);
+
+                            RecordImpl record = new RecordImpl(queueId, messageId);
+                            List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+                            records.add(record);
+                        }
+                    }
+                    finally
+                    {
+                        rs.close();
+                    }
+                }
+                finally
+                {
+                    pstmt.close();
+                }
+
+                if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+                                enqueues.toArray(new RecordImpl[enqueues.size()]),
+                                dequeues.toArray(new RecordImpl[dequeues.size()])))
+                {
+                    break;
+                }
+            }
+
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error encountered when visiting distributed transactions", e);
+
         }
         finally
         {
-            conn.close();
-        }
-
-        for(ConfiguredObjectRecord record : configuredObjects.values())
-        {
-            recoveryHandler.configuredObject(record);
+            closeConnection(conn);
         }
     }
 
+
     protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
 
     protected abstract void storedSizeChange(int storeSizeIncrease);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index a7e9ef2..99785c4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -20,17 +20,36 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 /** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract public class AbstractMemoryMessageStore extends NullMessageStore
+abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
 {
-    private final AtomicLong _messageId = new AtomicLong(1);
-
-    private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
+    private final class MemoryMessageStoreTransaction implements Transaction
     {
+        private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+        private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+        private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+        private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
         @Override
         public StoreFuture commitTranAsync()
         {
@@ -40,50 +59,145 @@
         @Override
         public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
+            Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+            if (messageIds == null)
+            {
+                messageIds = new HashSet<Long>();
+                _localEnqueueMap.put(queue.getId(), messageIds);
+            }
+            messageIds.add(message.getMessageNumber());
         }
 
         @Override
-        public void dequeueMessage(TransactionLogResource  queue, EnqueueableMessage message)
+        public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
+            Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+            if (messageIds == null)
+            {
+                messageIds = new HashSet<Long>();
+                _localDequeueMap.put(queue.getId(), messageIds);
+            }
+            messageIds.add(message.getMessageNumber());
         }
 
         @Override
         public void commitTran()
         {
+            commitTransactionInternal(this);
+            _localEnqueueMap.clear();
+            _localDequeueMap.clear();
         }
 
         @Override
         public void abortTran()
         {
+            _localEnqueueMap.clear();
+            _localDequeueMap.clear();
         }
 
         @Override
         public void removeXid(long format, byte[] globalId, byte[] branchId)
         {
+            _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
         }
 
         @Override
         public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
         {
+            _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
         }
-    };
+    }
 
-    private final EventManager _eventManager = new EventManager();
+    private final AtomicLong _messageId = new AtomicLong(1);
 
+    private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
 
+    protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
+
+    private Object _transactionLock = new Object();
+    private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+    private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+    @SuppressWarnings("unchecked")
     @Override
-    public StoredMessage addMessage(StorableMessageMetaData metaData)
+    public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
     {
-        final long id = _messageId.getAndIncrement();
-        StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+        long id = _messageId.getAndIncrement();
 
-        return message;
+        if(metaData.isPersistent())
+        {
+            return new StoredMemoryMessage(id, metaData)
+            {
+
+                @Override
+                public StoreFuture flushToStore()
+                {
+                    _messages.putIfAbsent(getMessageNumber(), this) ;
+                    return super.flushToStore();
+                }
+
+                @Override
+                public void remove()
+                {
+                    _messages.remove(getMessageNumber());
+                    super.remove();
+                }
+
+            };
+        }
+        else
+        {
+            return new StoredMemoryMessage(id, metaData);
+        }
+    }
+
+    private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
+    {
+        synchronized (_transactionLock )
+        {
+            for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet())
+            {
+                Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey());
+                if (messageIds == null)
+                {
+                    messageIds = new HashSet<Long>();
+                    _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds);
+                }
+                messageIds.addAll(loacalEnqueuedEntry.getValue());
+            }
+
+            for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
+            {
+                Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
+                if (messageIds != null)
+                {
+                    messageIds.removeAll(loacalDequeueEntry.getValue());
+                    if (messageIds.isEmpty())
+                    {
+                        _messageInstances.remove(loacalDequeueEntry.getKey());
+                    }
+                }
+            }
+
+            for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
+            {
+                _distributedTransactions.put(entry.getKey(), entry.getValue());
+            }
+
+            for (Xid removed : transaction._localDistributedTransactionsRemoves)
+            {
+                _distributedTransactions.remove(removed);
+            }
+
+        }
+
+
     }
 
     @Override
     public Transaction newTransaction()
     {
-        return IN_MEMORY_TRANSACTION;
+        return new MemoryMessageStoreTransaction();
     }
 
     @Override
@@ -95,7 +209,164 @@
     @Override
     public void addEventListener(EventListener eventListener, Event... events)
     {
-        _eventManager.addEventListener(eventListener, events);
     }
 
+    @Override
+    public void create(ConfiguredObjectRecord record)
+    {
+        if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
+        {
+            throw new StoreException("Record with id " + record.getId() + " is already present");
+        }
+    }
+
+    @Override
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+    {
+        for (ConfiguredObjectRecord record : records)
+        {
+            ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
+            if (previousValue == null && !createIfNecessary)
+            {
+                throw new StoreException("Record with id " + record.getId() + " does not exist");
+            }
+        }
+    }
+
+    @Override
+    public UUID[] remove(final ConfiguredObjectRecord... objects)
+    {
+        List<UUID> removed = new ArrayList<UUID>();
+        for (ConfiguredObjectRecord record : objects)
+        {
+            if (_configuredObjectRecords.remove(record.getId()) != null)
+            {
+                removed.add(record.getId());
+            }
+        }
+        return removed.toArray(new UUID[removed.size()]);
+    }
+
+    @Override
+    public void closeConfigurationStore()
+    {
+        _configuredObjectRecords.clear();
+    }
+
+    @Override
+    public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+    {
+    }
+
+    @Override
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+    {
+        handler.begin(VirtualHost.CURRENT_CONFIG_VERSION);
+        for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
+        {
+            if (!handler.handle(record))
+            {
+                break;
+            }
+        }
+        handler.end();
+    }
+
+    @Override
+    public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
+    {
+    }
+
+    @Override
+    public void closeMessageStore()
+    {
+        _messages.clear();
+        synchronized (_transactionLock)
+        {
+            _messageInstances.clear();
+            _distributedTransactions.clear();
+        }
+    }
+
+    @Override
+    public String getStoreLocation()
+    {
+        return null;
+    }
+
+    @Override
+    public void onDelete()
+    {
+    }
+
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+        for (StoredMemoryMessage message : _messages.values())
+        {
+            if(!handler.handle(message))
+            {
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        synchronized (_transactionLock)
+        {
+            for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+            {
+                UUID resourceId = enqueuedEntry.getKey();
+                for (Long messageId : enqueuedEntry.getValue())
+                {
+                    if (!handler.handle(resourceId, messageId))
+                    {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        synchronized (_transactionLock)
+        {
+            for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+            {
+                Xid xid = entry.getKey();
+                DistributedTransactionRecords records = entry.getValue();
+                if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues()))
+                {
+                    break;
+                }
+            }
+        }
+    }
+
+    private static final class DistributedTransactionRecords
+    {
+        private Record[] _enqueues;
+        private Record[] _dequeues;
+
+        public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
+        {
+            super();
+            _enqueues = enqueues;
+            _dequeues = dequeues;
+        }
+
+        public Record[] getEnqueues()
+        {
+            return _enqueues;
+        }
+
+        public Record[] getDequeues()
+        {
+            return _dequeues;
+        }
+    }
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
new file mode 100644
index 0000000..85265d9
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Map;
+
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ConfiguredObjectRecordRecoveverAndUpgrader implements ConfiguredObjectRecordHandler
+{
+    private DurableConfigurationRecoverer _configRecoverer;
+    private DurableConfigurationStore _store;
+
+    public ConfiguredObjectRecordRecoveverAndUpgrader(VirtualHost virtualHost, Map<String, DurableConfiguredObjectRecoverer> recoverers)
+    {
+        DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(virtualHost);
+        _configRecoverer = new DurableConfigurationRecoverer(virtualHost.getName(), recoverers, upgraderProvider, virtualHost.getEventLogger());
+        _store = virtualHost.getDurableConfigurationStore();
+    }
+
+    @Override
+    public void begin(int configVersion)
+    {
+        _configRecoverer.beginConfigurationRecovery(_store, configVersion);
+    }
+
+    @Override
+    public boolean handle(ConfiguredObjectRecord record)
+    {
+        _configRecoverer.configuredObject(record);
+        return true;
+    }
+
+    @Override
+    public int end()
+    {
+        return _configRecoverer.completeConfigurationRecovery();
+    }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 9f610b0..7d93f18 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.server.model.ConfiguredObject;
-
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
 public interface DurableConfigurationStore
 {
     String STORE_TYPE                    = "storeType";
@@ -47,12 +48,6 @@
     void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
 
     /**
-     * Recovers configuration from the store using given recovery handler
-     * @param recoveryHandler recovery handler
-     */
-    void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
-
-    /**
      * Makes the specified object persistent.
      *
      * @param object The object to persist.
@@ -85,4 +80,11 @@
 
     void closeConfigurationStore() throws StoreException;
 
+    /**
+     * Visits all configured object records with the given handler.
+     *
+     * @param handler a handler to invoke on each configured object record
+     * @throws StoreException
+     */
+    void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException;
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index 819da86..a5ace16 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonProcessingException;
@@ -97,22 +98,27 @@
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
     {
-        recoveryHandler.beginConfigurationRecovery(this,_configVersion);
+        handler.begin(_configVersion);
         List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
         for(ConfiguredObjectRecord record : records)
         {
-            recoveryHandler.configuredObject(record);
+            boolean shouldContinue = handler.handle(record);
+            if (!shouldContinue)
+            {
+                break;
+            }
         }
         int oldConfigVersion = _configVersion;
-        _configVersion = recoveryHandler.completeConfigurationRecovery();
+        _configVersion = handler.end();
         if(oldConfigVersion != _configVersion)
         {
             save();
         }
     }
 
+
     private void setup(final Map<String, Object> configurationStoreSettings)
     {
         Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 69f9073..433f618 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -23,6 +23,9 @@
 import java.util.Map;
 
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -43,13 +46,6 @@
      */
     void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
 
-    /**
-     * Called after opening to recover messages and transactions with given recovery handlers
-     * @param messageRecoveryHandler
-     * @param transactionLogRecoveryHandler
-     */
-    void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
-
     public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
 
 
@@ -71,8 +67,10 @@
 
     String getStoreLocation();
 
-    // TODO dead method - remove??
-    String getStoreType();
-
     void onDelete();
+
+    void visitMessages(MessageHandler handler) throws StoreException;
+    void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+    void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
+
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
deleted file mode 100755
index ba65b8e..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store;
-
-public interface MessageStoreRecoveryHandler
-{
-    StoredMessageRecoveryHandler begin();
-
-    public static interface StoredMessageRecoveryHandler
-    {
-        void message(StoredMessage message);
-
-        void completeMessageRecovery();
-    }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 59b4530..a3ed4be 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -23,6 +23,10 @@
 import java.util.UUID;
 
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
 {
@@ -33,11 +37,6 @@
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
-    {
-    }
-
-    @Override
     public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
     {
     }
@@ -92,11 +91,6 @@
     }
 
     @Override
-    public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
-    {
-    }
-
-    @Override
     public void addEventListener(EventListener eventListener, Event... events)
     {
     }
@@ -112,4 +106,24 @@
     {
     }
 
+    @Override
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+    {
+    }
+
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+    }
+
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
deleted file mode 100755
index bd4da64..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store;
-
-import java.util.UUID;
-
-public interface TransactionLogRecoveryHandler
-{
-    QueueEntryRecoveryHandler begin(MessageStore log);
-
-    public static interface QueueEntryRecoveryHandler
-    {
-        DtxRecordRecoveryHandler completeQueueEntryRecovery();
-
-        void queueEntry(UUID queueId, long messageId);
-    }
-
-    public static interface DtxRecordRecoveryHandler
-    {
-        void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues);
-
-        void completeDtxRecordRecovery();
-    }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
new file mode 100644
index 0000000..4db3386
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Arrays;
+
+public final class Xid
+{
+    private final long _format;
+    private final byte[] _globalId;
+    private final byte[] _branchId;
+
+    public Xid(long format, byte[] globalId, byte[] branchId)
+    {
+        _format = format;
+        _globalId = globalId;
+        _branchId = branchId;
+    }
+
+    public long getFormat()
+    {
+        return _format;
+    }
+
+    public byte[] getGlobalId()
+    {
+        return _globalId;
+    }
+
+    public byte[] getBranchId()
+    {
+        return _branchId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + Arrays.hashCode(_branchId);
+        result = prime * result + (int) (_format ^ (_format >>> 32));
+        result = prime * result + Arrays.hashCode(_globalId);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj)
+        {
+            return true;
+        }
+
+        if (obj == null)
+        {
+            return false;
+        }
+
+        if (getClass() != obj.getClass())
+        {
+            return false;
+        }
+
+        Xid other = (Xid) obj;
+
+        if (!Arrays.equals(_branchId, other._branchId))
+        {
+            return false;
+        }
+
+        if (_format != other._format)
+        {
+            return false;
+        }
+
+        if (!Arrays.equals(_globalId, other._globalId))
+        {
+            return false;
+        }
+        return true;
+    }
+
+
+}
\ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java
new file mode 100644
index 0000000..747c735
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.handler;
+
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+
+public interface ConfiguredObjectRecordHandler
+{
+    // TODO configVersion argument will be removed.
+    void begin(int configVersion);
+
+    /**
+     * Handles the given record.
+     *
+     * @param record
+     * @return false if the handler does not wish to handle any further records, true otherwise
+     */
+    boolean handle(ConfiguredObjectRecord record);
+
+    // TODO: the return type should be void;
+    // for now it temporarily returns the new config version.
+    int end();
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
similarity index 61%
copy from qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
copy to qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
index ba9b7c1..733c933 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
@@ -18,30 +18,13 @@
  * under the License.
  *
  */
+package org.apache.qpid.server.store.handler;
 
-package org.apache.qpid.server.store;
+import org.apache.qpid.server.store.Transaction.Record;
 
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
+public interface DistributedTransactionHandler
 {
-    @Override
-    public String getType()
-    {
-        return TestableMemoryMessageStore.TYPE;
-    }
 
-    @Override
-    public MessageStore createMessageStore()
-    {
-        return new TestableMemoryMessageStore();
-    }
-
-    @Override
-    public void validateAttributes(Map<String, Object> attributes)
-    {
-    }
+    boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues);
 
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
similarity index 61%
copy from qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
copy to qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
index ba9b7c1..30c1f7b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
@@ -18,30 +18,13 @@
  * under the License.
  *
  */
+package org.apache.qpid.server.store.handler;
 
-package org.apache.qpid.server.store;
+import org.apache.qpid.server.store.StoredMessage;
 
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
+public interface MessageHandler
 {
-    @Override
-    public String getType()
-    {
-        return TestableMemoryMessageStore.TYPE;
-    }
 
-    @Override
-    public MessageStore createMessageStore()
-    {
-        return new TestableMemoryMessageStore();
-    }
-
-    @Override
-    public void validateAttributes(Map<String, Object> attributes)
-    {
-    }
+    boolean handle(StoredMessage<?> storedMessage); // receives one stored message; return value presumably tells the store whether to keep visiting - TODO confirm
 
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
similarity index 60%
copy from qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
copy to qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
index ba9b7c1..3775ec4 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
@@ -18,30 +18,12 @@
  * under the License.
  *
  */
+package org.apache.qpid.server.store.handler;
 
-package org.apache.qpid.server.store;
+import java.util.UUID;
 
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
+public interface MessageInstanceHandler
 {
-    @Override
-    public String getType()
-    {
-        return TestableMemoryMessageStore.TYPE;
-    }
-
-    @Override
-    public MessageStore createMessageStore()
-    {
-        return new TestableMemoryMessageStore();
-    }
-
-    @Override
-    public void validateAttributes(Map<String, Object> attributes)
-    {
-    }
+    boolean handle(UUID queueId, long messageId); // receives one (queue id, message id) entry; return value presumably tells the store whether to keep visiting - TODO confirm
 
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 7e0562a..46b5dbb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -78,6 +78,10 @@
 
     public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
     {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Getting upgrader for configVersion:  " + configVersion);
+        }
         DurableConfigurationStoreUpgrader currentUpgrader = null;
         switch(configVersion)
         {
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
new file mode 100644
index 0000000..df47c85
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
@@ -0,0 +1,357 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
+
+public class MessageStoreRecoverer
+{
+    private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class);
+
+    private final VirtualHost _virtualHost;
+    // The three maps below are shared by the visitors: recovered count per queue name, messages by number, and messages nothing has referenced yet.
+    private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
+    private final Map<Long, ServerMessage<?>> _recoveredMessages = new HashMap<Long, ServerMessage<?>>();
+    private final Map<Long, StoredMessage<?>> _unusedMessages = new HashMap<Long, StoredMessage<?>>();
+    private final EventLogger _eventLogger;
+
+    private final MessageStoreLogSubject _logSubject;
+    private final MessageStore _store;
+
+    // Takes the event logger and message store from the virtual host; logSubject is attached to every operational log message emitted here.
+    public MessageStoreRecoverer(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
+    {
+        super();
+        _virtualHost = virtualHost;
+        _eventLogger = virtualHost.getEventLogger();
+        _logSubject = logSubject;
+        _store = virtualHost.getMessageStore();
+    }
+
+    // Runs recovery in order: stored messages, then queue entries, then distributed transactions; finally removes messages nothing referenced.
+    public void recover()
+    {
+        _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
+        _store.visitMessages(messageVisitor);
+
+        _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
+        _store.visitMessageInstances(messageAndMessageInstanceRecoverer);
+        // Report the per-queue totals gathered by the message-instance visit.
+        for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
+        {
+            _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
+            _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
+        }
+
+        _store.visitDistributedTransactions(distributedTransactionRecoverer);
+
+
+        // Whatever is still in _unusedMessages was referenced by no queue entry and no DTX record.
+        for(StoredMessage<?> m : _unusedMessages.values())
+        {
+            _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+            m.remove();
+        }
+        _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+        // The loop above does not modify _unusedMessages, so the reported total excludes the messages just removed.
+        _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
+        _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+
+
+    }
+    // First pass: creates a ServerMessage for every stored message and records it as unused until a later pass references it.
+    MessageHandler messageVisitor = new MessageHandler()
+    {
+
+        @Override
+        public boolean handle(StoredMessage<?> message)
+        {
+            StorableMessageMetaData metaData = message.getMetaData();
+            // The metadata's type object is what creates the matching ServerMessage.
+            @SuppressWarnings("rawtypes")
+            MessageMetaDataType type = metaData.getType();
+
+            @SuppressWarnings("unchecked")
+            ServerMessage<?> serverMessage  = type.createMessage(message);
+
+            _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+            _unusedMessages.put(message.getMessageNumber(), message);
+            return true;
+        }
+
+    };
+    // Second pass: enqueues each recovered message on its queue; entries naming an unknown queue or message are dequeued from the store.
+    MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler()
+    {
+        @Override
+        public boolean handle(final UUID queueId, long messageId)
+        {
+            AMQQueue<?> queue = _virtualHost.getQueue(queueId);
+            if(queue != null)
+            {
+                String queueName = queue.getName();
+                ServerMessage<?> message = _recoveredMessages.get(messageId);
+                _unusedMessages.remove(messageId);
+
+                if(message != null)
+                {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
+                    }
+
+                    Integer count = _queueRecoveries.get(queueName);
+                    if (count == null)
+                    {
+                        count = 0;
+                    }
+
+                    queue.enqueue(message,null);
+
+                    _queueRecoveries.put(queueName, ++count);
+                }
+                else
+                {
+                    _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
+                    Transaction txn = _store.newTransaction();
+                    txn.dequeueMessage(queue, new DummyMessage(messageId));
+                    txn.commitTranAsync();
+                }
+            }
+            else
+            {
+                _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
+                Transaction txn = _store.newTransaction();
+                TransactionLogResource mockQueue = // stand-in resource that only carries the unknown queue's id
+                        new TransactionLogResource()
+                        {
+                            @Override
+                            public String getName()
+                            {
+                                return "<<UNKNOWN>>";
+                            }
+
+                            @Override
+                            public UUID getId()
+                            {
+                                return queueId;
+                            }
+
+                            @Override
+                            public boolean isDurable()
+                            {
+                                return false;
+                            }
+                        };
+                txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
+                txn.commitTranAsync();
+            }
+            return true;
+        }
+    };
+    // Third pass: looks up or registers the DTX branch for each stored Xid, re-attaches its enqueue/dequeue records, and sets it to PREPARED.
+    private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler()
+    {
+
+        @Override
+        public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+        {
+            Xid id = new Xid(format, globalId, branchId);
+            DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+            DtxBranch branch = dtxRegistry.getBranch(id);
+            if(branch == null)
+            {
+                branch = new DtxBranch(id, _store, _virtualHost);
+                dtxRegistry.registerBranch(branch);
+            }
+            for(Transaction.Record record : enqueues)
+            {
+                final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+                if(queue != null)
+                {
+                    final long messageId = record.getMessage().getMessageNumber();
+                    final ServerMessage<?> message = _recoveredMessages.get(messageId);
+                    _unusedMessages.remove(messageId);
+
+                    if(message != null)
+                    {
+                        final MessageReference<?> ref = message.newReference();
+                        // The reference taken above is released whether the branch commits or rolls back.
+                        branch.enqueue(queue,message);
+
+                        branch.addPostTransactionAction(new ServerTransaction.Action()
+                        {
+
+                            public void postCommit()
+                            {
+                                queue.enqueue(message, null);
+                                ref.release();
+                            }
+
+                            public void onRollback()
+                            {
+                                ref.release();
+                            }
+                        });
+                    }
+                    else
+                    {
+                        StringBuilder xidString = xidAsString(id);
+                        _eventLogger.message(_logSubject,
+                                          TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+                                                                                       Long.toString(messageId)));
+                    }
+                }
+                else
+                {
+                    StringBuilder xidString = xidAsString(id);
+                    _eventLogger.message(_logSubject,
+                                      TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+                                                                                 record.getResource().getId().toString()));
+
+                }
+            }
+            for(Transaction.Record record : dequeues)
+            {
+                final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+                if(queue != null)
+                {
+                    final long messageId = record.getMessage().getMessageNumber();
+                    final ServerMessage<?> message = _recoveredMessages.get(messageId);
+                    _unusedMessages.remove(messageId);
+
+                    if(message != null)
+                    {
+                        final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+                        // NOTE(review): entry is used without a null check - confirm getMessageOnTheQueue cannot return null here.
+                        entry.acquire();
+
+                        branch.dequeue(queue, message);
+
+                        branch.addPostTransactionAction(new ServerTransaction.Action()
+                        {
+
+                            public void postCommit()
+                            {
+                                entry.delete();
+                            }
+
+                            public void onRollback()
+                            {
+                                entry.release();
+                            }
+                        });
+                    }
+                    else
+                    {
+                        StringBuilder xidString = xidAsString(id);
+                        _eventLogger.message(_logSubject,
+                                          TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+                                                                                       Long.toString(messageId)));
+
+                    }
+
+                }
+                else
+                {
+                    StringBuilder xidString = xidAsString(id);
+                    _eventLogger.message(_logSubject,
+                                      TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+                                                                                 record.getResource().getId().toString()));
+                }
+
+            }
+
+            branch.setState(DtxBranch.State.PREPARED);
+            branch.prePrepareTransaction();
+            return true;
+        }
+        // Renders an Xid as "(format,globalId,branchId)" for the XA_INCOMPLETE_* log messages above.
+        private StringBuilder xidAsString(Xid id)
+        {
+            return new StringBuilder("(")
+                        .append(id.getFormat())
+                        .append(',')
+                        .append(Functions.str(id.getGlobalId()))
+                        .append(',')
+                        .append(Functions.str(id.getBranchId()))
+                        .append(')');
+        }
+
+
+    };
+
+    // Placeholder message that carries only a message number; used above to dequeue entries whose real message or queue is missing.
+    private static class DummyMessage implements EnqueueableMessage
+    {
+
+        private final long _messageId;
+
+        public DummyMessage(long messageId)
+        {
+            _messageId = messageId;
+        }
+
+        public long getMessageNumber()
+        {
+            return _messageId;
+        }
+
+        public boolean isPersistent()
+        {
+            return true;
+        }
+
+        public StoredMessage getStoredMessage()
+        {
+            return null;
+        }
+    }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index e3fd938..14849ae 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -29,11 +29,11 @@
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.plugin.MessageStoreFactory;
 import org.apache.qpid.server.stats.StatisticsGatherer;
-
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 
 public class StandardVirtualHost extends AbstractVirtualHost
 {
@@ -107,18 +107,22 @@
         if (_configurationStoreLogSubject != null)
         {
             getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+            getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
         }
 
-        DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
-                new DefaultUpgraderProvider(this), getEventLogger());
+        ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
 
-        _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
+        _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
+
+        if (_configurationStoreLogSubject != null)
+        {
+            getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+        }
 
         // If store does not have entries for standard exchanges (amq.*), the following will create them.
         initialiseModel();
 
-        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
-        _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+        new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
 
         attainActivation();
     }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
deleted file mode 100755
index 3216115..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.virtualhost;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.DtxBranch;
-import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.Xid;
-import org.apache.qpid.transport.util.Functions;
-
-public class VirtualHostConfigRecoveryHandler implements
-                                                        MessageStoreRecoveryHandler,
-                                                        MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
-                                                        TransactionLogRecoveryHandler,
-                                                        TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
-                                                        TransactionLogRecoveryHandler.DtxRecordRecoveryHandler
-{
-    private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
-
-    private final VirtualHost _virtualHost;
-
-    private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
-    private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
-    private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-    private final EventLogger _eventLogger;
-
-    private final MessageStoreLogSubject _logSubject;
-    private MessageStore _store;
-
-    public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
-    {
-        _virtualHost = virtualHost;
-        _eventLogger = virtualHost.getEventLogger();
-        _logSubject = logSubject;
-    }
-
-    public VirtualHostConfigRecoveryHandler begin(MessageStore store)
-    {
-        _store = store;
-        _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
-        return this;
-    }
-
-    public StoredMessageRecoveryHandler begin()
-    {
-        _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
-        return this;
-    }
-
-    public void message(StoredMessage message)
-    {
-        ServerMessage serverMessage  = message.getMetaData().getType().createMessage(message);
-
-        _recoveredMessages.put(message.getMessageNumber(), serverMessage);
-        _unusedMessages.put(message.getMessageNumber(), message);
-    }
-
-    public void completeMessageRecovery()
-    {
-    }
-
-    public void dtxRecord(long format, byte[] globalId, byte[] branchId,
-                          Transaction.Record[] enqueues,
-                          Transaction.Record[] dequeues)
-    {
-        Xid id = new Xid(format, globalId, branchId);
-        DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
-        DtxBranch branch = dtxRegistry.getBranch(id);
-        if(branch == null)
-        {
-            branch = new DtxBranch(id, _store, _virtualHost);
-            dtxRegistry.registerBranch(branch);
-        }
-        for(Transaction.Record record : enqueues)
-        {
-            final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
-            if(queue != null)
-            {
-                final long messageId = record.getMessage().getMessageNumber();
-                final ServerMessage message = _recoveredMessages.get(messageId);
-                _unusedMessages.remove(messageId);
-
-                if(message != null)
-                {
-                    final MessageReference ref = message.newReference();
-
-
-                    branch.enqueue(queue,message);
-
-                    branch.addPostTransactionAction(new ServerTransaction.Action()
-                    {
-
-                        public void postCommit()
-                        {
-                            queue.enqueue(message, null);
-                            ref.release();
-                        }
-
-                        public void onRollback()
-                        {
-                            ref.release();
-                        }
-                    });
-                }
-                else
-                {
-                    StringBuilder xidString = xidAsString(id);
-                    _eventLogger.message(_logSubject,
-                                      TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
-                                                                                   Long.toString(messageId)));
-
-                }
-
-            }
-            else
-            {
-                StringBuilder xidString = xidAsString(id);
-                _eventLogger.message(_logSubject,
-                                  TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                             record.getResource().getId().toString()));
-
-            }
-        }
-        for(Transaction.Record record : dequeues)
-        {
-            final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
-            if(queue != null)
-            {
-                final long messageId = record.getMessage().getMessageNumber();
-                final ServerMessage message = _recoveredMessages.get(messageId);
-                _unusedMessages.remove(messageId);
-
-                if(message != null)
-                {
-                    final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-
-                    entry.acquire();
-
-                    branch.dequeue(queue, message);
-
-                    branch.addPostTransactionAction(new ServerTransaction.Action()
-                    {
-
-                        public void postCommit()
-                        {
-                            entry.delete();
-                        }
-
-                        public void onRollback()
-                        {
-                            entry.release();
-                        }
-                    });
-                }
-                else
-                {
-                    StringBuilder xidString = xidAsString(id);
-                    _eventLogger.message(_logSubject,
-                                      TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
-                                                                                   Long.toString(messageId)));
-
-                }
-
-            }
-            else
-            {
-                StringBuilder xidString = xidAsString(id);
-                _eventLogger.message(_logSubject,
-                                  TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                             record.getResource().getId().toString()));
-            }
-
-        }
-
-        branch.setState(DtxBranch.State.PREPARED);
-        branch.prePrepareTransaction();
-    }
-
-    private static StringBuilder xidAsString(Xid id)
-    {
-        return new StringBuilder("(")
-                    .append(id.getFormat())
-                    .append(',')
-                    .append(Functions.str(id.getGlobalId()))
-                    .append(',')
-                    .append(Functions.str(id.getBranchId()))
-                    .append(')');
-    }
-
-    public void completeDtxRecordRecovery()
-    {
-        for(StoredMessage m : _unusedMessages.values())
-        {
-            _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
-            m.remove();
-        }
-        _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
-
-        _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
-        _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
-    }
-
-    public void queueEntry(final UUID queueId, long messageId)
-    {
-        AMQQueue queue = _virtualHost.getQueue(queueId);
-        if(queue != null)
-        {
-            String queueName = queue.getName();
-            ServerMessage message = _recoveredMessages.get(messageId);
-            _unusedMessages.remove(messageId);
-
-            if(message != null)
-            {
-
-
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
-                }
-
-                Integer count = _queueRecoveries.get(queueName);
-                if (count == null)
-                {
-                    count = 0;
-                }
-
-                queue.enqueue(message,null);
-
-                _queueRecoveries.put(queueName, ++count);
-            }
-            else
-            {
-                _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
-                Transaction txn = _store.newTransaction();
-                txn.dequeueMessage(queue, new DummyMessage(messageId));
-                txn.commitTranAsync();
-            }
-        }
-        else
-        {
-            _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
-            Transaction txn = _store.newTransaction();
-            TransactionLogResource mockQueue =
-                    new TransactionLogResource()
-                    {
-                        @Override
-                        public String getName()
-                        {
-                            return "<<UNKNOWN>>";
-                        }
-
-                        @Override
-                        public UUID getId()
-                        {
-                            return queueId;
-                        }
-
-                        @Override
-                        public boolean isDurable()
-                        {
-                            return false;
-                        }
-                    };
-            txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
-            txn.commitTranAsync();
-        }
-    }
-
-    public DtxRecordRecoveryHandler completeQueueEntryRecovery()
-    {
-
-        for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
-        {
-            _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
-
-            _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
-        }
-
-        return this;
-    }
-
-    private static class DummyMessage implements EnqueueableMessage
-    {
-
-
-        private final long _messageId;
-
-        public DummyMessage(long messageId)
-        {
-            _messageId = messageId;
-        }
-
-        public long getMessageNumber()
-        {
-            return _messageId;
-        }
-
-
-        public boolean isPersistent()
-        {
-            return true;
-        }
-
-
-        public StoredMessage getStoredMessage()
-        {
-            return null;
-        }
-    }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
index 0fe9d1a..1de857d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
@@ -50,10 +50,10 @@
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.SystemContext;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class ManagementModeStoreHandlerTest extends QpidTestCase
@@ -89,20 +89,22 @@
         when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root));
         when(_portEntry.getType()).thenReturn(Port.class.getSimpleName());
 
-        final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+        final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
         doAnswer(
                 new Answer()
                 {
                     @Override
                     public Object answer(final InvocationOnMock invocation) throws Throwable
                     {
-                        ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
-                        recoverer.configuredObject(_root);
-                        recoverer.configuredObject(_portEntry);
+                        ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+                        if(recoverer.handle(_root))
+                        {
+                            recoverer.handle(_portEntry);
+                        }
                         return null;
                     }
                 }
-                ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+                ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
         _options = new BrokerOptions();
         _handler = new ManagementModeStoreHandler(_store, _options);
 
@@ -112,21 +114,21 @@
     private ConfiguredObjectRecord getRootEntry()
     {
         BrokerFinder brokerFinder = new BrokerFinder();
-        _handler.recoverConfigurationStore(brokerFinder);
+        _handler.visitConfiguredObjectRecords(brokerFinder);
         return brokerFinder.getBrokerRecord();
     }
 
     private ConfiguredObjectRecord getEntry(UUID id)
     {
         RecordFinder recordFinder = new RecordFinder(id);
-        _handler.recoverConfigurationStore(recordFinder);
+        _handler.visitConfiguredObjectRecords(recordFinder);
         return recordFinder.getFoundRecord();
     }
 
     private Collection<UUID> getChildrenIds(ConfiguredObjectRecord record)
     {
         ChildFinder childFinder = new ChildFinder(record);
-        _handler.recoverConfigurationStore(childFinder);
+        _handler.visitConfiguredObjectRecords(childFinder);
         return childFinder.getChildIds();
     }
 
@@ -288,21 +290,25 @@
         attributes.put(VirtualHost.TYPE, "STANDARD");
 
         final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root));
-        final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+        final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
         doAnswer(
                 new Answer()
                 {
                     @Override
                     public Object answer(final InvocationOnMock invocation) throws Throwable
                     {
-                        ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
-                        recoverer.configuredObject(_root);
-                        recoverer.configuredObject(_portEntry);
-                        recoverer.configuredObject(virtualHost);
+                        ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+                        if(recoverer.handle(_root))
+                        {
+                            if(recoverer.handle(_portEntry))
+                            {
+                                recoverer.handle(virtualHost);
+                            }
+                        }
                         return null;
                     }
                 }
-                ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+                ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
 
         State expectedState = mmQuiesceVhosts ? State.QUIESCED : null;
         if(mmQuiesceVhosts)
@@ -457,28 +463,32 @@
     }
 
 
-    private class BrokerFinder implements ConfigurationRecoveryHandler
+    private class BrokerFinder implements ConfiguredObjectRecordHandler
     {
         private ConfiguredObjectRecord _brokerRecord;
-        @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
-        {
+        private int _version;
 
+        @Override
+        public void begin(final int configVersion)
+        {
+            _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             if(object.getType().equals(Broker.class.getSimpleName()))
             {
                 _brokerRecord = object;
+                return false;
             }
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public ConfiguredObjectRecord getBrokerRecord()
@@ -487,10 +497,11 @@
         }
     }
 
-    private class RecordFinder implements ConfigurationRecoveryHandler
+    private class RecordFinder implements ConfiguredObjectRecordHandler
     {
         private final UUID _id;
         private ConfiguredObjectRecord _foundRecord;
+        private int _version;
 
         private RecordFinder(final UUID id)
         {
@@ -498,24 +509,26 @@
         }
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-
+            _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             if(object.getId().equals(_id))
             {
                 _foundRecord = object;
+                return false;
             }
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public ConfiguredObjectRecord getFoundRecord()
@@ -524,10 +537,11 @@
         }
     }
 
-    private class ChildFinder implements ConfigurationRecoveryHandler
+    private class ChildFinder implements ConfiguredObjectRecordHandler
     {
         private final Collection<UUID> _childIds = new HashSet<UUID>();
         private final ConfiguredObjectRecord _parent;
+        private int _version;
 
         private ChildFinder(final ConfiguredObjectRecord parent)
         {
@@ -535,13 +549,13 @@
         }
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-
+            _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
 
             if(object.getParents() != null)
@@ -555,12 +569,13 @@
                 }
 
             }
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public Collection<UUID> getChildIds()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 8305211..b38d9d7 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -21,9 +21,7 @@
 package org.apache.qpid.server.store;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -49,6 +47,7 @@
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 import org.mockito.ArgumentCaptor;
@@ -71,9 +70,8 @@
     private String _storePath;
     private String _storeName;
 
-    private ConfigurationRecoveryHandler _recoveryHandler;
+    private ConfiguredObjectRecordHandler _handler;
 
-    private ExchangeImpl _exchange = mock(ExchangeImpl.class);
     private static final String ROUTING_KEY = "routingKey";
     private static final String QUEUE_NAME = "queueName";
     private Map<String,Object> _bindingArgs;
@@ -96,16 +94,8 @@
         FileUtils.delete(new File(_storePath), true);
         setTestSystemProperty("QPID_WORK", TMP_FOLDER);
 
-        _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-        when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
-        when(_exchange.getId()).thenReturn(_exchangeId);
-        when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
-        when(_exchange.getEventLogger()).thenReturn(new EventLogger());
-
-        ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
-        when(exchangeRecord.getId()).thenReturn(_exchangeId);
-        when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
-        when(_exchange.asObjectRecord()).thenReturn(exchangeRecord);
+        _handler = mock(ConfiguredObjectRecordHandler.class);
+        when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
 
         _bindingArgs = new HashMap<String, Object>();
         String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
@@ -134,7 +124,7 @@
         DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
 
         reopenStore();
-        verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE,
+        verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
                 map( org.apache.qpid.server.model.Exchange.NAME, getName(),
                         org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
                         org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
@@ -168,14 +158,16 @@
         DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
 
         reopenStore();
-        verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+        verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
     }
 
     public void testBindQueue() throws Exception
     {
+        ExchangeImpl<?> exchange = createTestExchange();
         AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
         BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
-                _exchange, _bindingArgs);
+                exchange, _bindingArgs);
+        DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
         DurableConfigurationStoreHelper.createQueue(_configStore, queue);
         DurableConfigurationStoreHelper.createBinding(_configStore, binding);
 
@@ -187,10 +179,10 @@
 
         Map<String,UUID> parents = new HashMap<String, UUID>();
 
-        parents.put(Exchange.class.getSimpleName(), _exchange.getId());
+        parents.put(Exchange.class.getSimpleName(), exchange.getId());
         parents.put(Queue.class.getSimpleName(), queue.getId());
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents));
+        verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents));
     }
 
 
@@ -260,15 +252,18 @@
 
     public void testUnbindQueue() throws Exception
     {
+        ExchangeImpl<?> exchange = createTestExchange();
+        DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
+
         AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
         BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
-                _exchange, _bindingArgs);
+                exchange, _bindingArgs);
         DurableConfigurationStoreHelper.createBinding(_configStore, binding);
 
         DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
         reopenStore();
 
-        verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING,
+        verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING,
                                                                          ANY_MAP));
     }
 
@@ -282,7 +277,7 @@
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
         queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -304,7 +299,7 @@
         queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.putAll(attributes);
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -322,7 +317,7 @@
         queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     private ExchangeImpl createTestAlternateExchange()
@@ -355,7 +350,7 @@
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.putAll(attributes);
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
 
     }
 
@@ -382,7 +377,7 @@
         queueAttributes.putAll(attributes);
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     public void testRemoveQueue() throws Exception
@@ -397,7 +392,7 @@
         // remove queue
         DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
         reopenStore();
-        verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+        verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
     }
 
     private AMQQueue createTestQueue(String queueName,
@@ -463,11 +458,9 @@
     {
         ExchangeImpl exchange = mock(ExchangeImpl.class);
         Map<String,Object> actualAttributes = new HashMap<String, Object>();
-        actualAttributes.put("id", _exchangeId);
         actualAttributes.put("name", getName());
         actualAttributes.put("type", getName() + "Type");
         actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
-        when(exchange.getActualAttributes()).thenReturn(actualAttributes);
         when(exchange.getName()).thenReturn(getName());
         when(exchange.getTypeName()).thenReturn(getName() + "Type");
         when(exchange.isAutoDelete()).thenReturn(true);
@@ -475,11 +468,10 @@
         ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
         when(exchangeRecord.getId()).thenReturn(_exchangeId);
         when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
-        Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes);
-        actualAttributesExceptId.remove("id");
-        when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId);
+        when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
         when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
-
+        when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
+        when(exchange.getEventLogger()).thenReturn(new EventLogger());
         return exchange;
     }
 
@@ -491,7 +483,7 @@
         ConfiguredObject<?> parent = mock(ConfiguredObject.class);
         when(parent.getName()).thenReturn("testName");
         _configStore.openConfigurationStore(parent, _configurationStoreSettings);
-        _configStore.recoverConfigurationStore(_recoveryHandler);
+        _configStore.visitConfiguredObjectRecords(_handler);
     }
 
     protected abstract DurableConfigurationStore createConfigStore() throws Exception;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
index 8f2d002..2400a68 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
@@ -27,10 +27,4 @@
     {
         return new JsonFileConfigStore();
     }
-
-    @Override
-    public void testBindQueue() throws Exception
-    {
-        // TODO: Temporarily disable the test as it is already fixed on trunk
-    }
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index 1de24e3..6907898 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -28,6 +28,7 @@
 
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.test.utils.TestFileUtils;
@@ -43,15 +44,15 @@
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 public class JsonFileConfigStoreTest extends QpidTestCase
 {
-    private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-
     private JsonFileConfigStore _store;
     private HashMap<String, Object> _configurationStoreSettings;
     private ConfiguredObject<?> _virtualHost;
     private File _storeLocation;
+    private ConfiguredObjectRecordHandler _handler;
 
 
     private static final UUID ANY_UUID = UUID.randomUUID();
@@ -69,6 +70,9 @@
         _configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
         _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath());
         _store = new JsonFileConfigStore();
+
+        _handler = mock(ConfiguredObjectRecordHandler.class);
+        when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
     }
 
     @Override
@@ -113,35 +117,35 @@
         }
     }
 
-    public void testStartFromNoStore() throws Exception
+    public void testVisitEmptyStore()
     {
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        InOrder inorder = inOrder(_recoveryHandler);
-        inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
-        inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
-        inorder.verify(_recoveryHandler).completeConfigurationRecovery();
+        _store.visitConfiguredObjectRecords(_handler);
+        InOrder inorder = inOrder(_handler);
+        inorder.verify(_handler).begin(eq(0));
+        inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class));
+        inorder.verify(_handler).end();
         _store.closeConfigurationStore();
     }
 
     public void testUpdatedConfigVersionIsRetained() throws Exception
     {
         final int NEW_CONFIG_VERSION = 42;
-        when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
+        when(_handler.end()).thenReturn(NEW_CONFIG_VERSION);
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
+        _store.visitConfiguredObjectRecords(_handler);
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        InOrder inorder = inOrder(_recoveryHandler);
+        _store.visitConfiguredObjectRecords(_handler);
+        InOrder inorder = inOrder(_handler);
 
         // first time the config version should be the initial version - 0
-        inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+        inorder.verify(_handler).begin(eq(0));
 
         // second time the config version should be the updated version
-        inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
+        inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION));
 
         _store.closeConfigurationStore();
     }
@@ -157,8 +161,9 @@
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
         _store.closeConfigurationStore();
     }
 
@@ -179,8 +184,8 @@
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
         _store.closeConfigurationStore();
     }
 
@@ -201,8 +206,8 @@
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
         _store.closeConfigurationStore();
     }
 
@@ -311,12 +316,12 @@
         _store.update(true, bindingRecord, binding2Record);
         _store.closeConfigurationStore();
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
         _store.closeConfigurationStore();
 
     }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 451a274..89fef15 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -33,7 +33,6 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 
@@ -66,12 +65,9 @@
 
         _store = createStore();
 
-        MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
-        when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
         ConfiguredObject<?> parent = mock(ConfiguredObject.class);
         when(parent.getName()).thenReturn("test");
         _store.openMessageStore(parent, storeSettings);
-        _store.recoverMessageStore(recoveryHandler, null);
 
         _transactionResource = UUID.randomUUID();
         _events = new ArrayList<Event>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 51d3fc1..8bf981b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,29 +21,31 @@
 package org.apache.qpid.server.store;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentMatcher;
 
 public abstract class MessageStoreTestCase extends QpidTestCase
 {
-    private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
-    private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
-    private TransactionLogRecoveryHandler _logRecoveryHandler;
-    private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
-    private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
-
     private MessageStore _store;
     private Map<String, Object> _storeSettings;
     private ConfiguredObject<?> _parent;
@@ -55,35 +57,34 @@
         _parent = mock(ConfiguredObject.class);
         when(_parent.getName()).thenReturn("test");
 
-        _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
-        _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
-        _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
-        _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
-        _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
-
-        when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
-        when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
-        when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
-
         _storeSettings = getStoreSettings();
 
         _store = createMessageStore();
 
         _store.openMessageStore(_parent, _storeSettings);
-        _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+
     }
 
     protected abstract Map<String, Object> getStoreSettings() throws Exception;
 
     protected abstract MessageStore createMessageStore();
 
-    public MessageStore getStore()
+    protected MessageStore getStore()
     {
         return _store;
     }
 
-    public void testRecordXid() throws Exception
+    protected void reopenStore() throws Exception
     {
+        _store.closeMessageStore();
+
+        _store = createMessageStore();
+        _store.openMessageStore(_parent, _storeSettings);
+    }
+
+    public void testAddAndRemoveRecordXid() throws Exception
+    {
+        long format = 1L;
         Record enqueueRecord = getTestRecord(1);
         Record dequeueRecord = getTestRecord(2);
         Record[] enqueues = { enqueueRecord };
@@ -92,27 +93,287 @@
         byte[] branchId = new byte[] { 2 };
 
         Transaction transaction = _store.newTransaction();
-        transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+        transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
         transaction.commitTran();
+
         reopenStore();
-        verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+        // After reopening the store, the recorded XID must be reported to the visitor exactly once.
+        DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+        _store.visitDistributedTransactions(handler);
+        verify(handler, times(1)).handle(format, globalId, branchId, enqueues, dequeues);
 
         transaction = _store.newTransaction();
-        transaction.removeXid(1l, globalId, branchId);
+        transaction.removeXid(format, globalId, branchId);
         transaction.commitTran();
 
         reopenStore();
-        verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+        // Use a fresh mock so the earlier invocation is not counted: the removed XID must not be visited.
+        handler = mock(DistributedTransactionHandler.class);
+        _store.visitDistributedTransactions(handler);
+        verify(handler, never()).handle(format, globalId, branchId, enqueues, dequeues);
     }
 
-    private void reopenStore() throws Exception
+    public void testVisitMessages() throws Exception
     {
-        _store.closeMessageStore();
+        long messageId = 1;
+        int contentSize = 0;
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        StoreFuture flushFuture = message.flushToStore();
+        flushFuture.waitForCompletion();
 
-        _store = createMessageStore();
-        _store.openMessageStore(_parent, _storeSettings);
-        _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+        MessageHandler handler = mock(MessageHandler.class);
+        _store.visitMessages(handler);
+
+        verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+
     }
+
+    public void testVisitMessagesAborted() throws Exception
+    {
+        int contentSize = 0;
+        for (int i = 0; i < 3; i++)
+        {
+            final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+            StoreFuture flushFuture = message.flushToStore();
+            flushFuture.waitForCompletion();
+        }
+
+        MessageHandler handler = mock(MessageHandler.class);
+        when(handler.handle(any(StoredMessage.class))).thenReturn(true, false);
+
+        _store.visitMessages(handler);
+
+        verify(handler, times(2)).handle(any(StoredMessage.class));
+    }
+
+    public void testReopenedMessageStoreUsesLastMessageId() throws Exception
+    {
+        int contentSize = 0;
+        for (int i = 0; i < 3; i++)
+        {
+            final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+            StoreFuture flushFuture = message.flushToStore();
+            flushFuture.waitForCompletion();
+        }
+
+        reopenStore();
+
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
+
+        StoreFuture flushFuture = message.flushToStore();
+        flushFuture.waitForCompletion();
+
+        assertEquals("Unexpected message id", 4, message.getMessageNumber());
+    }
+
+    public void testVisitMessageInstances() throws Exception
+    {
+        long messageId = 1;
+        int contentSize = 0;
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        StoreFuture flushFuture = message.flushToStore();
+        flushFuture.waitForCompletion();
+
+        EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
+
+        UUID queueId = UUID.randomUUID();
+        TransactionLogResource queue = createTransactionLogResource(queueId);
+
+        Transaction transaction = _store.newTransaction();
+        transaction.enqueueMessage(queue, enqueueableMessage);
+        transaction.commitTran();
+
+        MessageInstanceHandler handler = mock(MessageInstanceHandler.class);
+        _store.visitMessageInstances(handler);
+
+        verify(handler, times(1)).handle(queueId, messageId);
+    }
+
+    public void testVisitDistributedTransactions() throws Exception
+    {
+        long format = 1l;
+        byte[] branchId = new byte[] { 2 };
+        byte[] globalId = new byte[] { 1 };
+        Record enqueueRecord = getTestRecord(1);
+        Record dequeueRecord = getTestRecord(2);
+        Record[] enqueues = { enqueueRecord };
+        Record[] dequeues = { dequeueRecord };
+
+        Transaction transaction = _store.newTransaction();
+        transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+        transaction.commitTran();
+
+        DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+        _store.visitDistributedTransactions(handler);
+
+        verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+
+    }
+
+    public void testCommitTransaction() throws Exception
+    {
+        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+        TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+        Transaction txn = getStore().newTransaction();
+
+        long messageId1 = 1L;
+        long messageId2 = 5L;
+        final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+        final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+
+        txn.enqueueMessage(mockQueue, enqueueableMessage1);
+        txn.enqueueMessage(mockQueue, enqueueableMessage2);
+        txn.commitTran();
+
+        QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+        getStore().visitMessageInstances(filter);
+        Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+        assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+        assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
+    }
+
+    public void testRollbackTransactionBeforeCommit() throws Exception
+    {
+        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+        TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+        long messageId1 = 21L;
+        long messageId2 = 22L;
+        long messageId3 = 23L;
+        final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+        final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+        final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+        Transaction txn = getStore().newTransaction();
+
+        txn.enqueueMessage(mockQueue, enqueueableMessage1);
+        txn.abortTran();
+
+        txn = getStore().newTransaction();
+        txn.enqueueMessage(mockQueue, enqueueableMessage2);
+        txn.enqueueMessage(mockQueue, enqueueableMessage3);
+        txn.commitTran();
+
+        QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+        getStore().visitMessageInstances(filter);
+        Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+        assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
+        assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+    }
+
+    public void testRollbackTransactionAfterCommit() throws Exception
+    {
+        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+        TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+        long messageId1 = 30L;
+        long messageId2 = 31L;
+        long messageId3 = 32L;
+
+        final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+        final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+        final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+        Transaction txn = getStore().newTransaction();
+
+        txn.enqueueMessage(mockQueue, enqueueableMessage1);
+        txn.commitTran();
+
+        txn = getStore().newTransaction();
+        txn.enqueueMessage(mockQueue, enqueueableMessage2);
+        txn.abortTran();
+
+        txn = getStore().newTransaction();
+        txn.enqueueMessage(mockQueue, enqueueableMessage3);
+        txn.commitTran();
+
+        QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+        getStore().visitMessageInstances(filter);
+        Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+        assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+        assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+    }
+
+    public void testStoreIgnoresTransientMessage() throws Exception
+    {
+        long messageId = 1;
+        int contentSize = 0;
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
+        StoreFuture flushFuture = message.flushToStore();
+        flushFuture.waitForCompletion();
+
+        MessageHandler handler = mock(MessageHandler.class);
+        _store.visitMessages(handler);
+
+        verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+    }
+
+    // Checks that a stored message without content is reported by visitMessages() and that
+    // it is no longer reported once remove() has been called on it.
+    public void testAddAndRemoveMessageWithoutContent() throws Exception
+    {
+        long messageId = 1;
+        int contentSize = 0;
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        StoreFuture flushFuture = message.flushToStore();
+        flushFuture.waitForCompletion();
+        // Visit the store and remember the message handed to the handler.
+        final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
+        _store.visitMessages(new MessageHandler()
+        {
+            @Override
+            public boolean handle(StoredMessage<?> storedMessage)
+            {
+                retrievedMessageRef.set(storedMessage);
+                return true;
+            }
+        });
+
+        StoredMessage<?> retrievedMessage = retrievedMessageRef.get();
+        assertNotNull("Message was not found", retrievedMessage);
+        assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
+
+        retrievedMessage.remove();
+        // Clear the reference so a stale value cannot mask a failed removal.
+        retrievedMessageRef.set(null);
+        _store.visitMessages(new MessageHandler()
+        {
+            @Override
+            public boolean handle(StoredMessage<?> storedMessage)
+            {
+                retrievedMessageRef.set(storedMessage);
+                return true;
+            }
+        });
+        assertNull(retrievedMessageRef.get());
+    }
+
+
+    private TransactionLogResource createTransactionLogResource(UUID queueId)
+    {
+        TransactionLogResource queue = mock(TransactionLogResource.class);
+        when(queue.getId()).thenReturn(queueId);
+        when(queue.getName()).thenReturn("testQueue");
+        when(queue.isDurable()).thenReturn(true);
+        return queue;
+    }
+
+    private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<TestMessageMetaData> message)
+    {
+        EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+        when(enqueueableMessage.isPersistent()).thenReturn(true);
+        when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+        when(enqueueableMessage.getStoredMessage()).thenReturn(message);
+        return enqueueableMessage;
+    }
+
     private Record getTestRecord(long messageNumber)
     {
         UUID queueId1 = UUIDGenerator.generateRandomUUID();
@@ -121,77 +382,66 @@
         EnqueueableMessage message1 = mock(EnqueueableMessage.class);
         when(message1.isPersistent()).thenReturn(true);
         when(message1.getMessageNumber()).thenReturn(messageNumber);
-        final StoredMessage storedMessage = mock(StoredMessage.class);
+        final StoredMessage<?> storedMessage = mock(StoredMessage.class);
         when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
         when(message1.getStoredMessage()).thenReturn(storedMessage);
         Record enqueueRecord = new TestRecord(queue1, message1);
         return enqueueRecord;
     }
 
-    private static class TestRecord implements Record
+    private EnqueueableMessage createEnqueueableMessage(long messageId1)
     {
-        private TransactionLogResource _queue;
-        private EnqueueableMessage _message;
+        final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
+        StoreFuture flushFuture = message1.flushToStore();
+        flushFuture.waitForCompletion();
+        EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
+        return enqueueableMessage1;
+    }
 
-        public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+    private class MessageMetaDataMatcher extends ArgumentMatcher<StoredMessage<?>>
+    {
+        private long _messageNumber;
+
+        public MessageMetaDataMatcher(long messageNumber)
         {
             super();
-            _queue = queue;
-            _message = message;
+            _messageNumber = messageNumber;
         }
 
-        @Override
-        public TransactionLogResource getResource()
+        public boolean matches(Object obj)
         {
-            return _queue;
+            return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber;
         }
-
-        @Override
-        public EnqueueableMessage getMessage()
-        {
-            return _message;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
-            result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj)
-        {
-            if (this == obj)
-            {
-                return true;
-            }
-            if (obj == null)
-            {
-                return false;
-            }
-            if (!(obj instanceof Record))
-            {
-                return false;
-            }
-            Record other = (Record) obj;
-            if (_message == null && other.getMessage() != null)
-            {
-                return false;
-            }
-            if (_queue == null && other.getResource() != null)
-            {
-                return false;
-            }
-            if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
-            {
-                return false;
-            }
-            return _queue.getId().equals(other.getResource().getId());
-        }
-
     }
+
+    private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler
+    {
+        private final UUID _queueId;
+        private final Set<Long> _enqueuedIds = new HashSet<Long>();
+
+        public QueueFilteringMessageInstanceHandler(UUID queueId)
+        {
+            _queueId = queueId;
+        }
+
+        @Override
+        public boolean handle(UUID queueId, long messageId)
+        {
+            if (queueId.equals(_queueId))
+            {
+                if (_enqueuedIds.contains(messageId))
+                {
+                    fail("Queue with id " + _queueId + " contains duplicate message ids");
+                }
+                _enqueuedIds.add(messageId);
+            }
+            return true;
+        }
+
+        public Set<Long> getEnqueuedIds()
+        {
+            return _enqueuedIds;
+        }
+    }
+
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 32df355..bfa4e1d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -20,15 +20,30 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.handler.MessageHandler;
+
 
 /** A simple message store that stores the messages in a thread-safe structure in memory. */
 public class TestMemoryMessageStore extends AbstractMemoryMessageStore
 {
     public static final String TYPE = "TestMemory";
 
-    @Override
-    public String getStoreType()
+    public int getMessageCount()
     {
-        return TYPE;
+        final AtomicInteger counter = new AtomicInteger();
+        visitMessages(new MessageHandler()
+        {
+
+            @Override
+            public boolean handle(StoredMessage<?> storedMessage)
+            {
+                counter.incrementAndGet();
+                return true;
+            }
+        });
+        return counter.get();
     }
+
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
index e14b41b..6e55b46 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
@@ -34,13 +34,20 @@
 
     private static final TestMessageMetaDataType TYPE = new TestMessageMetaDataType();
 
-    private int _contentSize;
-    private long _messageId;
+    private final int _contentSize;
+    private final long _messageId;
+    private final boolean _persistent;
 
     public TestMessageMetaData(long messageId, int contentSize)
     {
+        this(messageId, contentSize, true);
+    }
+
+    public TestMessageMetaData(long messageId, int contentSize, boolean persistent)
+    {
         _contentSize = contentSize;
         _messageId = messageId;
+        _persistent = persistent;
     }
 
     @Override
@@ -59,7 +66,7 @@
     }
 
     @Override
-    public MessageMetaDataType getType()
+    public MessageMetaDataType<TestMessageMetaData> getType()
     {
         return TYPE;
     }
@@ -67,7 +74,7 @@
     @Override
     public boolean isPersistent()
     {
-        return true;
+        return _persistent;
     }
 
     @Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 5622383..e5c94cf 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -70,7 +70,22 @@
 
     private static class TestServerMessage implements ServerMessage<TestMessageMetaData>
     {
-        private StoredMessage<TestMessageMetaData> _storedMsg;
+        private final StoredMessage<TestMessageMetaData> _storedMsg;
+
+        private final MessageReference<ServerMessage> _messageReference = new MessageReference<ServerMessage>()
+        {
+
+            @Override
+            public ServerMessage getMessage()
+            {
+                return TestServerMessage.this;
+            }
+
+            @Override
+            public void release()
+            {
+            }
+        };
 
         public TestServerMessage(StoredMessage<TestMessageMetaData> storedMsg)
         {
@@ -115,7 +130,7 @@
         @Override
         public long getMessageNumber()
         {
-            return 0;
+            return _storedMsg.getMessageNumber();
         }
 
         @Override
@@ -140,13 +155,54 @@
         @Override
         public boolean isPersistent()
         {
-            return false;
+            return _storedMsg.getMetaData().isPersistent();
         }
 
         @Override
         public MessageReference newReference()
         {
-            return null;
+            return _messageReference;
         }
+
+        @Override
+        public int hashCode()
+        {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((_storedMsg == null) ? 0 : _storedMsg.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj)
+            {
+                return true;
+            }
+            if (obj == null)
+            {
+                return false;
+            }
+            if (getClass() != obj.getClass())
+            {
+                return false;
+            }
+            TestServerMessage other = (TestServerMessage) obj;
+            if (_storedMsg == null)
+            {
+                if (other._storedMsg != null)
+                {
+                    return false;
+                }
+            }
+            else if (!_storedMsg.equals(other._storedMsg))
+            {
+                return false;
+            }
+            return true;
+        }
+
+
     }
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
new file mode 100644
index 0000000..668d9d5
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.Transaction.Record;
+
+/**
+ * Minimal {@link Record} that pairs a queue with a message. Equality and hashing are based on
+ * the message number and the queue id rather than on object identity.
+ */
+public class TestRecord implements Record
+{
+    private final TransactionLogResource _queue;
+    private final EnqueueableMessage _message;
+
+    public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+    {
+        super();
+        _queue = queue;
+        _message = message;
+    }
+
+    @Override
+    public TransactionLogResource getResource()
+    {
+        return _queue;
+    }
+
+    @Override
+    public EnqueueableMessage getMessage()
+    {
+        return _message;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((_message == null) ? 0 : Long.valueOf(_message.getMessageNumber()).hashCode());
+        result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
+        return result;
+    }
+
+    /**
+     * Compares against any {@link Record} by message number and queue id. A missing message or
+     * queue only matches a record in which the same component is missing as well.
+     */
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj)
+        {
+            return true;
+        }
+        if (!(obj instanceof Record))
+        {
+            // instanceof is false for null, so a null argument is rejected here too
+            return false;
+        }
+        Record other = (Record) obj;
+        EnqueueableMessage otherMessage = other.getMessage();
+        TransactionLogResource otherQueue = other.getResource();
+        if ((_message == null) != (otherMessage == null))
+        {
+            return false;
+        }
+        if (_message != null && _message.getMessageNumber() != otherMessage.getMessageNumber())
+        {
+            return false;
+        }
+        if ((_queue == null) != (otherQueue == null))
+        {
+            return false;
+        }
+        return _queue == null || _queue.getId().equals(otherQueue.getId());
+    }
+
+}
\ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
deleted file mode 100644
index 7d4dcd0..0000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestableMemoryMessageStore extends TestMemoryMessageStore
-{
-    public static final String TYPE = "TestableMemory";
-    private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
-    private final AtomicInteger _messageCount = new AtomicInteger(0);
-
-    @Override
-    public StoredMessage addMessage(StorableMessageMetaData metaData)
-    {
-        return new TestableStoredMessage(super.addMessage(metaData));
-    }
-
-    public int getMessageCount()
-    {
-        return _messageCount.get();
-    }
-
-    public Map<Long, AMQQueue> getMessages()
-    {
-        return _messages;
-    }
-
-    private class TestableTransaction implements Transaction
-    {
-        @Override
-        public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
-        {
-            getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
-        }
-
-        @Override
-        public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
-        {
-            getMessages().remove(message.getMessageNumber());
-        }
-
-        @Override
-        public void commitTran()
-        {
-        }
-
-        @Override
-        public StoreFuture commitTranAsync()
-        {
-            return StoreFuture.IMMEDIATE_FUTURE;
-        }
-
-        public void abortTran()
-        {
-        }
-
-        public void removeXid(long format, byte[] globalId, byte[] branchId)
-        {
-        }
-
-        public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
-        {
-        }
-    }
-
-
-    @Override
-    public Transaction newTransaction()
-    {
-        return new TestableTransaction();
-    }
-
-
-    private class TestableStoredMessage implements StoredMessage
-    {
-        private final StoredMessage _storedMessage;
-
-        public TestableStoredMessage(StoredMessage storedMessage)
-        {
-            _messageCount.incrementAndGet();
-            _storedMessage = storedMessage;
-        }
-
-        public StorableMessageMetaData getMetaData()
-        {
-            return _storedMessage.getMetaData();
-        }
-
-        public long getMessageNumber()
-        {
-            return _storedMessage.getMessageNumber();
-        }
-
-        public void addContent(int offsetInMessage, ByteBuffer src)
-        {
-            _storedMessage.addContent(offsetInMessage, src);
-        }
-
-        public int getContent(int offsetInMessage, ByteBuffer dst)
-        {
-            return _storedMessage.getContent(offsetInMessage, dst);
-        }
-
-
-        public ByteBuffer getContent(int offsetInMessage, int size)
-        {
-            return _storedMessage.getContent(offsetInMessage, size);
-        }
-
-        public StoreFuture flushToStore()
-        {
-            return _storedMessage.flushToStore();
-        }
-
-        public void remove()
-        {
-            _storedMessage.remove();
-            _messageCount.decrementAndGet();
-        }
-    }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index ab18c8f..da868a0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -33,14 +33,14 @@
  * Mock implementation of a (Store) Transaction allow its state to be observed.
  * Also provide a factory method to produce TestTransactionLog objects suitable
  * for unit test use.
- * 
+ *
  */
 class MockStoreTransaction implements Transaction
 {
     enum TransactionState {NOT_STARTED, STARTED, COMMITTED, ABORTED};
 
     private TransactionState _state = TransactionState.NOT_STARTED;
-    
+
     private int _numberOfEnqueuedMessages = 0;
     private int _numberOfDequeuedMessages = 0;
     private boolean _throwExceptionOnQueueOp;
@@ -52,7 +52,7 @@
 
     public void setState(TransactionState state)
     {
-        _state = state; 
+        _state = state;
     }
 
     public TransactionState getState()
@@ -64,10 +64,10 @@
     {
         if (_throwExceptionOnQueueOp)
         {
-            
+
             throw new ServerScopedRuntimeException("Mocked exception");
         }
-        
+
         _numberOfEnqueuedMessages++;
     }
 
@@ -87,7 +87,7 @@
         {
             throw new ServerScopedRuntimeException("Mocked exception");
         }
-        
+
         _numberOfDequeuedMessages++;
     }
 
@@ -124,12 +124,6 @@
                 storeTransaction.setState(TransactionState.STARTED);
                 return storeTransaction;
             }
-
-            @Override
-            public String getStoreType()
-            {
-                return "TEST";
-            }
        };
     }
 }
\ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index fd56f3f..1b131a1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -44,7 +44,6 @@
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.plugin.PluggableFactoryLoader;
@@ -110,7 +109,7 @@
         when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE);
 
         Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
-        messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE);
+        messageStoreSettings.put(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE);
 
         when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
         when(virtualHost.getName()).thenReturn(name);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
new file mode 100644
index 0000000..ce5616b
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
@@ -0,0 +1,414 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.NullMessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMessageMetaData;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.Xid;
+import org.mockito.ArgumentMatcher;
+
+public class MessageStoreRecovererTest extends TestCase
+{
+    private VirtualHost _virtualHost;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        // Every test works against a VirtualHost mock that returns a real EventLogger.
+        _virtualHost = mock(VirtualHost.class);
+        when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testRecoveryOfSingleMessageOnSingleQueue()
+    { // Scenario: one stored message plus one instance of it on a queue the virtual host mock knows.
+        final AMQQueue<?> queue = createRegisteredMockQueue();
+
+        final long messageId = 1;
+        final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+        // Stub store: reports the single stored message and a single instance of it on the registered queue.
+        MessageStore store = new NullMessageStore()
+        {
+            @Override
+            public void visitMessages(MessageHandler handler) throws StoreException
+            {
+                handler.handle(storedMessage);
+            }
+
+            @Override
+            public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+            {
+                handler.handle(queue.getId(), messageId);
+            }
+        };
+
+        when(_virtualHost.getMessageStore()).thenReturn(store);
+
+        MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+        recoverer.recover();
+        // Expect exactly one enqueue of a message equal to the one built from the stored message, with a null Action.
+        ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
+        verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testRecoveryOfMessageInstanceForNonExistingMessage()
+    { // Scenario: a message instance refers to a message id that the store never reports as a message.
+        final AMQQueue<?> queue = createRegisteredMockQueue();
+
+        final long messageId = 1;
+        final Transaction transaction = mock(Transaction.class);
+        // Stub store: visits no messages, one instance for messageId, and hands out the mocked transaction.
+        MessageStore store = new NullMessageStore()
+        {
+            @Override
+            public void visitMessages(MessageHandler handler) throws StoreException
+            {
+                // no message to visit
+            }
+
+            @Override
+            public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+            {
+                handler.handle(queue.getId(), messageId);
+            }
+
+            @Override
+            public Transaction newTransaction()
+            {
+                return transaction;
+            }
+        };
+
+        when(_virtualHost.getMessageStore()).thenReturn(store);
+
+        MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+        recoverer.recover();
+        // Expect no enqueue; instead the instance is dequeued from this queue and the transaction committed async once.
+        verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class));
+        verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId)));
+        verify(transaction, times(1)).commitTranAsync();
+    }
+
+    public void testRecoveryOfMessageInstanceForNonExistingQueue()
+    { // Scenario: a message instance refers to a queue id for which the virtual host mock has no getQueue stubbing.
+        final UUID queueId = UUID.randomUUID();
+        final Transaction transaction = mock(Transaction.class);
+        final long messageId = 1;
+        final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+        // Stub store: one message, one instance of it on the unregistered queue id, and the mocked transaction.
+        MessageStore store = new NullMessageStore()
+        {
+            @Override
+            public void visitMessages(MessageHandler handler) throws StoreException
+            {
+                handler.handle(storedMessage);
+            }
+
+            @Override
+            public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+            {
+                handler.handle(queueId, messageId);
+            }
+
+            @Override
+            public Transaction newTransaction()
+            {
+                return transaction;
+            }
+        };
+
+        when(_virtualHost.getMessageStore()).thenReturn(store);
+
+        MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+        recoverer.recover();
+        // Expect a dequeue against a resource carrying queueId for this message number, then one async commit.
+        verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId)));
+        verify(transaction, times(1)).commitTranAsync();
+    }
+
+    public void testRecoveryDeletesOrphanMessages()
+    {
+        // Scenario: the store reports a message but no message instance refers to it.
+        final long messageId = 1;
+        final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+        MessageStore store = new NullMessageStore()
+        {
+            @Override
+            public void visitMessages(MessageHandler handler) throws StoreException
+            {
+                handler.handle(storedMessage);
+            }
+
+            @Override
+            public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+            {
+                // No message instances
+            }
+        };
+
+        when(_virtualHost.getMessageStore()).thenReturn(store);
+
+        MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+        recoverer.recover();
+        // Expect remove() to be called exactly once on the unreferenced stored message.
+        verify(storedMessage, times(1)).remove();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testRecoveryOfSingleEnqueueWithDistributedTransaction()
+    { // Scenario: the store replays one distributed transaction holding a single enqueue record.
+        AMQQueue<?> queue = createRegisteredMockQueue();
+
+        final Transaction transaction = mock(Transaction.class);
+
+        final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
+        long messageId = storedMessage.getMessageNumber();
+        // Wrap the stored message in a mocked record that targets the registered queue.
+        EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
+        Record enqueueRecord = createMockRecord(queue, enqueueableMessage);
+        // Xid components and record arrays that the stub store passes to the DistributedTransactionHandler.
+        final long format = 1;
+        final byte[] globalId = new byte[] {0};
+        final byte[] branchId = new byte[] {0};
+        final Record[] enqueues = { enqueueRecord };
+        final Record[] dequeues = {};
+
+        MessageStore store = new NullMessageStore()
+        {
+            @Override
+            public void visitMessages(MessageHandler handler) throws StoreException
+            {
+                handler.handle(storedMessage);
+            }
+
+            @Override
+            public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+            {
+                // No message instances
+            }
+
+            @Override
+            public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+            {
+                handler.handle(format, globalId, branchId, enqueues, dequeues);
+            }
+
+            @Override
+            public Transaction newTransaction()
+            {
+                return transaction;
+            }
+        };
+
+        DtxRegistry dtxRegistry = new DtxRegistry();
+
+        when(_virtualHost.getMessageStore()).thenReturn(store);
+        when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
+
+        MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+        recoverer.recover();
+        // Recovery must have registered a branch under the same Xid; the test then commits that branch itself.
+        DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
+        assertNotNull("Expected dtx branch to be created", branch);
+        branch.commit();
+        // After the commit: one enqueue of an equal message with a null Action, and commitTran() on the store transaction.
+        ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
+        verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+        verify(transaction).commitTran();
+    }
+
+    public void testRecoveryOfSingleDequeueWithDistributedTransaction()
+    { // Scenario: the store replays one distributed transaction holding a single dequeue record.
+        final AMQQueue<?> queue = createRegisteredMockQueue();
+
+
+        final Transaction transaction = mock(Transaction.class);
+
+        final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
+        final long messageId = storedMessage.getMessageNumber();
+
+        EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
+        Record dequeueRecord = createMockRecord(queue, enqueueableMessage);
+        // The queue mock hands back this entry for messageId so that its deletion can be verified below.
+        QueueEntry queueEntry = mock(QueueEntry.class);
+        when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
+        // Xid components and record arrays that the stub store passes to the DistributedTransactionHandler.
+        final long format = 1;
+        final byte[] globalId = new byte[] {0};
+        final byte[] branchId = new byte[] {0};
+        final Record[] enqueues = {};
+        final Record[] dequeues = { dequeueRecord };
+
+        MessageStore store = new NullMessageStore()
+        {
+            @Override
+            public void visitMessages(MessageHandler handler) throws StoreException
+            {
+                handler.handle(storedMessage);
+            }
+
+            @Override
+            public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+            {
+                // We need the message to be enqueued onto the queue so that later the distributed transaction
+                // can dequeue it.
+                handler.handle(queue.getId(), messageId);
+            }
+
+            @Override
+            public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+            {
+                handler.handle(format, globalId, branchId, enqueues, dequeues);
+            }
+
+            @Override
+            public Transaction newTransaction()
+            {
+                return transaction;
+            }
+        };
+
+        DtxRegistry dtxRegistry = new DtxRegistry();
+
+        when(_virtualHost.getMessageStore()).thenReturn(store);
+        when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
+
+        MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+        recoverer.recover();
+        // Recovery must have registered a branch under the same Xid; the test then commits that branch itself.
+        DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
+        assertNotNull("Expected dtx branch to be created", branch);
+        branch.commit();
+        // After the commit: the queue entry is deleted exactly once and commitTran() is called on the store transaction.
+        verify(queueEntry, times(1)).delete();
+        verify(transaction).commitTran();
+    }
+
+
+    protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
+    { // Returns a mocked Record whose getMessage()/getResource() yield the given message and queue.
+        Record enqueueRecord = mock(Record.class); // used for dequeue records too, despite the local name
+        when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage);
+        when(enqueueRecord.getResource()).thenReturn(queue);
+        return enqueueRecord;
+    }
+
+    protected EnqueueableMessage createMockEnqueueableMessage(long messageId,
+            final StoredMessage<StorableMessageMetaData> storedMessage)
+    { // Returns a mocked EnqueueableMessage stubbed with the given message number and stored message.
+        EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+        when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+        when(enqueueableMessage.getStoredMessage()).thenReturn(storedMessage);
+        return enqueueableMessage;
+    }
+
+    private StoredMessage<StorableMessageMetaData> createMockStoredMessage(final long messageId)
+    { // Returns a mocked StoredMessage reporting messageId and backed by real TestMessageMetaData.
+        TestMessageMetaData metaData = new TestMessageMetaData(messageId, 0);
+        // The mock is created from the raw StoredMessage class, hence the unchecked suppression.
+        @SuppressWarnings("unchecked")
+        final StoredMessage<StorableMessageMetaData> storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getMessageNumber()).thenReturn(messageId);
+        when(storedMessage.getMetaData()).thenReturn(metaData);
+        return storedMessage;
+    }
+
+    private AMQQueue<?> createRegisteredMockQueue()
+    { // Returns a mocked queue with a random id and makes the virtual host mock resolve that id to it.
+        AMQQueue<?> queue = mock(AMQQueue.class);
+        final UUID queueId = UUID.randomUUID();
+        when(queue.getId()).thenReturn(queueId);
+        when(queue.getName()).thenReturn("test-queue");
+        when(_virtualHost.getQueue(queueId)).thenReturn(queue); // this stubbing is what makes the queue "registered"
+        return queue;
+    }
+
+
+    private static final class QueueIdMatcher extends ArgumentMatcher<TransactionLogResource>
+    { // Mockito matcher accepting any TransactionLogResource whose getId() equals the expected queue id.
+        private final UUID _queueId; // assigned once in the constructor and never reassigned
+        private QueueIdMatcher(UUID queueId)
+        {
+            _queueId = queueId;
+        }
+        // Arguments that are not a TransactionLogResource (including null) fail the instanceof check.
+        @Override
+        public boolean matches(Object argument)
+        {
+            return argument instanceof TransactionLogResource && _queueId.equals( ((TransactionLogResource)argument).getId() );
+        }
+    }
+
+    private static final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage>
+    { // Mockito matcher accepting any EnqueueableMessage whose getMessageNumber() equals the expected id.
+        private final long _messageId;
+
+        private MessageNumberMatcher(long messageId)
+        {
+            _messageId = messageId;
+        }
+        // Arguments that are not an EnqueueableMessage (including null) fail the instanceof check.
+        @Override
+        public boolean matches(Object argument)
+        {
+            return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId;
+        }
+    }
+}
diff --git a/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
index 4824161..9512fb8 100644
--- a/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
+++ b/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
@@ -17,4 +17,3 @@
 # under the License.
 #
 org.apache.qpid.server.store.TestMemoryMessageStoreFactory
-org.apache.qpid.server.store.TestableMemoryMessageStoreFactory
\ No newline at end of file
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index e5cfced..6b697f8 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -32,7 +32,7 @@
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -53,7 +53,7 @@
 
     private AMQProtocolSession _protocolSession;
 
-    private TestableMemoryMessageStore _messageStore;
+    private TestMemoryMessageStore _messageStore;
 
     private AMQChannel _channel;
 
@@ -71,7 +71,7 @@
         _protocolSession = _channel.getProtocolSession();
         _virtualHost = _protocolSession.getVirtualHost();
         _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost);
-        _messageStore = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
+        _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore();
     }
 
     @Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 9e551c9..399564f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -20,13 +20,11 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-
-import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -175,7 +173,7 @@
 
     private void checkStoreContents(int messageCount)
     {
-        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
+        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestMemoryMessageStore) _messageStore).getMessageCount());
     }
 
 }
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index e213aa8..520e35f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -27,7 +27,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -133,7 +133,7 @@
 
     private void checkStoreContents(int messageCount)
     {
-        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
+        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestMemoryMessageStore) _messageStore).getMessageCount());
     }
 
     private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws Exception
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 227e979..e9c37e7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -27,7 +27,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -35,12 +35,12 @@
  */
 public class ReferenceCountingTest extends QpidTestCase
 {
-    private TestableMemoryMessageStore _store;
+    private TestMemoryMessageStore _store;
 
 
     protected void setUp() throws Exception
     {
-        _store = new TestableMemoryMessageStore();
+        _store = new TestMemoryMessageStore();
     }
 
     /**
@@ -83,7 +83,7 @@
 
         MessageMetaData mmd = new MessageMetaData(info, chb);
         StoredMessage storedMessage = _store.addMessage(mmd);
-
+        storedMessage.flushToStore();
 
         AMQMessage message = new AMQMessage(storedMessage);
 
@@ -141,6 +141,7 @@
 
         MessageMetaData mmd = new MessageMetaData(info, chb);
         StoredMessage storedMessage = _store.addMessage(mmd);
+        storedMessage.flushToStore();
 
         AMQMessage message = new AMQMessage(storedMessage);
 
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 9202672..d682076 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -417,12 +417,6 @@
     }
 
     @Override
-    public String getStoreType()
-    {
-        return TYPE;
-    }
-
-    @Override
     public void onDelete()
     {
         if (_logger.isDebugEnabled())
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 20de4ea..9a2d945 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -70,10 +70,13 @@
 
     private void deleteStoreIfExists()
     {
-        File location = new File(_storeLocation);
-        if (location.exists())
+        if (_storeLocation != null)
         {
-            FileUtils.delete(location, true);
+            File location = new File(_storeLocation);
+            if (location.exists())
+            {
+                FileUtils.delete(location, true);
+            }
         }
     }
 
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 4ca9cb2..509184d 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -338,12 +338,6 @@
     }
 
     @Override
-    public String getStoreType()
-    {
-        return TYPE;
-    }
-
-    @Override
     protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
     {
         if(_useBytesMethodsForBlob)
diff --git a/qpid/java/broker-plugins/memory-store/pom.xml b/qpid/java/broker-plugins/memory-store/pom.xml
index b715743..8bec7ef 100644
--- a/qpid/java/broker-plugins/memory-store/pom.xml
+++ b/qpid/java/broker-plugins/memory-store/pom.xml
@@ -36,6 +36,22 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-test-utils</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-broker-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 61fef91..c8dd2e6 100644
--- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -26,9 +26,4 @@
 {
     public static final String TYPE = "Memory";
 
-    @Override
-    public String getStoreType()
-    {
-        return TYPE;
-    }
 }
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
similarity index 68%
rename from qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
rename to qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
index ba9b7c1..8fd3cbb 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -18,30 +18,30 @@
  * under the License.
  *
  */
-
 package org.apache.qpid.server.store;
 
+import java.util.Collections;
 import java.util.Map;
 
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
+public class MemoryMessageStoreTest extends MessageStoreTestCase
 {
+
     @Override
-    public String getType()
+    protected Map<String, Object> getStoreSettings() throws Exception
     {
-        return TestableMemoryMessageStore.TYPE;
+        return Collections.<String, Object>emptyMap();
     }
 
     @Override
-    public MessageStore createMessageStore()
+    protected MessageStore createMessageStore()
     {
-        return new TestableMemoryMessageStore();
+        return new MemoryMessageStore();
     }
 
     @Override
-    public void validateAttributes(Map<String, Object> attributes)
+    protected void reopenStore() throws Exception
     {
+        // cannot re-open memory message store as it is not persistent
     }
 
 }
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 406a20d..0993783 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -27,7 +27,7 @@
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.model.ConfiguredObject;
 
-public class QuotaMessageStore extends NullMessageStore
+public class QuotaMessageStore extends AbstractMemoryMessageStore
 {
     public static final String TYPE = "QuotaMessageStore";
     private final AtomicLong _messageId = new AtomicLong(1);
@@ -155,10 +155,4 @@
             }
         }
     }
-
-    @Override
-    public String getStoreType()
-    {
-        return TYPE;
-    }
 }
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index e20196c..95bffa8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -31,6 +31,10 @@
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 public class SlowMessageStore implements MessageStore, DurableConfigurationStore
 {
@@ -63,12 +67,6 @@
         }
     }
 
-    @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
-    {
-        _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler);
-    }
-
     private void configureDelays(Map<String, Object> delays)
     {
 
@@ -294,12 +292,6 @@
     }
 
     @Override
-    public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
-    {
-       _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler);
-    }
-
-    @Override
     public void addEventListener(EventListener eventListener, Event... events)
     {
         if (_realMessageStore == null)
@@ -319,15 +311,33 @@
     }
 
     @Override
-    public String getStoreType()
-    {
-        return TYPE;
-    }
-
-    @Override
     public void onDelete()
     {
         _realMessageStore.onDelete();
     }
 
+    @Override
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+    {
+        _realDurableConfigurationStore.visitConfiguredObjectRecords(handler);
+    }
+
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+        _realMessageStore.visitMessages(handler);
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        _realMessageStore.visitMessageInstances(handler);
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        _realMessageStore.visitDistributedTransactions(handler);
+    }
+
 }
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
similarity index 90%
rename from qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
rename to qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index d89f5cc..7db8210 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -46,7 +45,6 @@
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
@@ -68,14 +66,16 @@
 import org.apache.qpid.util.FileUtils;
 
 /**
- * This tests the MessageStores by using the available interfaces.
+ * Virtualhost/store integration test.
+ * Tests for correct behaviour of the message store when exercised via the
+ * higher level functions of the virtual host.
  *
  * For persistent stores, it validates that Exchanges, Queues, Bindings and
  * Messages are persisted and recovered correctly.
  */
-public class MessageStoreTest extends QpidTestCase
+public class VirtualHostMessageStoreTest extends QpidTestCase
 {
-    private static final Logger _logger = Logger.getLogger(MessageStoreTest.class);
+    private static final Logger _logger = Logger.getLogger(VirtualHostMessageStoreTest.class);
 
     public static final int DEFAULT_PRIORTY_LEVEL = 5;
     public static final String SELECTOR_VALUE = "Test = 'MST'";
@@ -103,8 +103,7 @@
     private String queueOwner = "MST";
 
     private VirtualHost _virtualHost;
-    private org.apache.qpid.server.model.VirtualHost _virtualHostModel;
-    private Broker _broker;
+    private org.apache.qpid.server.model.VirtualHost<?> _virtualHostModel;
     private String _storePath;
 
     public void setUp() throws Exception
@@ -120,6 +119,7 @@
         messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
 
         _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class);
+
         when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings);
         when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE);
         when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName);
@@ -128,8 +128,6 @@
 
         cleanup(new File(_storePath));
 
-        _broker = BrokerTestHelper.createBrokerMock();
-
         reloadVirtualHost();
     }
 
@@ -201,10 +199,6 @@
         assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost());
     }
 
-    /**
-     * Old MessageStoreTest segment which runs against both persistent and non-persistent stores
-     * creating queues, exchanges and bindings and then verifying message delivery to them.
-     */
     public void testQueueExchangeAndBindingCreation() throws Exception
     {
         assertEquals("Should not be any existing queues", 0,  getVirtualHost().getQueues().size());
@@ -213,15 +207,15 @@
         createAllTopicQueues();
 
         //Register Non-Durable DirectExchange
-        ExchangeImpl nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
+        ExchangeImpl<?> nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
         bindAllQueuesToExchange(nonDurableExchange, directRouting);
 
         //Register DirectExchange
-        ExchangeImpl directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
+        ExchangeImpl<?> directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
         bindAllQueuesToExchange(directExchange, directRouting);
 
         //Register TopicExchange
-        ExchangeImpl topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
+        ExchangeImpl<?> topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
         bindAllTopicQueuesToExchange(topicExchange, topicRouting);
 
         //Send Message To NonDurable direct Exchange = persistent
@@ -248,12 +242,6 @@
                 10, getVirtualHost().getQueues().size());
     }
 
-    /**
-     * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
-     * before reloading the virtual host and ensuring that the persistent messages were restored.
-     *
-     * More specific testing of message persistence is left to store-specific unit testing.
-     */
     public void testMessagePersistence() throws Exception
     {
         testQueueExchangeAndBindingCreation();
@@ -346,7 +334,7 @@
                 1,  getVirtualHost().getQueues().size());
 
         //test that removing the queue means it is not recovered next time
-        final AMQQueue queue = getVirtualHost().getQueue(durableQueueName);
+        final AMQQueue<?> queue = getVirtualHost().getQueue(durableQueueName);
         DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue);
 
         reloadVirtualHost();
@@ -397,7 +385,7 @@
                 origExchangeCount + 1,  getVirtualHost().getExchanges().size());
 
         //test that removing the exchange means it is not recovered next time
-        final ExchangeImpl exchange = getVirtualHost().getExchange(directExchangeName);
+        final ExchangeImpl<?> exchange = getVirtualHost().getExchange(directExchangeName);
         DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
 
         reloadVirtualHost();
@@ -423,9 +411,9 @@
 
         Map<String, ExchangeImpl<?>> exchanges = createExchanges();
 
-        ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName);
-        ExchangeImpl directExchange = exchanges.get(directExchangeName);
-        ExchangeImpl topicExchange = exchanges.get(topicExchangeName);
+        ExchangeImpl<?> nonDurableExchange = exchanges.get(nonDurableExchangeName);
+        ExchangeImpl<?> directExchange = exchanges.get(directExchangeName);
+        ExchangeImpl<?> topicExchange = exchanges.get(topicExchangeName);
 
         bindAllQueuesToExchange(nonDurableExchange, directRouting);
         bindAllQueuesToExchange(directExchange, directRouting);
@@ -449,7 +437,7 @@
     public void testDurableBindingRemoval() throws Exception
     {
         //create durable queue and exchange, bind them
-        ExchangeImpl exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
+        ExchangeImpl<?> exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
         createQueue(durableQueueName, false, true, false, false);
         bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false);
 
@@ -482,7 +470,7 @@
     private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges)
     {
         Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges();
-        Collection<String> exchangeNames = new ArrayList(exchanges.size());
+        Collection<String> exchangeNames = new ArrayList<String>(exchanges.size());
         for(ExchangeImpl<?> exchange : exchanges)
         {
             exchangeNames.add(exchange.getName());
@@ -506,6 +494,7 @@
     }
 
     /** Validates the Durable queues and their properties are as expected following recovery */
+    @SuppressWarnings("unchecked")
     private void validateBindingProperties()
     {
 
@@ -526,11 +515,11 @@
      * @param bindings     the set of bindings to validate
      * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it
      */
-    private void validateBindingProperties(Collection<? extends Binding> bindings, boolean useSelectors)
+    private void validateBindingProperties(Collection<? extends Binding<?>> bindings, boolean useSelectors)
     {
         assertEquals("Each queue should only be bound once.", 1, bindings.size());
 
-        Binding binding = bindings.iterator().next();
+        Binding<?> binding = bindings.iterator().next();
 
         if (useSelectors)
         {
@@ -543,13 +532,13 @@
 
     private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive
     {
-        AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
+        AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
         queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
     }
 
     private void validateQueueExclusivityProperty(boolean expected)
     {
-        AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
+        AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
 
         assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
     }
@@ -565,7 +554,7 @@
         validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true);
     }
 
-    private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+    private void validateQueueProperties(AMQQueue<?> queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
     {
         if(usePriority || lastValueQueue)
         {
@@ -588,9 +577,9 @@
             assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass());
         }
 
-        assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner());
-        assertEquals("Queue durability is not as expected", durable, queue.isDurable());
-        assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
+        assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner());
+        assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable());
+        assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.isExclusive());
     }
 
     /**
@@ -606,7 +595,7 @@
         }
     }
 
-    private void sendMessageOnExchange(ExchangeImpl exchange, String routingKey, boolean deliveryMode)
+    private void sendMessageOnExchange(ExchangeImpl<?> exchange, String routingKey, boolean deliveryMode)
     {
         //Set MessagePersistence
         BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
@@ -698,15 +687,12 @@
         {
             queueArguments.put(Queue.OWNER, queueOwner);
         }
-        AMQQueue queue = null;
+        AMQQueue<?> queue = null;
 
         //Ideally we would be able to use the QueueDeclareHandler here.
         queue = getVirtualHost().createQueue(queueArguments);
 
         validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
-
-
-
     }
 
     private Map<String, ExchangeImpl<?>> createExchanges() throws Exception
@@ -733,14 +719,14 @@
         attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
         attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
         attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
-                       false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+                durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
         attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
         exchange = getVirtualHost().createExchange(attributes);
 
         return exchange;
     }
 
-    private void bindAllQueuesToExchange(ExchangeImpl exchange, String routingKey)
+    private void bindAllQueuesToExchange(ExchangeImpl<?> exchange, String routingKey)
     {
         bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false);
         bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false);
@@ -749,7 +735,7 @@
         bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false);
     }
 
-    private void bindAllTopicQueuesToExchange(ExchangeImpl exchange, String routingKey)
+    private void bindAllTopicQueuesToExchange(ExchangeImpl<?> exchange, String routingKey)
     {
 
         bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true);
@@ -759,9 +745,9 @@
     }
 
 
-    protected void bindQueueToExchange(ExchangeImpl exchange,
+    protected void bindQueueToExchange(ExchangeImpl<?> exchange,
                                        String routingKey,
-                                       AMQQueue queue,
+                                       AMQQueue<?> queue,
                                        boolean useSelector)
     {
         Map<String,Object> bindArguments = new HashMap<String, Object>();
@@ -781,9 +767,9 @@
         }
     }
 
-    protected void unbindQueueFromExchange(ExchangeImpl exchange,
+    protected void unbindQueueFromExchange(ExchangeImpl<?> exchange,
                                            String routingKey,
-                                           AMQQueue queue,
+                                           AMQQueue<?> queue,
                                            boolean useSelector)
     {
         Map<String,Object> bindArguments = new HashMap<String, Object>();
@@ -829,7 +815,7 @@
 
     private void validateMessageOnQueue(String queueName, long messageCount)
     {
-        AMQQueue queue = getVirtualHost().getQueue(queueName);
+        AMQQueue<?> queue = getVirtualHost().getQueue(queueName);
 
         assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
 
@@ -839,12 +825,12 @@
     private class TestMessagePublishInfo implements MessagePublishInfo
     {
 
-        ExchangeImpl _exchange;
+        ExchangeImpl<?> _exchange;
         boolean _immediate;
         boolean _mandatory;
         String _routingKey;
 
-        TestMessagePublishInfo(ExchangeImpl exchange, boolean immediate, boolean mandatory, String routingKey)
+        TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey)
         {
             _exchange = exchange;
             _immediate = immediate;
@@ -852,29 +838,35 @@
             _routingKey = routingKey;
         }
 
+        @Override
         public AMQShortString getExchange()
         {
             return new AMQShortString(_exchange.getName());
         }
 
+        @Override
         public void setExchange(AMQShortString exchange)
         {
             //no-op
         }
 
+        @Override
         public boolean isImmediate()
         {
             return _immediate;
         }
 
+        @Override
         public boolean isMandatory()
         {
             return _mandatory;
         }
 
+        @Override
         public AMQShortString getRoutingKey()
         {
             return new AMQShortString(_routingKey);
         }
     }
+
 }
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
index 4c0e2b7..59b4d49 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
@@ -45,10 +45,9 @@
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
 import org.apache.qpid.server.security.group.FileGroupManagerFactory;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 
 public class TestBrokerConfiguration
 {
@@ -191,7 +190,7 @@
     private ConfiguredObjectRecord findObject(final Class<? extends ConfiguredObject> category, final String objectName)
     {
         final RecordFindingVisitor visitor = new RecordFindingVisitor(category, objectName);
-        _store.recoverConfigurationStore(visitor);
+        _store.visitConfiguredObjectRecords(visitor);
         return visitor.getFoundRecord();
     }
 
@@ -235,11 +234,12 @@
         return findObject(category, name).getAttributes();
     }
 
-    private static class RecordFindingVisitor implements ConfigurationRecoveryHandler
+    private static class RecordFindingVisitor implements ConfiguredObjectRecordHandler
     {
         private final Class<? extends ConfiguredObject> _category;
         private final String _objectName;
         public ConfiguredObjectRecord _foundRecord;
+        private int _version;
 
         public RecordFindingVisitor(final Class<? extends ConfiguredObject> category, final String objectName)
         {
@@ -248,26 +248,28 @@
         }
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-
+            _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             if (object.getType().equals(_category.getSimpleName())
                 && (_objectName == null
                     || _objectName.equals(object.getAttributes().get(ConfiguredObject.NAME))))
             {
                 _foundRecord = object;
+                return false;
             }
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public ConfiguredObjectRecord getFoundRecord()
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 6f7de94..8e48e77 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -85,14 +85,14 @@
 org.apache.qpid.server.store.SplitStoreTest#*
 
 // These tests are for the Java broker persistent store modules
-org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
-org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
-org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval
-org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
-org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableBindingRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testQueuePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableQueueRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testExchangePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemoval
 org.apache.qpid.server.store.DurableConfigurationStoreTest#*
 
 // CPP Broker does not follow the same Logging convention as the Java broker
diff --git a/qpid/java/test-profiles/JavaBDBExcludes b/qpid/java/test-profiles/JavaBDBExcludes
index 0750beb..969b927 100644
--- a/qpid/java/test-profiles/JavaBDBExcludes
+++ b/qpid/java/test-profiles/JavaBDBExcludes
@@ -17,7 +17,3 @@
 // under the License.
 //
 
-//This test is subclassed within the bdbstore module to enable it to run and
-//also add some bdb-specific tests. It is excluded to prevent running twice.
-org.apache.qpid.server.store.MessageStoreTest#*
-org.apache.qpid.server.store.DurableConfigurationStoreTest#*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 0b06005..ef98882 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -33,19 +33,17 @@
 org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
 org.apache.qpid.test.unit.xa.TopicTest#testRecover
 
-org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
-org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
-org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval
-org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
-org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableBindingRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testQueuePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableQueueRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testExchangePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemoval
 
 org.apache.qpid.server.store.berkeleydb.*
 
-org.apache.qpid.server.store.DurableConfigurationStoreTest#*
-
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetweenQueuesWithBrokerRestart