QPIDJMS-484: track client-ack recovery beyond recover() call, send modified-failed dispositions where appropriate at consumer/session close/shutdown
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 7015a6f..09bcd05 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -378,7 +378,7 @@
 
                 try {
                     if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
-                        acknowledge(ACK_TYPE.MODIFIED_FAILED);
+                        acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
                     }
                 } catch (Exception e) {
                     LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index e55426f..51c0441 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -31,6 +31,7 @@
     private JmsMessage message;
     private boolean enqueueFirst;
     private boolean delivered;
+    private boolean recovered;
 
     private transient JmsConsumerInfo consumerInfo;
     private transient String stringView;
@@ -75,6 +76,14 @@
         this.delivered = delivered;
     }
 
+    public boolean isRecovered() {
+        return recovered;
+    }
+
+    public void setRecovered(boolean recovered) {
+        this.recovered = recovered;
+    }
+
     public int getRedeliveryCount() {
         int redeliveryCount = 0;
 
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
index d00f35f..2e07a75 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -31,6 +31,7 @@
         MODIFIED_FAILED,
         MODIFIED_FAILED_UNDELIVERABLE,
         // Conceptual
-        DELIVERED
+        DELIVERED,
+        SESSION_SHUTDOWN
     }
 }
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 0acbbb5..202a9e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -24,6 +24,8 @@
 import java.util.ListIterator;
 import java.util.concurrent.ScheduledFuture;
 
+import javax.jms.Session;
+
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
@@ -54,6 +56,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
 
     protected final AmqpSession session;
+    protected final int acknowledgementMode;
     protected AsyncResult stopRequest;
     protected AsyncResult pullRequest;
     protected long incomingSequence;
@@ -65,10 +68,13 @@
         super(info, receiver, session);
 
         this.session = session;
+        this.acknowledgementMode = info.getAcknowledgementMode();
     }
 
     @Override
     public void close(AsyncResult request) {
+        acknowledgeUndeliveredRecoveredMessages();
+
         // If we have pending deliveries we remain open to allow for ACK or for a
         // pending transaction that this consumer is active in to complete.
         if (shouldDeferClose()) {
@@ -79,6 +85,27 @@
         }
     }
 
