QPID-8407: [Broker-J][BDB HA] Clean cached sequences and databases on BDB HA node transition into a Master role
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 05733ad..57b1856 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1561,8 +1561,6 @@
             final List<QueueEntryKey> entries = new ArrayList<>();
             try(Cursor cursor = getDeliveryDb().openCursor(null, null))
             {
-                boolean searchCompletedSuccessfully = false;
-
                 DatabaseEntry key = new DatabaseEntry();
                 DatabaseEntry value = new DatabaseEntry();
                 value.setPartial(0, 0, true);
@@ -1570,19 +1568,9 @@
                 CachingUUIDFactory uuidFactory = new CachingUUIDFactory();
                 QueueEntryBinding.objectToEntry(new QueueEntryKey(queue.getId(), 0L), key);
 
-                if (!searchCompletedSuccessfully && (searchCompletedSuccessfully =
-                        cursor.getSearchKeyRange(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS))
+                if (cursor.getSearchKeyRange(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
                 {
-                    QueueEntryKey entry = QueueEntryBinding.entryToObject(uuidFactory, key);
-                    if (entry.getQueueId().equals(queue.getId()))
-                    {
-                        entries.add(entry);
-                    }
-                }
-
-                if (searchCompletedSuccessfully)
-                {
-                    while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
+                    do
                     {
                         QueueEntryKey entry = QueueEntryBinding.entryToObject(uuidFactory, key);
                         if (entry.getQueueId().equals(queue.getId()))
@@ -1594,6 +1582,7 @@
                             break;
                         }
                     }
+                    while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS);
                 }
             }
             catch (RuntimeException e)
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index e3b053f..8006cb3 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -996,6 +996,10 @@
                     LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
                     _joinTime = System.currentTimeMillis();
                 }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    closeSequencesAndDatabasesSafely();
+                }
             }
 
             StateChangeListener listener = _stateChangeListener.get();
@@ -1366,18 +1370,9 @@
         ReplicatedEnvironment environment = _environment.getAndSet(null);
         if (environment != null)
         {
+            closeSequencesAndDatabasesSafely();
             try
             {
-                try
-                {
-                    closeSequences();
-                    closeDatabases();
-                }
-                catch(Exception e)
-                {
-                    LOGGER.warn("Ignoring an exception whilst closing databases", e);
-                }
-
                 environment.close();
             }
             catch (EnvironmentFailureException efe)
@@ -1387,6 +1382,19 @@
         }
     }
 
+    private void closeSequencesAndDatabasesSafely()
+    {
+        try
+        {
+            closeSequences();
+            closeDatabases();
+        }
+        catch(Exception e)
+        {
+            LOGGER.warn("Ignoring an exception whilst closing databases", e);
+        }
+    }
+
     private void closeSequences()
     {
         RuntimeException firstThrownException = null;
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index c2009b0..92e253a 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -23,6 +23,7 @@
 import static org.apache.qpid.systests.Utils.INDEX;
 import static org.apache.qpid.systests.Utils.getReceiveTimeout;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertNotNull;
@@ -311,23 +312,29 @@
                                              final int inactiveBrokerPort,
                                              final int activeBrokerPort) throws Exception
     {
+        transferMasterToNodeWithAmqpPort(connection, inactiveBrokerPort);
+
+        assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+
+        getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
+    }
+
+    private void transferMasterToNodeWithAmqpPort(final Connection connection, final int nodeAmqpPort)
+            throws InterruptedException
+    {
         _failoverListener = new FailoverAwaitingListener();
         getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+        Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(nodeAmqpPort);
         assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
-        getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+        getBrokerAdmin().setNodeAttributes(nodeAmqpPort,
                                            Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
         _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
 
-        attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+        attributes = getBrokerAdmin().getNodeAttributes(nodeAmqpPort);
         assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
-
-        assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
-
-        getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
     }
 
     @Test
@@ -870,6 +877,83 @@
         }
     }
 
+    @Test
+    public void testAsynchronousRecoverer() throws Exception
+    {
+        configureAsynchronousRecoveryOnAllNodes();
+
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+            final Destination queue = createTestQueue(connection);
+            int brokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Sending message 'A' to the node with port {}", brokerPort);
+            Utils.sendTextMessage(connection, queue, "A");
+
+            final int anotherNodePort = getBrokerAdmin().getAmqpPort(brokerPort);
+            LOGGER.info("Changing mastership to the node with port {}", anotherNodePort);
+            transferMasterToNodeWithAmqpPort(connection, anotherNodePort);
+            getBrokerAdmin().awaitNodeRole(brokerPort, "REPLICA", "MASTER");
+
+            LOGGER.info("Sending message 'B' to the node with port {}", anotherNodePort);
+            Utils.sendTextMessage(connection, queue, "B");
+
+            LOGGER.info("Transfer mastership back to broker with port {}", brokerPort);
+            transferMasterToNodeWithAmqpPort(connection, brokerPort);
+            getBrokerAdmin().awaitNodeRole(anotherNodePort, "REPLICA", "MASTER");
+
+            LOGGER.info("Sending message 'C' to the node with port {}", anotherNodePort);
+            Utils.sendTextMessage(connection, queue, "C");
+
+            consumeTextMessages(connection, queue, "A", "B", "C");
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    private void configureAsynchronousRecoveryOnAllNodes()
+    {
+        final GroupBrokerAdmin brokerAdmin = getBrokerAdmin();
+        for (int port : brokerAdmin.getGroupAmqpPorts())
+        {
+            brokerAdmin.setNodeAttributes(port, Collections.singletonMap(BDBHAVirtualHostNode.CONTEXT,
+                                                                         Collections.singletonMap(
+                                                                                 "use_async_message_store_recovery",
+                                                                                 "true")));
+            brokerAdmin.stopNode(port);
+            brokerAdmin.startNode(port);
+            brokerAdmin.awaitNodeRole(port, BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER");
+        }
+
+        LOGGER.info("Asynchronous recoverer is configured on all group nodes");
+    }
+
+    private void consumeTextMessages(final Connection connection, final Destination queue, final String... expected)
+            throws JMSException
+    {
+        LOGGER.info("Trying to consume messages: {}", String.join(",", expected));
+        connection.start();
+        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            for (String m : expected)
+            {
+                final Message message = consumer.receive(getReceiveTimeout());
+                assertThat(message, is(instanceOf(TextMessage.class)));
+                assertThat(((TextMessage) message).getText(), is(equalTo(m)));
+            }
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
     private final class FailoverAwaitingListener implements GenericConnectionListener
     {
         private final CountDownLatch _failoverCompletionLatch;