QPID-7546: [System Tests] Enable QueueMessageDurabilityTest on AMQP 1-0 persistent profiles
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
index 8637b88..a4dbc49 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
@@ -90,5 +90,7 @@
 
     long getQueueDepth(Connection con, Queue destination) throws Exception;
 
+    boolean isQueueExist(Connection con, Queue destination) throws Exception;
+
     String getBrokerDetailsFromDefaultConnectionUrl();
 }
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 57ae8e3..0b92652 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -383,6 +383,11 @@
         return _jmsProvider.getQueueDepth(con, destination);
     }
 
+    public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+    {
+        return _jmsProvider.isQueueExist(con, destination);
+    }
+
     /**
      * Send messages to the given destination.
      * <p/>
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
index 01cf5a0..f9e865c 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
@@ -220,7 +220,7 @@
     @Override
     public Queue getQueueFromName(Session session, String name) throws JMSException
     {
-        return session.createQueue("ADDR: '" + name + "'");
+        return new AMQQueue("", name);
     }
 
     @Override
@@ -284,6 +284,21 @@
         }
     }
 
+    @Override
+    public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+    {
+        Queue queue = new AMQQueue("", destination.getQueueName());
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            return ((AMQSession<?, ?>) session).isQueueBound((AMQDestination) queue);
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
     public String getBrokerDetailsFromDefaultConnectionUrl()
     {
         try
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
index 319ee39..ad41352 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
@@ -381,6 +381,52 @@
     }
 
     @Override
+    public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+    {
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            MessageProducer producer = session.createProducer(session.createQueue("$management"));
+            final TemporaryQueue responseQ = session.createTemporaryQueue();
+            MessageConsumer consumer = session.createConsumer(responseQ);
+            MapMessage message = session.createMapMessage();
+            message.setStringProperty("index", "object-path");
+            final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+            message.setStringProperty("key", escapedName);
+            message.setStringProperty("type", "org.apache.qpid.Queue");
+            message.setStringProperty("operation", "READ");
+
+            message.setJMSReplyTo(responseQ);
+
+            producer.send(message);
+
+            Message response = consumer.receive();
+            try
+            {
+                int statusCode = response.getIntProperty("statusCode");
+                switch(statusCode)
+                {
+                    case 200:
+                        return true;
+                    case 404:
+                        return false;
+                    default:
+                        throw new RuntimeException(String.format("Unexpected response for queue query '%s' :  %d", destination.getQueueName(), statusCode));
+                }
+            }
+            finally
+            {
+                consumer.close();
+                responseQ.delete();
+            }
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
+    @Override
     public Connection getConnectionWithSyncPublishing() throws Exception
     {
         return getConnection();
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
index e6620f7..8ed84eb 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
@@ -25,161 +25,143 @@
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.naming.NamingException;
 
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class QueueMessageDurabilityTest extends QpidBrokerTestCase
 {
-
-    private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
     private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
     private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
     private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
     private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+    private Queue _durableAlwaysPersist;
+    private Queue _durableNeverPersist;
+    private Queue _durableDefaultPersist;
+    private Queue _nonDurableAlwaysPersist;
+    private String _topicNameFormat;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        AMQSession amqSession = (AMQSession) session;
 
         Map<String,Object> arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
-        amqSession.createQueue(DURABLE_ALWAYS_PERSIST_NAME, false, true, false, arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+        _durableAlwaysPersist = createQueueWithArguments(session, DURABLE_ALWAYS_PERSIST_NAME, arguments);
 
         arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
-        amqSession.createQueue(DURABLE_NEVER_PERSIST_NAME, false, true, false, arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+        _durableNeverPersist = createQueueWithArguments(session, DURABLE_NEVER_PERSIST_NAME, arguments);
 
         arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
-        amqSession.createQueue(DURABLE_DEFAULT_PERSIST_NAME, false, true, false, arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+        _durableDefaultPersist = createQueueWithArguments(session, DURABLE_DEFAULT_PERSIST_NAME, arguments);
 
         arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
-        amqSession.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME, false, false, false, arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, false);
+        _nonDurableAlwaysPersist = createQueueWithArguments(session, NONDURABLE_ALWAYS_PERSIST_NAME, arguments);
 
-        amqSession.bindQueue(DURABLE_ALWAYS_PERSIST_NAME,
-                             "Y.*.*.*",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
+        bindQueue(session, "amq.topic", DURABLE_ALWAYS_PERSIST_NAME, "Y.*.*.*");
+        bindQueue(session, "amq.topic", DURABLE_NEVER_PERSIST_NAME, "*.Y.*.*");
+        bindQueue(session, "amq.topic", DURABLE_DEFAULT_PERSIST_NAME, "*.*.Y.*");
+        bindQueue(session, "amq.topic", NONDURABLE_ALWAYS_PERSIST_NAME, "*.*.*.Y");
 
-        amqSession.bindQueue(DURABLE_NEVER_PERSIST_NAME,
-                             "*.Y.*.*",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
+        _topicNameFormat = isBroker10() ? "amq.topic/%s" : "%s";
 
-        amqSession.bindQueue(DURABLE_DEFAULT_PERSIST_NAME,
-                             "*.*.Y.*",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
-
-        amqSession.bindQueue(NONDURABLE_ALWAYS_PERSIST_NAME,
-                             "*.*.*.Y",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
+        conn.close();
     }
 
     public void testSendPersistentMessageToAll() throws Exception
     {
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         conn.start();
-        producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, "Y.Y.Y.Y")), session.createTextMessage("test"));
         session.commit();
 
-        AMQSession amqSession = (AMQSession) session;
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+        assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
 
         restartDefaultBroker();
 
-        conn = getConnection();
-        session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        amqSession = (AMQSession) session;
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+        conn = createStartedConnection();
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
 
-        assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
-
+        assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
     }
 
-
     public void testSendNonPersistentMessageToAll() throws Exception
     {
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         conn.start();
-        producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, "Y.Y.Y.Y")), session.createTextMessage("test"));
         session.commit();
 
-        AMQSession amqSession = (AMQSession) session;
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+        assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
 
         restartDefaultBroker();
 
-        conn = getConnection();
-        session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        amqSession = (AMQSession) session;
-        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+        conn = createStartedConnection();
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
 
-        assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
 
     }
 
     public void testNonPersistentContentRetained() throws Exception
     {
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         conn.start();
-        producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
-        producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, "N.N.Y.Y")), session.createTextMessage("test1"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, "Y.N.Y.Y")), session.createTextMessage("test2"));
         session.commit();
-        MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
-        Message msg = consumer.receive(1000l);
+        MessageConsumer consumer = session.createConsumer(_durableAlwaysPersist);
+        Message msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test2", ((TextMessage) msg).getText());
         session.rollback();
         restartDefaultBroker();
-        conn = getConnection();
-        conn.start();
+        conn = createStartedConnection();
         session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        AMQSession amqSession = (AMQSession) session;
-        assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-        consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
-        msg = consumer.receive(1000l);
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
+
+        consumer = session.createConsumer(_durableAlwaysPersist);
+        msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test2", ((TextMessage)msg).getText());
@@ -189,27 +171,51 @@
     public void testPersistentContentRetainedOnTransientQueue() throws Exception
     {
         setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
         conn.start();
-        producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, "N.N.Y.Y")), session.createTextMessage("test1"));
         session.commit();
-        MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
-        Message msg = consumer.receive(1000l);
+        MessageConsumer consumer = session.createConsumer(_durableDefaultPersist);
+        Message msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test1", ((TextMessage)msg).getText());
         session.commit();
         System.gc();
-        consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
-        msg = consumer.receive(1000l);
+        consumer = session.createConsumer(_nonDurableAlwaysPersist);
+        msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test1", ((TextMessage)msg).getText());
         session.commit();
     }
 
+    private Connection createStartedConnection() throws JMSException, NamingException
+    {
+        Connection conn = getConnection();
+        conn.start();
+        return conn;
+    }
+
+    private Queue createQueueWithArguments(final Session session,
+                                           final String testQueueName,
+                                           final Map<String, Object> arguments) throws Exception
+    {
+        createEntityUsingAmqpManagement(testQueueName, session, "org.apache.qpid.Queue", arguments);
+        return getQueueFromName(session, testQueueName);
+    }
+
+    private void bindQueue(final Session session, final String exchange, final String queueName,
+                           final String bindingKey) throws Exception
+    {
+
+        final Map<String, Object> arguments = new HashMap<>();
+        arguments.put("destination", queueName);
+        arguments.put("bindingKey", bindingKey);
+        performOperationUsingAmqpManagement(exchange, "bind", session, "org.apache.qpid.TopicExchange", arguments);
+    }
 
 }