QPID-7603: [Java Broker, AMQP 1.0] Add support for maximum delivery count
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4da1af4..02f33db 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -31,9 +31,14 @@
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
@@ -49,13 +54,16 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 {
@@ -375,7 +383,7 @@
                     txn.dequeue(_queueEntry.getEnqueueRecord(),
                                 new ServerTransaction.Action()
                                 {
-
+                                    @Override
                                     public void postCommit()
                                     {
                                         if (_queueEntry.isAcquiredBy(getConsumer()))
@@ -384,6 +392,7 @@
                                         }
                                     }
 
+                                    @Override
                                     public void onRollback()
                                     {
 
@@ -392,6 +401,7 @@
                 }
                 txn.addPostTransactionAction(new ServerTransaction.Action()
                     {
+                        @Override
                         public void postCommit()
                         {
                             if(Boolean.TRUE.equals(settled))
@@ -405,16 +415,13 @@
                             _link.getEndpoint().sendFlowConditional();
                         }
 
+                        @Override
                         public void onRollback()
                         {
                             if(Boolean.TRUE.equals(settled))
                             {
-                                final Modified modified = new Modified();
-                                modified.setDeliveryFailed(true);
-                                _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
-                                _link.getEndpoint().sendFlowConditional();
-                                _queueEntry.incrementDeliveryCount();
-                                _queueEntry.release(getConsumer());
+                                // TODO: apply source's default outcome
+                                applyModifiedOutcome();
                             }
                         }
                     });
@@ -423,6 +430,7 @@
             {
                 txn.addPostTransactionAction(new ServerTransaction.Action()
                 {
+                    @Override
                     public void postCommit()
                     {
 
@@ -430,9 +438,12 @@
                         _link.getEndpoint().settle(_deliveryTag);
                     }
 
+                    @Override
                     public void onRollback()
                     {
                         _link.getEndpoint().settle(_deliveryTag);
+
+                        // TODO: apply source's default outcome if settled
                     }
                 });
             }
@@ -441,25 +452,52 @@
             {
                 txn.addPostTransactionAction(new ServerTransaction.Action()
                 {
+                    @Override
                     public void postCommit()
                     {
+                        // TODO: add handling of undeliverable-here
 
-                        _queueEntry.release(getConsumer());
                         if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
                         {
-                            _queueEntry.incrementDeliveryCount();
+                            incrementDeliveryCountOrRouteToAlternateOrDiscard();
+                        }
+                        else
+                        {
+                            _queueEntry.release(getConsumer());
                         }
                         _link.getEndpoint().settle(_deliveryTag);
                     }
 
+                    @Override
                     public void onRollback()
                     {
                         if(Boolean.TRUE.equals(settled))
                         {
-                            final Modified modified = new Modified();
-                            modified.setDeliveryFailed(true);
-                            _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
-                            _link.getEndpoint().sendFlowConditional();
+                            // TODO: apply source's default outcome
+                            applyModifiedOutcome();
+                        }
+                    }
+                });
+            }
+            else if (outcome instanceof Rejected)
+            {
+                txn.addPostTransactionAction(new ServerTransaction.Action()
+                {
+                    @Override
+                    public void postCommit()
+                    {
+                        _link.getEndpoint().settle(_deliveryTag);
+                        incrementDeliveryCountOrRouteToAlternateOrDiscard();
+                        _link.getEndpoint().sendFlowConditional();
+                    }
+
+                    @Override
+                    public void onRollback()
+                    {
+                        if(Boolean.TRUE.equals(settled))
+                        {
+                            // TODO: apply source's default outcome
+                            applyModifiedOutcome();
                         }
                     }
                 });
@@ -467,6 +505,77 @@
 
             return (transactionId == null && outcome != null);
         }
+
+        private void applyModifiedOutcome()
+        {
+            final Modified modified = new Modified();
+            modified.setDeliveryFailed(true);
+            _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
+            _link.getEndpoint().sendFlowConditional();
+            incrementDeliveryCountOrRouteToAlternateOrDiscard();
+        }
+
+        private void incrementDeliveryCountOrRouteToAlternateOrDiscard()
+        {
+            _queueEntry.incrementDeliveryCount();
+            if (_queueEntry.getMaximumDeliveryCount() > 0
+                && _queueEntry.getDeliveryCount() >= _queueEntry.getMaximumDeliveryCount())
+            {
+                routeToAlternateOrDiscard();
+            }
+            else
+            {
+                _queueEntry.release(getConsumer());
+            }
+        }
+
+        private void routeToAlternateOrDiscard()
+        {
+            final Session_1_0 session = _link.getSession();
+            final ServerMessage message = _queueEntry.getMessage();
+            final EventLogger eventLogger = session.getEventLogger();
+            final LogSubject logSubject = session.getLogSubject();
+            int requeues = 0;
+            if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
+            {
+                requeues = _queueEntry.routeToAlternate(new Action<MessageInstance>()
+                {
+                    @Override
+                    public void performAction(final MessageInstance requeueEntry)
+                    {
+
+                        eventLogger.message(logSubject,
+                                            ChannelMessages.DEADLETTERMSG(message.getMessageNumber(),
+                                                                          requeueEntry.getOwningResource().getName()));
+                    }
+                }, null);
+            }
+
+            if (requeues == 0)
+            {
+                final TransactionLogResource owningResource = _queueEntry.getOwningResource();
+                if (owningResource instanceof Queue)
+                {
+                    final Queue<?> queue = (Queue<?>) owningResource;
+
+                    final Exchange altExchange = queue.getAlternateExchange();
+
+                    if (altExchange == null)
+                    {
+                        eventLogger.message(logSubject,
+                                            ChannelMessages.DISCARDMSG_NOALTEXCH(message.getMessageNumber(),
+                                                                                 queue.getName(),
+                                                                                 message.getInitialRoutingAddress()));
+                    }
+                    else
+                    {
+                        eventLogger.message(logSubject,
+                                            ChannelMessages.DISCARDMSG_NOROUTE(message.getMessageNumber(),
+                                                                               altExchange.getName()));
+                    }
+                }
+            }
+        }
     }
 
     private class DoNothingAction implements UnsettledAction
diff --git a/broker-plugins/management-http/src/main/java/resources/showMessage.html b/broker-plugins/management-http/src/main/java/resources/showMessage.html
index 22e0816..b03599c 100644
--- a/broker-plugins/management-http/src/main/java/resources/showMessage.html
+++ b/broker-plugins/management-http/src/main/java/resources/showMessage.html
@@ -83,6 +83,10 @@
             <td><span class="message-userId"></span></td>
         </tr>
         <tr style="margin-bottom: 4pt">
+            <td style="width: 10em; vertical-align: top"><span style="font-weight: bold;">Delivery Count:</span></td>
+            <td><span class="message-deliveryCount"></span></td>
+        </tr>
+        <tr style="margin-bottom: 4pt">
             <td style="width: 10em; vertical-align: top"><span style="font-weight: bold;">Headers:</span></td>
             <td>
                 <div class="message-headers map confidential"></div>
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
new file mode 100644
index 0000000..f1cbe41
--- /dev/null
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
@@ -0,0 +1,113 @@
+package org.apache.qpid.systests.jms_2_0.deliverycount;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class DeliveryCountTest extends QpidBrokerTestCase
+{
+    private static final int MAX_DELIVERY_ATTEMPTS = 3;
+    private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
+    private Queue _queue;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        try (Connection connection = getConnectionWithPrefetch(0))
+        {
+            String testQueueName = getTestQueueName();
+            connection.start();
+            Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+            final Map<String, Object> attributes = new HashMap<>();
+            attributes.put(org.apache.qpid.server.model.Queue.NAME, testQueueName);
+            attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_ATTEMPTS);
+            createEntityUsingAmqpManagement(testQueueName,
+                                            session,
+                                            "org.apache.qpid.StandardQueue",
+                                            attributes);
+           _queue = session.createQueue(testQueueName);
+            sendMessage(session, _queue, 1);
+        }
+    }
+
+
+    public void testDeliveryCountChangedOnRollback() throws Exception
+    {
+        try (Connection connection = getConnectionWithPrefetch(0))
+        {
+            Session session = connection.createSession(JMSContext.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(_queue);
+            connection.start();
+            for (int i = 0; i < MAX_DELIVERY_ATTEMPTS; i++)
+            {
+                Message message = consumer.receive(getReceiveTimeout());
+                session.rollback();
+                assertDeliveryCountHeaders(message, i);
+            }
+            Message message = consumer.receive(getReceiveTimeout());
+            assertNull("Message should be discarded", message);
+        }
+    }
+
+    public void testDeliveryCountChangedOnSessionClose() throws Exception
+    {
+        try (Connection connection = getConnectionWithPrefetch(0))
+        {
+            connection.start();
+            for (int i = 0; i < MAX_DELIVERY_ATTEMPTS; i++)
+            {
+                Session consumingSession = connection.createSession(JMSContext.SESSION_TRANSACTED);
+                MessageConsumer consumer = consumingSession.createConsumer(_queue);
+                Message message = consumer.receive(getReceiveTimeout());
+                assertDeliveryCountHeaders(message, i);
+                consumingSession.close();
+            }
+
+            Session session = connection.createSession(JMSContext.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(_queue);
+            Message message = consumer.receive(getReceiveTimeout());
+            assertNull("Message should be discarded", message);
+        }
+    }
+
+    private void assertDeliveryCountHeaders(final Message message, final int deliveryAttempt) throws JMSException
+    {
+        assertNotNull(String.format("Message is not received on attempt %d", deliveryAttempt), message);
+        assertEquals(String.format("Unexpected redelivered flag on attempt %d", deliveryAttempt),
+                     deliveryAttempt > 0,
+                     message.getJMSRedelivered());
+        assertEquals(String.format("Unexpected message delivery count on attempt %d", deliveryAttempt + 1),
+                     deliveryAttempt + 1,
+                     message.getIntProperty(JMSX_DELIVERY_COUNT));
+    }
+}
+
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
index f336e40..ae397a7 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
@@ -363,8 +363,17 @@
                     {
                         return Long.valueOf(((Map) body).get("queueDepthMessages").toString());
                     }
+                    else
+                    {
+                        throw new IllegalArgumentException("Cannot parse the results from a management operation."
+                                                           + " Unexpected message object type : " + body);
+                    }
                 }
