QPID-2973: enable management during tests, and add a test using MaxDeliveryCount with a DurableSubscription

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1075743 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 576bb3c..47bb517 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -21,33 +21,29 @@
 package org.apache.qpid.test.unit.client;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
 
 /**
  * Test that the MaxRedelivery feature works as expected, allowing the client to reject
@@ -65,7 +61,6 @@
 public class MaxDeliveryCountTest extends QpidTestCase
 {
     private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class); 
-    private Queue _queue;
     private boolean _failed;
     private String _failMsg;
     private static final int MSG_COUNT = 15;
@@ -74,28 +69,41 @@
 
     public void setUp() throws Exception
     {
-        super.setUp();
-        String queueName = getTestQueueName();
-        
-        //create an AMQQueue object using a BindingURL to set the Max Delivery Count for the consumer
-        BindingURL burl = new AMQBindingURL("direct://amq.direct//" + queueName + "?maxdeliverycount='" + MAX_DELIVERY_COUNT + "'");
-        _queue = new AMQQueue(burl);
+        //enable DLQ support for all queues at the vhost level
+        setConfigurationProperty("virtualhosts.virtualhost.test.queues.deadLetterQueues",
+                                String.valueOf(true));
 
-        //declare the test queue, using some AMQSession hackery to enable DLQing
+        //Ensure management is on
+        setConfigurationProperty("management.enabled", "true");
+        setConfigurationProperty("management.ssl.enabled", "false");
+
+        //enable max delivery count on all client connections
+        setTestClientSystemProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "2");
+
+        super.setUp();
+
+        boolean durableSub = isDurSubTest();
+
+        //declare the test queue
         Connection consumerConnection = getConnection();
         Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true);
-        ((AMQSession<?,?>) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
-        ((AMQSession<?,?>) consumerSession).declareAndBind((AMQDestination) new AMQQueue("amq.direct",queueName));
+        Destination destination = getDestination(consumerSession, durableSub);
+        if(durableSub)
+        {
+            consumerSession.createDurableSubscriber((Topic)destination, getName()).close();
+        }
+        else
+        {
+            consumerSession.createConsumer(destination).close();
+        }
+
         consumerConnection.close();
 
         //Create Producer put some messages on the queue
         Connection producerConnection = getConnection();
         producerConnection.start();
-
         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(_queue);
+        MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub));
 
         for (int count = 1; count <= MSG_COUNT; count++)
         {
@@ -110,6 +118,18 @@
         _awaitCompletion = new CountDownLatch(1);
     }
 
+    private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException
+    {
+        if(durableSub)
+        {
+            return consumerSession.createTopic(getTestQueueName());
+        }
+        else
+        {
+            return consumerSession.createQueue(getTestQueueName());
+        }
+    }
+
     private String generateContent(int count)
     {
         return "Message " + count + " content.";
@@ -127,7 +147,7 @@
         redeliverMsgs.add(5);
         redeliverMsgs.add(14);
 
-        doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false);
+        doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false);
     }
 
     /**
@@ -142,7 +162,7 @@
         redeliverMsgs.add(5);
         redeliverMsgs.add(14);
 
-        doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false);
+        doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false);
     }
 
     /**
@@ -157,7 +177,7 @@
         redeliverMsgs.add(5);
         redeliverMsgs.add(14);
 
-        doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false);
+        doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false);
     }
     
     /**
@@ -172,7 +192,7 @@
         redeliverMsgs.add(5);
         redeliverMsgs.add(14);
 
-        doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false);
+        doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false);
     }
 
     /**
@@ -187,7 +207,7 @@
         redeliverMsgs.add(5);
         redeliverMsgs.add(14);
 
-        doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true);
+        doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false);
     }
 
     /**
@@ -202,20 +222,43 @@
         redeliverMsgs.add(5);
         redeliverMsgs.add(14);
 
-        doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true);
+        doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false);
     }
-    
-    public void doTest(int deliveryMode, ArrayList<Integer> redeliverMsgs, boolean synchronous) throws Exception
+
+    public void testDurableSubscription() throws Exception
+    {
+        final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+        redeliverMsgs.add(1);
+        redeliverMsgs.add(2);
+        redeliverMsgs.add(5);
+        redeliverMsgs.add(14);
+
+        doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true);
+    }
+
+    public void doTest(int deliveryMode, ArrayList<Integer> redeliverMsgs, boolean synchronous, boolean durableSub) throws Exception
     {
         Connection clientConnection = getConnection();
         
         boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false;
         Session clientSession = clientConnection.createSession(transacted, deliveryMode);
 
-        MessageConsumer consumer = clientSession.createConsumer(_queue);
+        MessageConsumer consumer;
+        Destination dest = getDestination(clientSession, durableSub);
+        AMQQueue checkQueue;
+        if(durableSub)
+        {
+            consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
+            checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName());
+        }
+        else
+        {
+            consumer = clientSession.createConsumer(dest);
+            checkQueue = (AMQQueue) dest;
+        }
 
         assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
-                MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue));
+                MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
 
         clientConnection.start();
 
@@ -252,13 +295,10 @@
         consumer.close();
 
         //check the source queue is now empty
-        assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue));
+        assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
         
         //check the DLQ has the required number of rejected-without-requeue messages
-        String dlQueueName = getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
-        AMQDestination queue =  new AMQQueue("amq.direct", dlQueueName);
-        assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on it", redeliverMsgs.size(),
-                        ((AMQSession<?,?>) clientSession).getQueueDepth(queue));
+        verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub);
 
         if(isBrokerStorePersistent())
         {
@@ -273,15 +313,42 @@
         }
 
         //verify the messages on the DLQ
-        verifyDLQcontent(clientConnection, redeliverMsgs, queue);
+        verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
 
         clientConnection.close();
     }
 
-    private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, AMQDestination queue) throws JMSException
+    private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException
+    {
+        AMQDestination checkQueueDLQ;
+        if(durableSub)
+        {
+            checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+        }
+        else
+        {
+            checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+        }
+        
+        assertEquals("The DLQ should have " + expected + " msgs on it", expected,
+                        ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ));
+    }
+
+    private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
     {
         Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = clientSession.createConsumer(queue);
+
+        MessageConsumer consumer;
+        if(durableSub)
+        {
+            consumer = clientSession.createDurableSubscriber(
+                    clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+        }
+        else
+        {
+            consumer = clientSession.createConsumer(
+                    clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+        }
 
         //keep track of the message we expect to still be on the DLQ
         List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs);
@@ -611,4 +678,9 @@
             }
         }
    }
+
+    private boolean isDurSubTest()
+    {
+        return getTestQueueName().contains("DurableSubscription");
+    }
 }