QPID-7546: [System Tests] Enable QueueMessageDurabilityTest on AMQP 1-0 persistent profiles
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
index 8637b88..a4dbc49 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
@@ -90,5 +90,7 @@
long getQueueDepth(Connection con, Queue destination) throws Exception;
+ boolean isQueueExist(Connection con, Queue destination) throws Exception;
+
String getBrokerDetailsFromDefaultConnectionUrl();
}
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 57ae8e3..0b92652 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -383,6 +383,11 @@
return _jmsProvider.getQueueDepth(con, destination);
}
+ public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+ {
+ return _jmsProvider.isQueueExist(con, destination);
+ }
+
/**
* Send messages to the given destination.
* <p/>
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
index 01cf5a0..f9e865c 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
@@ -220,7 +220,7 @@
@Override
public Queue getQueueFromName(Session session, String name) throws JMSException
{
- return session.createQueue("ADDR: '" + name + "'");
+ return new AMQQueue("", name);
}
@Override
@@ -284,6 +284,21 @@
}
}
+ @Override
+ public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+ {
+ Queue queue = new AMQQueue("", destination.getQueueName());
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ return ((AMQSession<?, ?>) session).isQueueBound((AMQDestination) queue);
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
public String getBrokerDetailsFromDefaultConnectionUrl()
{
try
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
index 319ee39..ad41352 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
@@ -381,6 +381,52 @@
}
@Override
+ public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+ {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ MessageProducer producer = session.createProducer(session.createQueue("$management"));
+ final TemporaryQueue responseQ = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(responseQ);
+ MapMessage message = session.createMapMessage();
+ message.setStringProperty("index", "object-path");
+ final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+ message.setStringProperty("key", escapedName);
+ message.setStringProperty("type", "org.apache.qpid.Queue");
+ message.setStringProperty("operation", "READ");
+
+ message.setJMSReplyTo(responseQ);
+
+ producer.send(message);
+
+ Message response = consumer.receive();
+ try
+ {
+ int statusCode = response.getIntProperty("statusCode");
+ switch(statusCode)
+ {
+ case 200:
+ return true;
+ case 404:
+ return false;
+ default:
+ throw new RuntimeException(String.format("Unexpected response for queue query '%s' : %d", destination.getQueueName(), statusCode));
+ }
+ }
+ finally
+ {
+ consumer.close();
+ responseQ.delete();
+ }
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
+ @Override
public Connection getConnectionWithSyncPublishing() throws Exception
{
return getConnection();
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
index e6620f7..8ed84eb 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
@@ -25,161 +25,143 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.naming.NamingException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class QueueMessageDurabilityTest extends QpidBrokerTestCase
{
-
- private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+ private Queue _durableAlwaysPersist;
+ private Queue _durableNeverPersist;
+ private Queue _durableDefaultPersist;
+ private Queue _nonDurableAlwaysPersist;
+ private String _topicNameFormat;
@Override
public void setUp() throws Exception
{
super.setUp();
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQSession amqSession = (AMQSession) session;
Map<String,Object> arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
- amqSession.createQueue(DURABLE_ALWAYS_PERSIST_NAME, false, true, false, arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+ _durableAlwaysPersist = createQueueWithArguments(session, DURABLE_ALWAYS_PERSIST_NAME, arguments);
arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
- amqSession.createQueue(DURABLE_NEVER_PERSIST_NAME, false, true, false, arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+ _durableNeverPersist = createQueueWithArguments(session, DURABLE_NEVER_PERSIST_NAME, arguments);
arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
- amqSession.createQueue(DURABLE_DEFAULT_PERSIST_NAME, false, true, false, arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+ _durableDefaultPersist = createQueueWithArguments(session, DURABLE_DEFAULT_PERSIST_NAME, arguments);
arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
- amqSession.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME, false, false, false, arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, false);
+ _nonDurableAlwaysPersist = createQueueWithArguments(session, NONDURABLE_ALWAYS_PERSIST_NAME, arguments);
- amqSession.bindQueue(DURABLE_ALWAYS_PERSIST_NAME,
- "Y.*.*.*",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
+ bindQueue(session, "amq.topic", DURABLE_ALWAYS_PERSIST_NAME, "Y.*.*.*");
+ bindQueue(session, "amq.topic", DURABLE_NEVER_PERSIST_NAME, "*.Y.*.*");
+ bindQueue(session, "amq.topic", DURABLE_DEFAULT_PERSIST_NAME, "*.*.Y.*");
+ bindQueue(session, "amq.topic", NONDURABLE_ALWAYS_PERSIST_NAME, "*.*.*.Y");
- amqSession.bindQueue(DURABLE_NEVER_PERSIST_NAME,
- "*.Y.*.*",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
+ _topicNameFormat = isBroker10() ? "amq.topic/%s" : "%s";
- amqSession.bindQueue(DURABLE_DEFAULT_PERSIST_NAME,
- "*.*.Y.*",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
-
- amqSession.bindQueue(NONDURABLE_ALWAYS_PERSIST_NAME,
- "*.*.*.Y",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
+ conn.close();
}
public void testSendPersistentMessageToAll() throws Exception
{
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
conn.start();
- producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ producer.send(session.createTopic(String.format(_topicNameFormat, "Y.Y.Y.Y")), session.createTextMessage("test"));
session.commit();
- AMQSession amqSession = (AMQSession) session;
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+ assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
restartDefaultBroker();
- conn = getConnection();
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- amqSession = (AMQSession) session;
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
- assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ conn = createStartedConnection();
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
- assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
-
+ assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
}
-
public void testSendNonPersistentMessageToAll() throws Exception
{
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
conn.start();
- producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ producer.send(session.createTopic(String.format(_topicNameFormat, "Y.Y.Y.Y")), session.createTextMessage("test"));
session.commit();
- AMQSession amqSession = (AMQSession) session;
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+ assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
restartDefaultBroker();
- conn = getConnection();
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- amqSession = (AMQSession) session;
- assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
- assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
- assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ conn = createStartedConnection();
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
- assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
}
public void testNonPersistentContentRetained() throws Exception
{
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
conn.start();
- producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
- producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+ producer.send(session.createTopic(String.format(_topicNameFormat, "N.N.Y.Y")), session.createTextMessage("test1"));
+ producer.send(session.createTopic(String.format(_topicNameFormat, "Y.N.Y.Y")), session.createTextMessage("test2"));
session.commit();
- MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
- Message msg = consumer.receive(1000l);
+ MessageConsumer consumer = session.createConsumer(_durableAlwaysPersist);
+ Message msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test2", ((TextMessage) msg).getText());
session.rollback();
restartDefaultBroker();
- conn = getConnection();
- conn.start();
+ conn = createStartedConnection();
session = conn.createSession(true, Session.SESSION_TRANSACTED);
- AMQSession amqSession = (AMQSession) session;
- assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
- assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
- assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
- consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
- msg = consumer.receive(1000l);
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
+
+ consumer = session.createConsumer(_durableAlwaysPersist);
+ msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test2", ((TextMessage)msg).getText());
@@ -189,27 +171,51 @@
public void testPersistentContentRetainedOnTransientQueue() throws Exception
{
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
conn.start();
- producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ producer.send(session.createTopic(String.format(_topicNameFormat, "N.N.Y.Y")), session.createTextMessage("test1"));
session.commit();
- MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
- Message msg = consumer.receive(1000l);
+ MessageConsumer consumer = session.createConsumer(_durableDefaultPersist);
+ Message msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test1", ((TextMessage)msg).getText());
session.commit();
System.gc();
- consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
- msg = consumer.receive(1000l);
+ consumer = session.createConsumer(_nonDurableAlwaysPersist);
+ msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test1", ((TextMessage)msg).getText());
session.commit();
}
+ private Connection createStartedConnection() throws JMSException, NamingException
+ {
+ Connection conn = getConnection();
+ conn.start();
+ return conn;
+ }
+
+ private Queue createQueueWithArguments(final Session session,
+ final String testQueueName,
+ final Map<String, Object> arguments) throws Exception
+ {
+ createEntityUsingAmqpManagement(testQueueName, session, "org.apache.qpid.Queue", arguments);
+ return getQueueFromName(session, testQueueName);
+ }
+
+ private void bindQueue(final Session session, final String exchange, final String queueName,
+ final String bindingKey) throws Exception
+ {
+
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put("destination", queueName);
+ arguments.put("bindingKey", bindingKey);
+ performOperationUsingAmqpManagement(exchange, "bind", session, "org.apache.qpid.TopicExchange", arguments);
+ }
}