-                throw new IllegalArgumentException("Cannot parse the results from a management operation");
+                else
+                {
+                    throw new IllegalArgumentException("Cannot parse the results from a management operation."
+                                                       + " Unexpected response message type : " + response.getClass());
+                }
             }
             finally
             {
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 86b68ad..383d48a 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -22,7 +22,9 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -33,6 +35,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
@@ -40,15 +43,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.RejectBehaviour;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
 
 /**
  * Test that the MaxRedelivery feature works as expected, allowing the client to reject
@@ -74,12 +73,10 @@
 
     /** index numbers of messages to be redelivered */
     private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14);
+    private String _testQueueName;
 
     public void setUp() throws Exception
     {
-        //enable DLQ/maximumDeliveryCount support for all queues at the vhost level
-
-        TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
         setTestSystemProperty("queue.deadLetterQueueEnabled","true");
         setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
 
@@ -90,55 +87,45 @@
             setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
         }
         super.setUp();
-
+        _testQueueName = getTestQueueName();
         boolean durableSub = isDurSubTest();
 
-        //declare the test queue
-        Connection consumerConnection = getConnection();
-        Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        Destination destination = getDestination(consumerSession, durableSub);
+        Connection connection = getConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination;
         if(durableSub)
         {
-            consumerSession.createDurableSubscriber((Topic)destination, getName()).close();
+            destination = createTopic(connection, _testQueueName);
+            session.createDurableSubscriber((Topic)destination, getName()).close();
         }
         else
         {
-            consumerSession.createConsumer(destination).close();
+            final Map<String, Object> attributes = new HashMap<>();
+            attributes.put(org.apache.qpid.server.model.Queue.NAME, _testQueueName);
+            attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT);
+            attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
+            createEntityUsingAmqpManagement(_testQueueName,
+                                            session,
+                                            "org.apache.qpid.StandardQueue",
+                                            attributes);
+            destination = getQueueFromName(session, _testQueueName);
         }
 
