QPID-8407: [Broker-J][BDB HA] Clean cached sequences and databases on BDB HA node transition into a Master role
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 05733ad..57b1856 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1561,8 +1561,6 @@
final List<QueueEntryKey> entries = new ArrayList<>();
try(Cursor cursor = getDeliveryDb().openCursor(null, null))
{
- boolean searchCompletedSuccessfully = false;
-
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
value.setPartial(0, 0, true);
@@ -1570,19 +1568,9 @@
CachingUUIDFactory uuidFactory = new CachingUUIDFactory();
QueueEntryBinding.objectToEntry(new QueueEntryKey(queue.getId(), 0L), key);
- if (!searchCompletedSuccessfully && (searchCompletedSuccessfully =
- cursor.getSearchKeyRange(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS))
+ if (cursor.getSearchKeyRange(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
{
- QueueEntryKey entry = QueueEntryBinding.entryToObject(uuidFactory, key);
- if (entry.getQueueId().equals(queue.getId()))
- {
- entries.add(entry);
- }
- }
-
- if (searchCompletedSuccessfully)
- {
- while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
+ do
{
QueueEntryKey entry = QueueEntryBinding.entryToObject(uuidFactory, key);
if (entry.getQueueId().equals(queue.getId()))
@@ -1594,6 +1582,7 @@
break;
}
}
+ while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS);
}
}
catch (RuntimeException e)
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index e3b053f..8006cb3 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -996,6 +996,10 @@
LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ closeSequencesAndDatabasesSafely();
+ }
}
StateChangeListener listener = _stateChangeListener.get();
@@ -1366,18 +1370,9 @@
ReplicatedEnvironment environment = _environment.getAndSet(null);
if (environment != null)
{
+ closeSequencesAndDatabasesSafely();
try
{
- try
- {
- closeSequences();
- closeDatabases();
- }
- catch(Exception e)
- {
- LOGGER.warn("Ignoring an exception whilst closing databases", e);
- }
-
environment.close();
}
catch (EnvironmentFailureException efe)
@@ -1387,6 +1382,19 @@
}
}
+ private void closeSequencesAndDatabasesSafely()
+ {
+ try
+ {
+ closeSequences();
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing databases", e);
+ }
+ }
+
private void closeSequences()
{
RuntimeException firstThrownException = null;
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index c2009b0..92e253a 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -23,6 +23,7 @@
import static org.apache.qpid.systests.Utils.INDEX;
import static org.apache.qpid.systests.Utils.getReceiveTimeout;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertNotNull;
@@ -311,23 +312,29 @@
final int inactiveBrokerPort,
final int activeBrokerPort) throws Exception
{
+ transferMasterToNodeWithAmqpPort(connection, inactiveBrokerPort);
+
+ assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+
+ getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
+ }
+
+ private void transferMasterToNodeWithAmqpPort(final Connection connection, final int nodeAmqpPort)
+ throws InterruptedException
+ {
_failoverListener = new FailoverAwaitingListener();
getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+ Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(nodeAmqpPort);
assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
- getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+ getBrokerAdmin().setNodeAttributes(nodeAmqpPort,
Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
_failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
LOGGER.info("Listener has finished");
- attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+ attributes = getBrokerAdmin().getNodeAttributes(nodeAmqpPort);
assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
-
- assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
-
- getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
}
@Test
@@ -870,6 +877,83 @@
}
}
+ @Test
+ public void testAsynchronousRecoverer() throws Exception
+ {
+ configureAsynchronousRecoveryOnAllNodes();
+
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ final Destination queue = createTestQueue(connection);
+ int brokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Sending message 'A' to the node with port {}", brokerPort);
+ Utils.sendTextMessage(connection, queue, "A");
+
+ final int anotherNodePort = getBrokerAdmin().getAmqpPort(brokerPort);
+ LOGGER.info("Changing mastership to the node with port {}", anotherNodePort);
+ transferMasterToNodeWithAmqpPort(connection, anotherNodePort);
+ getBrokerAdmin().awaitNodeRole(brokerPort, "REPLICA", "MASTER");
+
+ LOGGER.info("Sending message 'B' to the node with port {}", anotherNodePort);
+ Utils.sendTextMessage(connection, queue, "B");
+
+ LOGGER.info("Transfer mastership back to broker with port {}", brokerPort);
+ transferMasterToNodeWithAmqpPort(connection, brokerPort);
+ getBrokerAdmin().awaitNodeRole(anotherNodePort, "REPLICA", "MASTER");
+
+ LOGGER.info("Sending message 'C' to the node with port {}", anotherNodePort);
+ Utils.sendTextMessage(connection, queue, "C");
+
+ consumeTextMessages(connection, queue, "A", "B", "C");
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ private void configureAsynchronousRecoveryOnAllNodes()
+ {
+ final GroupBrokerAdmin brokerAdmin = getBrokerAdmin();
+ for (int port : brokerAdmin.getGroupAmqpPorts())
+ {
+ brokerAdmin.setNodeAttributes(port, Collections.singletonMap(BDBHAVirtualHostNode.CONTEXT,
+ Collections.singletonMap(
+ "use_async_message_store_recovery",
+ "true")));
+ brokerAdmin.stopNode(port);
+ brokerAdmin.startNode(port);
+ brokerAdmin.awaitNodeRole(port, BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER");
+ }
+
+ LOGGER.info("Asynchronous recoverer is configured on all group nodes");
+ }
+
+ private void consumeTextMessages(final Connection connection, final Destination queue, final String... expected)
+ throws JMSException
+ {
+ LOGGER.info("Trying to consume messages: {}", String.join(",", expected));
+ connection.start();
+ final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ for (String m : expected)
+ {
+ final Message message = consumer.receive(getReceiveTimeout());
+ assertThat(message, is(instanceOf(TextMessage.class)));
+ assertThat(((TextMessage) message).getText(), is(equalTo(m)));
+ }
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
private final class FailoverAwaitingListener implements GenericConnectionListener
{
private final CountDownLatch _failoverCompletionLatch;