+    private void acknowledgeUndeliveredRecoveredMessages() {
+        if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+            // Send dispositions for any messages which were previously delivered and
+            // session recovered, but were then not delivered again afterwards.
+            Delivery delivery = getEndpoint().head();
+            while (delivery != null) {
+                Delivery current = delivery;
+                delivery = delivery.next();
+
+                if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
+                    continue;
+                }
+
+                JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
+                if (envelope.isRecovered() && !envelope.isDelivered()) {
+                    handleDisposition(envelope, current, MODIFIED_FAILED);
+                }
+            }
+        }
+    }
+
     /**
      * Starts the consumer by setting the link credit to the given prefetch value.
      *
@@ -220,12 +247,14 @@
             delivery = delivery.next();
 
             if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
-                LOG.debug("{} Found incomplete delivery with no context during recover processing", AmqpConsumer.this);
+                LOG.debug("{} Found incomplete delivery with no context during session acknowledge processing", AmqpConsumer.this);
                 continue;
             }
 
             JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
-            if (envelope.isDelivered()) {
+            if(ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
+                handleDisposition(envelope, current, MODIFIED_FAILED);
+            } else if (envelope.isDelivered()) {
                 final DeliveryState disposition;
 
                 switch (ackType) {
@@ -304,6 +333,7 @@
     private void handleDelivered(JmsInboundMessageDispatch envelope, Delivery delivery) {
         LOG.debug("Delivered Ack of message: {}", envelope);
         deliveredCount++;
+        envelope.setRecovered(false);
         envelope.setDelivered(true);
         delivery.setDefaultDeliveryState(MODIFIED_FAILED);
     }
@@ -410,6 +440,9 @@
                     envelope.getMessage().getFacade().getRedeliveryCount() + 1);
                 envelope.setEnqueueFirst(true);
                 envelope.setDelivered(false);
+                if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+                    envelope.setRecovered(true);
+                }
 
                 redispatchList.add(envelope);
             }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index e04b8f6..0c6dab9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -413,7 +413,7 @@
     }
 
     @Test(timeout=20000)
-    public void testCloseDurableSubscriberWithUnackedAnUnconsumedPrefetchedMessages() throws Exception {
+    public void testCloseDurableSubscriberWithUnconsumedPrefetchedMessages() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -429,7 +429,7 @@
 
             int messageCount = 5;
             // Create a consumer and fill the prefetch with some messages,
-            // which we will consume some of but ack none of.
+            // which we will only consume some of.
             testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 46e7ba5..ce50edc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -64,6 +64,7 @@
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSession;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -2259,8 +2260,8 @@
 
             receivedTextMessage.acknowledge();
 
-            testPeer.expectDetach(false, true, false);
             testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
+            testPeer.expectDetach(false, true, false);
             testPeer.expectDisposition(true, new ReleasedMatcher(), 3, 3);
 
             subscriber.close();
@@ -2328,4 +2329,260 @@
             connection.close();
         }
     }
+
+    @Test(timeout = 20000)
+    public void testCloseConnectionWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception {
+        // Send 6, recover 4, redeliver 2, close connection (and so implicitly, session)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, false, 6, 4, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseConnectionWithRecoveredUndeliveredClientAckMessages() throws Exception {
+        // Send 4, recover 2, redeliver none, close connection (and so implicitly, session)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, false, 4, 2, 0);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseSessionWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception {
+        // Send 6, recover 4, redeliver 2, close session (then connection)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, true, 6, 4, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseSessionWithRecoveredUndeliveredClientAckMessages() throws Exception {
+        // Send 4, recover 2, redeliver none, close session (then connection)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, true, 4, 2, 0);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseConsumerWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception {
+        // Send 6, recover 4, redeliver 2, close consumer then connection (and so implicitly, session)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, false, 6, 4, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseConsumerWithRecoveredUndeliveredClientAckMessages() throws Exception {
+        // Send 4, recover 2, redeliver none, close consumer then connection (and so implicitly, session)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, false, 4, 2, 0);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseConsumerAndSessionWithRecoveredAndRedeliveredUndeliveredClientAckMessages() throws Exception {
+        // Send 6, recover 4, redeliver 2, close consumer then session (then connection)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, true, 6, 4, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testCloseConsumerAndSessionWithRecoveredUndeliveredClientAckMessages() throws Exception {
+        // Send 6, recover 4, redeliver 2, close consumer then session (then connection)
+        doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, true, 4, 2, 0);
+    }
+
+    private void doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(
+            boolean closeConsumer, boolean closeSession, int msgCount, int deliverBeforeRecoverCount, int deliverAfterRecoverCount) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic topic = session.createTopic(topicName);
+
+            final CountDownLatch incoming = new CountDownLatch(msgCount);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    incoming.countDown();
+                }
+            });
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
+                    equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);
+
+            MessageConsumer consumer = session.createConsumer(topic);
+
+            TextMessage receivedTextMessage = null;
+            for (int i = 1; i <= deliverBeforeRecoverCount; i++) {
+                assertNotNull("Expected message did not arrive: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+                assertEquals("Unexpected delivery number", i,  receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+            }
+
+            // Await all incoming messages to arrive at consumer before we recover, ensure deterministic test behaviour.
+            assertTrue("Messages did not arrive in a timely fashion", incoming.await(3, TimeUnit.SECONDS));
+
+            session.recover();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            for (int i = 1; i <= deliverAfterRecoverCount; i++) {
+                assertNotNull("Expected message did not arrive after recover: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+                assertEquals("Unexpected delivery number after recover", i,  receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+            }
+
+            int deliveredAtAnyPoint = Math.max(deliverBeforeRecoverCount, deliverAfterRecoverCount);
+
+            if (closeConsumer) {
+                if(deliverAfterRecoverCount > 0) {
+                    // Remaining credit will be drained if there are delivered messages yet to be acknowledged or recovered again.
+                    testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - msgCount)));
+                }
+
+                // Any message delivered+recovered before but not then delivered again afterwards, will have disposition sent now.
+                for (int i = deliverAfterRecoverCount + 1; i <= deliverBeforeRecoverCount; i++) {
+                    testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+                }
+
+                if(deliverAfterRecoverCount > 0) {
+                    // Any further remaining messages prefetched will be released.
+                    for (int i = deliveredAtAnyPoint + 1; i <= msgCount; i++) {
+                        testPeer.expectDisposition(true, new ReleasedMatcher(), i, i);
+                    }
+                } else {
+                    // The link will close now
+                    testPeer.expectDetach(true, true, true);
+
+                    // Dispositions sent by proton when the link is freed
+                    for (int i = deliveredAtAnyPoint + 1; i <= msgCount; i++) {
+                        testPeer.expectDisposition(true, new ReleasedMatcher(), i, i);
+                    }
+                }
+
+                consumer.close();
+
+                testPeer.waitForAllHandlersToComplete(1000);
+
+                if(deliverAfterRecoverCount > 0) {
+                    // When the session or connection is closed, outstanding delivered messages will have disposition sent.
+                    for (int i = 1; i <= deliverAfterRecoverCount; i++) {
+                        testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+                    }
+                    testPeer.expectDetach(true, true, true);
+                }
+            } else {
+                // If we dont close the consumer first, all previously delivered messages will have
+                // disposition sent when the session or connection is closed.
+                for (int i = 1; i <= deliveredAtAnyPoint; i++) {
+                    testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+                }
+            }
+
+            if(closeSession) {
+                testPeer.expectEnd();
+
+                session.close();
+
+                testPeer.waitForAllHandlersToComplete(1000);
+            }
+
+            testPeer.expectClose();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAcknowledgeAllPreviouslyRecoveredClientAckMessages() throws Exception {
+        doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(true, false, true);
+        doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, true, true);
+        doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, false, true);
+    }
+
+    @Test(timeout = 20000)
+    public void testAcknowledgeSomePreviouslyRecoveredClientAckMessages() throws Exception {
+        doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(true, false, false);
+        doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, true, false);
+        doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, false, false);
+    }
+
+    private void doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(boolean closeConsumer, boolean closeSession, boolean consumeAllRecovered) throws JMSException, Exception, IOException {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false);
+            connection.start();
+
+            int msgCount = 7;
+            int deliverBeforeRecoverCount = 4;
+            int acknowledgeAfterRecoverCount = consumeAllRecovered ? 5 : 2;
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic topic = session.createTopic(topicName);
+
+            final CountDownLatch incoming = new CountDownLatch(msgCount);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    incoming.countDown();
+                }
+            });
+
+            testPeer.expectReceiverAttach();
+
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
+                    equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);
+
+            MessageConsumer consumer = session.createConsumer(topic);
+
+            TextMessage receivedTextMessage = null;
+            for (int i = 1; i <= deliverBeforeRecoverCount; i++) {
+                assertNotNull("Expected message did not arrive: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+                assertEquals("Unexpected delivery number", i,  receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+            }
+
+            // Await all incoming messages to arrive at consumer before we recover, ensure deterministic test behaviour.
+            assertTrue("Messages did not arrive in a timely fashion", incoming.await(3, TimeUnit.SECONDS));
+
+            session.recover();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            for (int i = 1; i <= acknowledgeAfterRecoverCount; i++) {
+                assertNotNull("Expected message did not arrive after recover: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000));
+                assertEquals("Unexpected delivery number after recover", i,  receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+                testPeer.expectDisposition(true, new AcceptedMatcher(), i, i);
+                receivedTextMessage.acknowledge();
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            if(!consumeAllRecovered) {
+                // Any message delivered+recovered before but not then delivered and acknowledged afterwards, will have
+                // disposition sent as consumer/session/connection is closed.
+                for (int i = acknowledgeAfterRecoverCount + 1; i <= deliverBeforeRecoverCount; i++) {
+                    testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i);
+                }
+            }
+
+            if(closeConsumer) {
+                testPeer.expectDetach(true,  true,  true);
+
+                // Dispositions sent by proton when the link is freed
+                for (int i = Math.max(deliverBeforeRecoverCount, acknowledgeAfterRecoverCount) + 1; i <= msgCount; i++) {
+                    testPeer.expectDisposition(true, new ReleasedMatcher(), i, i);
+                }
+
+                consumer.close();
+            }
+
+            if(closeSession) {
+                testPeer.expectEnd();
+
+                session.close();
+            }
+
+            testPeer.expectClose();
+
+            connection.close();
+        }
+    }
 }