-        consumerConnection.close();
-
-        //Create Producer put some messages on the queue
-        Connection producerConnection = getConnection();
-        producerConnection.start();
-        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub));
-
+        MessageProducer producer = session.createProducer(destination);
         for (int count = 1; count <= MSG_COUNT; count++)
         {
-            Message msg = producerSession.createTextMessage(generateContent(count));
+            Message msg = session.createTextMessage(generateContent(count));
             msg.setIntProperty("count", count);
             producer.send(msg);
         }
 
-        producerConnection.close();
+        connection.close();
 
         _failed = false;
         _awaitCompletion = new CountDownLatch(1);
     }
 
-    private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException
-    {
-        if(durableSub)
-        {
-            return consumerSession.createTopic(getTestQueueName());
-        }
-        else
-        {
-            return consumerSession.createQueue(getTestQueueName());
-        }
-    }
-
     private String generateContent(int count)
     {
         return "Message " + count + " content.";
@@ -214,27 +201,27 @@
     {
         final Connection clientConnection = getConnection();
 
-        final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false;
+        final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED;
         final Session clientSession = clientConnection.createSession(transacted, deliveryMode);
 
         MessageConsumer consumer;
-        Destination dest = getDestination(clientSession, durableSub);
-        AMQQueue checkQueue;
+        Destination dest = durableSub ? clientSession.createTopic(_testQueueName) : clientSession.createQueue(_testQueueName);
+        Queue checkQueue;
         if(durableSub)
         {
             consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
-            checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName());
+
+            checkQueue = clientSession.createQueue(getDurableSubscriptionQueueName());
         }
         else
         {
             consumer = clientSession.createConsumer(dest);
-            checkQueue = (AMQQueue) dest;
+            checkQueue = (Queue) dest;
         }
-
-        assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
-                MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
-
         clientConnection.start();
+        assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
+                MSG_COUNT, getQueueDepth(clientConnection, checkQueue));
+
 
         int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size());
 
