QPID-7603: [Java Broker, AMQP 1.0] Add support for maximum delivery count
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4da1af4..02f33db 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -31,9 +31,14 @@
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
@@ -49,13 +54,16 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
@@ -375,7 +383,7 @@
txn.dequeue(_queueEntry.getEnqueueRecord(),
new ServerTransaction.Action()
{
-
+ @Override
public void postCommit()
{
if (_queueEntry.isAcquiredBy(getConsumer()))
@@ -384,6 +392,7 @@
}
}
+ @Override
public void onRollback()
{
@@ -392,6 +401,7 @@
}
txn.addPostTransactionAction(new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
if(Boolean.TRUE.equals(settled))
@@ -405,16 +415,13 @@
_link.getEndpoint().sendFlowConditional();
}
+ @Override
public void onRollback()
{
if(Boolean.TRUE.equals(settled))
{
- final Modified modified = new Modified();
- modified.setDeliveryFailed(true);
- _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
- _link.getEndpoint().sendFlowConditional();
- _queueEntry.incrementDeliveryCount();
- _queueEntry.release(getConsumer());
+ // TODO: apply source's default outcome
+ applyModifiedOutcome();
}
}
});
@@ -423,6 +430,7 @@
{
txn.addPostTransactionAction(new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
@@ -430,9 +438,12 @@
_link.getEndpoint().settle(_deliveryTag);
}
+ @Override
public void onRollback()
{
_link.getEndpoint().settle(_deliveryTag);
+
+ // TODO: apply source's default outcome if settled
}
});
}
@@ -441,25 +452,52 @@
{
txn.addPostTransactionAction(new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
+ // TODO: add handling of undeliverable-here
- _queueEntry.release(getConsumer());
if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
{
- _queueEntry.incrementDeliveryCount();
+ incrementDeliveryCountOrRouteToAlternateOrDiscard();
+ }
+ else
+ {
+ _queueEntry.release(getConsumer());
}
_link.getEndpoint().settle(_deliveryTag);
}
+ @Override
public void onRollback()
{
if(Boolean.TRUE.equals(settled))
{
- final Modified modified = new Modified();
- modified.setDeliveryFailed(true);
- _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
- _link.getEndpoint().sendFlowConditional();
+ // TODO: apply source's default outcome
+ applyModifiedOutcome();
+ }
+ }
+ });
+ }
+ else if (outcome instanceof Rejected)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ _link.getEndpoint().settle(_deliveryTag);
+ incrementDeliveryCountOrRouteToAlternateOrDiscard();
+ _link.getEndpoint().sendFlowConditional();
+ }
+
+ @Override
+ public void onRollback()
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ // TODO: apply source's default outcome
+ applyModifiedOutcome();
}
}
});
@@ -467,6 +505,77 @@
return (transactionId == null && outcome != null);
}
+
+ private void applyModifiedOutcome()
+ {
+ final Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
+ _link.getEndpoint().sendFlowConditional();
+ incrementDeliveryCountOrRouteToAlternateOrDiscard();
+ }
+
+ private void incrementDeliveryCountOrRouteToAlternateOrDiscard()
+ {
+ _queueEntry.incrementDeliveryCount();
+ if (_queueEntry.getMaximumDeliveryCount() > 0
+ && _queueEntry.getDeliveryCount() >= _queueEntry.getMaximumDeliveryCount())
+ {
+ routeToAlternateOrDiscard();
+ }
+ else
+ {
+ _queueEntry.release(getConsumer());
+ }
+ }
+
+ private void routeToAlternateOrDiscard()
+ {
+ final Session_1_0 session = _link.getSession();
+ final ServerMessage message = _queueEntry.getMessage();
+ final EventLogger eventLogger = session.getEventLogger();
+ final LogSubject logSubject = session.getLogSubject();
+ int requeues = 0;
+ if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
+ {
+ requeues = _queueEntry.routeToAlternate(new Action<MessageInstance>()
+ {
+ @Override
+ public void performAction(final MessageInstance requeueEntry)
+ {
+
+ eventLogger.message(logSubject,
+ ChannelMessages.DEADLETTERMSG(message.getMessageNumber(),
+ requeueEntry.getOwningResource().getName()));
+ }
+ }, null);
+ }
+
+ if (requeues == 0)
+ {
+ final TransactionLogResource owningResource = _queueEntry.getOwningResource();
+ if (owningResource instanceof Queue)
+ {
+ final Queue<?> queue = (Queue<?>) owningResource;
+
+ final Exchange altExchange = queue.getAlternateExchange();
+
+ if (altExchange == null)
+ {
+ eventLogger.message(logSubject,
+ ChannelMessages.DISCARDMSG_NOALTEXCH(message.getMessageNumber(),
+ queue.getName(),
+ message.getInitialRoutingAddress()));
+ }
+ else
+ {
+ eventLogger.message(logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(message.getMessageNumber(),
+ altExchange.getName()));
+ }
+ }
+ }
+ }
}
private class DoNothingAction implements UnsettledAction
diff --git a/broker-plugins/management-http/src/main/java/resources/showMessage.html b/broker-plugins/management-http/src/main/java/resources/showMessage.html
index 22e0816..b03599c 100644
--- a/broker-plugins/management-http/src/main/java/resources/showMessage.html
+++ b/broker-plugins/management-http/src/main/java/resources/showMessage.html
@@ -83,6 +83,10 @@
<td><span class="message-userId"></span></td>
</tr>
<tr style="margin-bottom: 4pt">
+ <td style="width: 10em; vertical-align: top"><span style="font-weight: bold;">Delivery Count:</span></td>
+ <td><span class="message-deliveryCount"></span></td>
+ </tr>
+ <tr style="margin-bottom: 4pt">
<td style="width: 10em; vertical-align: top"><span style="font-weight: bold;">Headers:</span></td>
<td>
<div class="message-headers map confidential"></div>
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
new file mode 100644
index 0000000..f1cbe41
--- /dev/null
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
@@ -0,0 +1,113 @@
+package org.apache.qpid.systests.jms_2_0.deliverycount;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class DeliveryCountTest extends QpidBrokerTestCase
+{
+ private static final int MAX_DELIVERY_ATTEMPTS = 3;
+ private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
+ private Queue _queue;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ try (Connection connection = getConnectionWithPrefetch(0))
+ {
+ String testQueueName = getTestQueueName();
+ connection.start();
+ Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, testQueueName);
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_ATTEMPTS);
+ createEntityUsingAmqpManagement(testQueueName,
+ session,
+ "org.apache.qpid.StandardQueue",
+ attributes);
+ _queue = session.createQueue(testQueueName);
+ sendMessage(session, _queue, 1);
+ }
+ }
+
+
+ public void testDeliveryCountChangedOnRollback() throws Exception
+ {
+ try (Connection connection = getConnectionWithPrefetch(0))
+ {
+ Session session = connection.createSession(JMSContext.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(_queue);
+ connection.start();
+ for (int i = 0; i < MAX_DELIVERY_ATTEMPTS; i++)
+ {
+ Message message = consumer.receive(getReceiveTimeout());
+ session.rollback();
+ assertDeliveryCountHeaders(message, i);
+ }
+ Message message = consumer.receive(getReceiveTimeout());
+ assertNull("Message should be discarded", message);
+ }
+ }
+
+ public void testDeliveryCountChangedOnSessionClose() throws Exception
+ {
+ try (Connection connection = getConnectionWithPrefetch(0))
+ {
+ connection.start();
+ for (int i = 0; i < MAX_DELIVERY_ATTEMPTS; i++)
+ {
+ Session consumingSession = connection.createSession(JMSContext.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumingSession.createConsumer(_queue);
+ Message message = consumer.receive(getReceiveTimeout());
+ assertDeliveryCountHeaders(message, i);
+ consumingSession.close();
+ }
+
+ Session session = connection.createSession(JMSContext.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(_queue);
+ Message message = consumer.receive(getReceiveTimeout());
+ assertNull("Message should be discarded", message);
+ }
+ }
+
+ private void assertDeliveryCountHeaders(final Message message, final int deliveryAttempt) throws JMSException
+ {
+ assertNotNull(String.format("Message is not received on attempt %d", deliveryAttempt), message);
+ assertEquals(String.format("Unexpected redelivered flag on attempt %d", deliveryAttempt),
+ deliveryAttempt > 0,
+ message.getJMSRedelivered());
+ assertEquals(String.format("Unexpected message delivery count on attempt %d", deliveryAttempt + 1),
+ deliveryAttempt + 1,
+ message.getIntProperty(JMSX_DELIVERY_COUNT));
+ }
+}
+
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
index f336e40..ae397a7 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
@@ -363,8 +363,17 @@
{
return Long.valueOf(((Map) body).get("queueDepthMessages").toString());
}
+ else
+ {
+ throw new IllegalArgumentException("Cannot parse the results from a management operation."
+ + " Unexpected message object type : " + body);
+ }
}
- throw new IllegalArgumentException("Cannot parse the results from a management operation");
+ else
+ {
+ throw new IllegalArgumentException("Cannot parse the results from a management operation."
+ + " Unexpected response message type : " + response.getClass());
+ }
}
finally
{
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 86b68ad..383d48a 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -22,7 +22,9 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
@@ -40,15 +43,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
/**
* Test that the MaxRedelivery feature works as expected, allowing the client to reject
@@ -74,12 +73,10 @@
/** index numbers of messages to be redelivered */
private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14);
+ private String _testQueueName;
public void setUp() throws Exception
{
- //enable DLQ/maximumDeliveryCount support for all queues at the vhost level
-
- TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
setTestSystemProperty("queue.deadLetterQueueEnabled","true");
setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
@@ -90,55 +87,45 @@
setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
}
super.setUp();
-
+ _testQueueName = getTestQueueName();
boolean durableSub = isDurSubTest();
- //declare the test queue
- Connection consumerConnection = getConnection();
- Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination destination = getDestination(consumerSession, durableSub);
+ Connection connection = getConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination;
if(durableSub)
{
- consumerSession.createDurableSubscriber((Topic)destination, getName()).close();
+ destination = createTopic(connection, _testQueueName);
+ session.createDurableSubscriber((Topic)destination, getName()).close();
}
else
{
- consumerSession.createConsumer(destination).close();
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, _testQueueName);
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
+ createEntityUsingAmqpManagement(_testQueueName,
+ session,
+ "org.apache.qpid.StandardQueue",
+ attributes);
+ destination = getQueueFromName(session, _testQueueName);
}
- consumerConnection.close();
-
- //Create Producer put some messages on the queue
- Connection producerConnection = getConnection();
- producerConnection.start();
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub));
-
+ MessageProducer producer = session.createProducer(destination);
for (int count = 1; count <= MSG_COUNT; count++)
{
- Message msg = producerSession.createTextMessage(generateContent(count));
+ Message msg = session.createTextMessage(generateContent(count));
msg.setIntProperty("count", count);
producer.send(msg);
}
- producerConnection.close();
+ connection.close();
_failed = false;
_awaitCompletion = new CountDownLatch(1);
}
- private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException
- {
- if(durableSub)
- {
- return consumerSession.createTopic(getTestQueueName());
- }
- else
- {
- return consumerSession.createQueue(getTestQueueName());
- }
- }
-
private String generateContent(int count)
{
return "Message " + count + " content.";
@@ -214,27 +201,27 @@
{
final Connection clientConnection = getConnection();
- final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false;
+ final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED;
final Session clientSession = clientConnection.createSession(transacted, deliveryMode);
MessageConsumer consumer;
- Destination dest = getDestination(clientSession, durableSub);
- AMQQueue checkQueue;
+ Destination dest = durableSub ? clientSession.createTopic(_testQueueName) : clientSession.createQueue(_testQueueName);
+ Queue checkQueue;
if(durableSub)
{
consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
- checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName());
+
+ checkQueue = clientSession.createQueue(getDurableSubscriptionQueueName());
}
else
{
consumer = clientSession.createConsumer(dest);
- checkQueue = (AMQQueue) dest;
+ checkQueue = (Queue) dest;
}
-
- assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
- MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
-
clientConnection.start();
+ assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
+ MSG_COUNT, getQueueDepth(clientConnection, checkQueue));
+
int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size());
@@ -269,51 +256,64 @@
consumer.close();
//check the source queue is now empty
- assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue, true));
+ assertEquals("The queue should have 0 msgs left", 0, getQueueDepth(clientConnection, checkQueue));
//check the DLQ has the required number of rejected-without-requeue messages
- verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub);
+ verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub, clientConnection);
- if(isBrokerStorePersistent())
+ if (!isBroker10())
{
- //restart the broker to verify persistence of the DLQ and the messages on it
- clientConnection.close();
+ if (isBrokerStorePersistent())
+ {
+ //restart the broker to verify persistence of the DLQ and the messages on it
+ clientConnection.close();
- restartDefaultBroker();
+ restartDefaultBroker();
- final Connection clientConnection2 = getConnection();
- clientConnection2.start();
+ final Connection clientConnection2 = getConnection();
+ clientConnection2.start();
- //verify the messages on the DLQ
- verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
- clientConnection2.close();
- }
- else
- {
+ //verify the messages on the DLQ
+ verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
+ clientConnection2.close();
+ }
+ else
+ {
- //verify the messages on the DLQ
- verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
- clientConnection.close();
+ //verify the messages on the DLQ
+ verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
+ clientConnection.close();
+ }
}
}
- private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws QpidException
+ private String getDurableSubscriptionQueueName()
{
- AMQDestination checkQueueDLQ;
- if(durableSub)
+ if ( isBroker10())
{
- checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+ return "qpidsub_/clientid_/" + getName() + "_/durable";
}
else
{
- checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+ return "clientid:" + getName();
}
-
- assertEquals("The DLQ should have " + expected + " msgs on it", expected,
- ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ, true));
}
+ private void verifyDLQdepth(int expected,
+ Session clientSession,
+ boolean durableSub,
+ final Connection clientConnection) throws Exception
+ {
+ String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName )
+ + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
+
+ assertEquals("The DLQ should have " + expected + " msgs on it",
+ expected,
+ getQueueDepth(clientConnection, clientSession.createQueue(queueName)));
+ }
+
+
private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
{
Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -321,7 +321,9 @@
MessageConsumer consumer;
if(durableSub)
{
- consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
+ String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName )
+ + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
+ consumer = clientSession.createConsumer(clientSession.createQueue(queueName));
}
else
{
@@ -330,7 +332,7 @@
}
//keep track of the message we expect to still be on the DLQ
- List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs);
+ List<Integer> outstandingMessages = new ArrayList<>(redeliverMsgs);
int numMsg = outstandingMessages.size();
for(int i = 0; i < numMsg; i++)
@@ -530,8 +532,7 @@
}
private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
- final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException,
- QpidException, InterruptedException
+ final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws Exception
{
if(deliveryMode == Session.AUTO_ACKNOWLEDGE
|| deliveryMode == Session.DUPS_OK_ACKNOWLEDGE
@@ -597,11 +598,6 @@
break;
case Session.CLIENT_ACKNOWLEDGE:
session.recover();
-
- //sleep then do a synchronous op to give the broker
- //time to resend all the messages
- Thread.sleep(500);
- ((AMQSession<?,?>) session).sync();
break;
}
diff --git a/test-profiles/Java10BrokenTestsExcludes b/test-profiles/Java10BrokenTestsExcludes
index fb3774a..25e021d 100644
--- a/test-profiles/Java10BrokenTestsExcludes
+++ b/test-profiles/Java10BrokenTestsExcludes
@@ -68,5 +68,3 @@
org.apache.qpid.server.queue.ConsumerPriorityTest#*
org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
-// QPID-7603: broker side delivery count support is not complete
-org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 3a044b6..0df3e3d 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -213,7 +213,11 @@
org.apache.qpid.server.logging.QueueLoggingTest#*
org.apache.qpid.server.logging.TransientQueueLoggingTest#*
-
+// Tests verify the 0-x client's behaviour on recover which is not applicable to new client
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testSynchronousClientAckSession
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousClientAckSession
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousDupsOkSession
+org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousAutoAckSession