QPIDJMS-484: track client-ack recovery beyond recover() call, send modified-failed dispositions where appropriate at consumer/session close/shutdown
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 7015a6f..09bcd05 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -378,7 +378,7 @@
try {
if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
- acknowledge(ACK_TYPE.MODIFIED_FAILED);
+ acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
}
} catch (Exception e) {
LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index e55426f..51c0441 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -31,6 +31,7 @@
private JmsMessage message;
private boolean enqueueFirst;
private boolean delivered;
+ private boolean recovered;
private transient JmsConsumerInfo consumerInfo;
private transient String stringView;
@@ -75,6 +76,14 @@
this.delivered = delivered;
}
+ public boolean isRecovered() {
+ return recovered;
+ }
+
+ public void setRecovered(boolean recovered) {
+ this.recovered = recovered;
+ }
+
public int getRedeliveryCount() {
int redeliveryCount = 0;
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
index d00f35f..2e07a75 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -31,6 +31,7 @@
MODIFIED_FAILED,
MODIFIED_FAILED_UNDELIVERABLE,
// Conceptual
- DELIVERED
+ DELIVERED,
+ SESSION_SHUTDOWN
}
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 0acbbb5..202a9e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -24,6 +24,8 @@
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
+import javax.jms.Session;
+
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
@@ -54,6 +56,7 @@
private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
protected final AmqpSession session;
+ protected final int acknowledgementMode;
protected AsyncResult stopRequest;
protected AsyncResult pullRequest;
protected long incomingSequence;
@@ -65,10 +68,13 @@
super(info, receiver, session);
this.session = session;
+ this.acknowledgementMode = info.getAcknowledgementMode();
}
@Override
public void close(AsyncResult request) {
+ acknowledgeUndeliveredRecoveredMessages();
+
// If we have pending deliveries we remain open to allow for ACK or for a
// pending transaction that this consumer is active in to complete.
if (shouldDeferClose()) {
@@ -79,6 +85,27 @@
}
}
+ private void acknowledgeUndeliveredRecoveredMessages() {
+ if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+ // Send dispositions for any messages which were previously delivered and
+ // session recovered, but were then not delivered again afterwards.
+ Delivery delivery = getEndpoint().head();
+ while (delivery != null) {
+ Delivery current = delivery;
+ delivery = delivery.next();
+
+ if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
+ continue;
+ }
+
+ JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
+ if (envelope.isRecovered() && !envelope.isDelivered()) {
+ handleDisposition(envelope, current, MODIFIED_FAILED);
+ }
+ }
+ }
+ }
+
/**
* Starts the consumer by setting the link credit to the given prefetch value.
*
@@ -220,12 +247,14 @@
delivery = delivery.next();
if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
- LOG.debug("{} Found incomplete delivery with no context during recover processing", AmqpConsumer.this);
+ LOG.debug("{} Found incomplete delivery with no context during session acknowledge processing", AmqpConsumer.this);
continue;
}
JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
- if (envelope.isDelivered()) {
+ if(ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
+ handleDisposition(envelope, current, MODIFIED_FAILED);
+ } else if (envelope.isDelivered()) {
final DeliveryState disposition;
switch (ackType) {
@@ -304,6 +333,7 @@
private void handleDelivered(JmsInboundMessageDispatch envelope, Delivery delivery) {
LOG.debug("Delivered Ack of message: {}", envelope);
deliveredCount++;
+ envelope.setRecovered(false);
envelope.setDelivered(true);
delivery.setDefaultDeliveryState(MODIFIED_FAILED);
}
@@ -410,6 +440,9 @@
envelope.getMessage().getFacade().getRedeliveryCount() + 1);
envelope.setEnqueueFirst(true);
envelope.setDelivered(false);
+ if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+ envelope.setRecovered(true);
+ }
redispatchList.add(envelope);
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index e04b8f6..0c6dab9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -413,7 +413,7 @@
}
@Test(timeout=20000)
- public void testCloseDurableSubscriberWithUnackedAnUnconsumedPrefetchedMessages() throws Exception {
+ public void testCloseDurableSubscriberWithUnconsumedPrefetchedMessages() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
@@ -429,7 +429,7 @@
int messageCount = 5;
// Create a consumer and fill the prefetch with some messages,
- // which we will consume some of but ack none of.
+ // which we will only consume some of.
testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 46e7ba5..ce50edc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -64,6 +64,7 @@
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsSession;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -2259,8 +2260,8 @@
receivedTextMessage.acknowledge();
- testPeer.expectDetach(false, true, false);
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
+ testPeer.expectDetach(false, true, false);
testPeer.expectDisposition(true, new ReleasedMatcher(), 3, 3);
subscriber.close();
@@ -2328,4 +2329,260 @@
connection.close();
}
}
+
+ @Test(timeout = 20000)
+ public void testCloseConnectionWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception {
+ // Send 6, recover 4, redeliver 2, close connection (and so implicitly, session)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, false, 6, 4, 2);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseConnectionWithRecoveredUndeliveredClientAckMessages() throws Exception {
+ // Send 4, recover 2, redeliver none, close connection (and so implicitly, session)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, false, 4, 2, 0);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseSessionWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception {
+ // Send 6, recover 4, redeliver 2, close session (then connection)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, true, 6, 4, 2);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseSessionWithRecoveredUndeliveredClientAckMessages() throws Exception {
+ // Send 4, recover 2, redeliver none, close session (then connection)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, true, 4, 2, 0);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseConsumerWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception {
+ // Send 6, recover 4, redeliver 2, close consumer then connection (and so implicitly, session)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, false, 6, 4, 2);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseConsumerWithRecoveredUndeliveredClientAckMessages() throws Exception {
+ // Send 4, recover 2, redeliver none, close consumer then connection (and so implicitly, session)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, false, 4, 2, 0);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseConsumerAndSessionWithRecoveredAndRedeliveredUndeliveredClientAckMessages() throws Exception {
+ // Send 6, recover 4, redeliver 2, close consumer then session (then connection)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, true, 6, 4, 2);
+ }
+
+ @Test(timeout = 20000)
+ public void testCloseConsumerAndSessionWithRecoveredUndeliveredClientAckMessages() throws Exception {
+ // Send 6, recover 4, redeliver 2, close consumer then session (then connection)
+ doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, true, 4, 2, 0);
+ }
+
+ private void doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(
+ boolean closeConsumer, boolean closeSession, int msgCount, int deliverBeforeRecoverCount, int deliverAfterRecoverCount) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic topic = session.createTopic(topicName);
+
+ final CountDownLatch incoming = new CountDownLatch(msgCount);
+ ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ incoming.countDown();
+ }
+ });
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
+ equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);
+
+ MessageConsumer consumer = session.createConsumer(topic);
+
+ TextMessage receivedTextMessage = null;
+ for (int i = 1; i <= deliverBeforeRecoverCount; i++) {
+ assertNotNull("Expected message did not arrive: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+ assertEquals("Unexpected delivery number", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+ }
+
+ // Await all incoming messages to arrive at consumer before we recover, ensure deterministic test behaviour.
+ assertTrue("Messages did not arrive in a timely fashion", incoming.await(3, TimeUnit.SECONDS));
+
+ session.recover();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ for (int i = 1; i <= deliverAfterRecoverCount; i++) {
+ assertNotNull("Expected message did not arrive after recover: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+ assertEquals("Unexpected delivery number after recover", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+ }
+
+ int deliveredAtAnyPoint = Math.max(deliverBeforeRecoverCount, deliverAfterRecoverCount);
+
+ if (closeConsumer) {
+ if(deliverAfterRecoverCount > 0) {
+ // Remaining credit will be drained if there are delivered messages yet to be acknowledged or recovered again.
+ testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - msgCount)));
+ }
+
+ // Any message delivered+recovered before but not then delivered again afterwards, will have disposition sent now.
+ for (int i = deliverAfterRecoverCount + 1; i <= deliverBeforeRecoverCount; i++) {
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+ }
+
+ if(deliverAfterRecoverCount > 0) {
+ // Any further remaining messages prefetched will be released.
+ for (int i = deliveredAtAnyPoint + 1; i <= msgCount; i++) {
+ testPeer.expectDisposition(true, new ReleasedMatcher(), i, i);
+ }
+ } else {
+ // The link will close now
+ testPeer.expectDetach(true, true, true);
+
+ // Dispositions sent by proton when the link is freed
+ for (int i = deliveredAtAnyPoint + 1; i <= msgCount; i++) {
+ testPeer.expectDisposition(true, new ReleasedMatcher(), i, i);
+ }
+ }
+
+ consumer.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ if(deliverAfterRecoverCount > 0) {
+ // When the session or connection is closed, outstanding delivered messages will have disposition sent.
+ for (int i = 1; i <= deliverAfterRecoverCount; i++) {
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+ }
+ testPeer.expectDetach(true, true, true);
+ }
+ } else {
+ // If we dont close the consumer first, all previously delivered messages will have
+ // disposition sent when the session or connection is closed.
+ for (int i = 1; i <= deliveredAtAnyPoint; i++) {
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+ }
+ }
+
+ if(closeSession) {
+ testPeer.expectEnd();
+
+ session.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+
+ testPeer.expectClose();
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAcknowledgeAllPreviouslyRecoveredClientAckMessages() throws Exception {
+ doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(true, false, true);
+ doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, true, true);
+ doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, false, true);
+ }
+
+ @Test(timeout = 20000)
+ public void testAcknowledgeSomePreviouslyRecoveredClientAckMessages() throws Exception {
+ doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(true, false, false);
+ doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, true, false);
+ doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, false, false);
+ }
+
+ private void doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(boolean closeConsumer, boolean closeSession, boolean consumeAllRecovered) throws JMSException, Exception, IOException {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false);
+ connection.start();
+
+ int msgCount = 7;
+ int deliverBeforeRecoverCount = 4;
+ int acknowledgeAfterRecoverCount = consumeAllRecovered ? 5 : 2;
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic topic = session.createTopic(topicName);
+
+ final CountDownLatch incoming = new CountDownLatch(msgCount);
+ ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ incoming.countDown();
+ }
+ });
+
+ testPeer.expectReceiverAttach();
+
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
+ equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);
+
+ MessageConsumer consumer = session.createConsumer(topic);
+
+ TextMessage receivedTextMessage = null;
+ for (int i = 1; i <= deliverBeforeRecoverCount; i++) {
+ assertNotNull("Expected message did not arrive: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+ assertEquals("Unexpected delivery number", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+ }
+
+ // Await all incoming messages to arrive at consumer before we recover, ensure deterministic test behaviour.
+ assertTrue("Messages did not arrive in a timely fashion", incoming.await(3, TimeUnit.SECONDS));
+
+ session.recover();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ for (int i = 1; i <= acknowledgeAfterRecoverCount; i++) {
+ assertNotNull("Expected message did not arrive after recover: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+ assertEquals("Unexpected delivery number after recover", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+ testPeer.expectDisposition(true, new AcceptedMatcher(), i, i);
+ receivedTextMessage.acknowledge();
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ if(!consumeAllRecovered) {
+ // Any message delivered+recovered before but not then delivered and acknowledged afterwards, will have
+ // disposition sent as consumer/session/connection is closed.
+ for (int i = acknowledgeAfterRecoverCount + 1; i <= deliverBeforeRecoverCount; i++) {
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+ }
+ }
+
+ if(closeConsumer) {
+ testPeer.expectDetach(true, true, true);
+
+ // Dispositions sent by proton when the link is freed
+ for (int i = Math.max(deliverBeforeRecoverCount, acknowledgeAfterRecoverCount) + 1; i <= msgCount; i++) {
+ testPeer.expectDisposition(true, new ReleasedMatcher(), i, i);
+ }
+
+ consumer.close();
+ }
+
+ if(closeSession) {
+ testPeer.expectEnd();
+
+ session.close();
+ }
+
+ testPeer.expectClose();
+
+ connection.close();
+ }
+ }
}