@@ -269,51 +256,64 @@
         consumer.close();
 
         //check the source queue is now empty
-        assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue, true));
+        assertEquals("The queue should have 0 msgs left", 0, getQueueDepth(clientConnection, checkQueue));
 
         //check the DLQ has the required number of rejected-without-requeue messages
-        verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub);
+        verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub, clientConnection);
 
-        if(isBrokerStorePersistent())
+        if (!isBroker10())
         {
-            //restart the broker to verify persistence of the DLQ and the messages on it
-            clientConnection.close();
+            if (isBrokerStorePersistent())
+            {
+                //restart the broker to verify persistence of the DLQ and the messages on it
+                clientConnection.close();
 
-            restartDefaultBroker();
+                restartDefaultBroker();
 
-            final Connection clientConnection2 = getConnection();
-            clientConnection2.start();
+                final Connection clientConnection2 = getConnection();
+                clientConnection2.start();
 
-            //verify the messages on the DLQ
-            verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
-            clientConnection2.close();
-        }
-        else
-        {
+                //verify the messages on the DLQ
+                verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
+                clientConnection2.close();
+            }
+            else
+            {
 
-            //verify the messages on the DLQ
-            verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
-            clientConnection.close();
+                //verify the messages on the DLQ
+                verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
+                clientConnection.close();
+            }
         }
 
     }
 
-    private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws QpidException
+    private String getDurableSubscriptionQueueName()
     {
-        AMQDestination checkQueueDLQ;
-        if(durableSub)
+        if ( isBroker10())
         {
-            checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+            return "qpidsub_/clientid_/" + getName() + "_/durable";
         }
         else
         {
-            checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+            return "clientid:" + getName();
         }
-
-        assertEquals("The DLQ should have " + expected + " msgs on it", expected,
-                        ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ, true));
     }
 
+    private void verifyDLQdepth(int expected,
+                                Session clientSession,
+                                boolean durableSub,
+                                final Connection clientConnection) throws Exception
+    {
+        String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName )
+                           + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
+
+        assertEquals("The DLQ should have " + expected + " msgs on it",
+                     expected,
+                     getQueueDepth(clientConnection, clientSession.createQueue(queueName)));
+    }
+
+
     private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
     {
         Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -321,7 +321,9 @@
         MessageConsumer consumer;
         if(durableSub)
         {
-            consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
+            String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName )
+                               + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
+            consumer = clientSession.createConsumer(clientSession.createQueue(queueName));
         }
         else
         {
@@ -330,7 +332,7 @@
         }
 
         //keep track of the message we expect to still be on the DLQ
-        List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs);
+        List<Integer> outstandingMessages = new ArrayList<>(redeliverMsgs);
         int numMsg = outstandingMessages.size();
 
         for(int i = 0; i < numMsg; i++)
@@ -530,8 +532,7 @@
     }
 
     private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
-            final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException,
-                                                                                                 QpidException, InterruptedException
+            final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws Exception
    {
         if(deliveryMode == Session.AUTO_ACKNOWLEDGE
                 || deliveryMode == Session.DUPS_OK_ACKNOWLEDGE
@@ -597,11 +598,6 @@
                             break;
                         case Session.CLIENT_ACKNOWLEDGE:
                             session.recover();
-
-                            //sleep then do a synchronous op to give the broker
-                            //time to resend all the messages
-                            Thread.sleep(500);
-                            ((AMQSession<?,?>) session).sync();
                             break;
                     }
 
diff --git a/test-profiles/Java10BrokenTestsExcludes b/test-profiles/Java10BrokenTestsExcludes
index fb3774a..25e021d 100644
--- a/test-profiles/Java10BrokenTestsExcludes
+++ b/test-profiles/Java10BrokenTestsExcludes
@@ -68,5 +68,3 @@
 org.apache.qpid.server.queue.ConsumerPriorityTest#*
 org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
 
-// QPID-7603: broker side delivery count support is not complete
-org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 3a044b6..0df3e3d 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -213,7 +213,11 @@
 org.apache.qpid.server.logging.QueueLoggingTest#*
 org.apache.qpid.server.logging.TransientQueueLoggingTest#*
 
-
+// Tests verify the 0-x client's behaviour on recover which is not applicable to new client
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testSynchronousClientAckSession
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousClientAckSession
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousDupsOkSession
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousAutoAckSession