QPIDJMS-515 Do not recycle a delivery tag from a failed send operation

When send fails that has used a delivery tag do not recycle the tag as
the remote state is unknown and the next send could reuse that tag and
create an error as tags are meant to be unique amongst the unsettled
deliveries on the link.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index f7e9c2a..ad993bd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -400,19 +400,17 @@
         private void handleSendCompletion(boolean successful) {
             setRequestTimeout(null);
 
+            // Null delivery means that we never had credit to send so no delivery was created to carry the message.
             if (getDelivery() != null) {
                 sent.remove(envelope.getMessageId());
                 delivery.settle();
-                tagGenerator.returnTag(delivery.getTag());
-            } else {
-                blocked.remove(envelope.getMessageId());
-            }
-
-            // Null delivery means that we never had credit to send so no delivery was created to carry the message.
-            if (delivery != null) {
-                DeliveryState remoteState = delivery.getRemoteState();
+                if (successful) {
+                    tagGenerator.returnTag(delivery.getTag());
+                }
+                final DeliveryState remoteState = delivery.getRemoteState();
                 tracer.completeSend(envelope.getMessage().getFacade(), remoteState == null ? null : remoteState.getType().name());
             } else {
+                blocked.remove(envelope.getMessageId());
                 tracer.completeSend(envelope.getMessage().getFacade(), null);
             }
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 5f74521..888b2d8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -3051,4 +3051,42 @@
             finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testSendTimeoutDoesNotRecycleDeliveryTag() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit, it should send
+            // a transfer which we will not send any response for which should cause the
+            // send operation to time out.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher, equalTo(new Binary(new byte[] { 0 })));
+            testPeer.expectTransfer(messageMatcher, equalTo(new Binary(new byte[] { 1 })));
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            try {
+                producer.send(session.createTextMessage("text"));
+                fail("Send should fail after send timeout exceeded.");
+            } catch (JmsSendTimedOutException error) {
+            }
+
+            producer.send(session.createTextMessage("text"));
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 67b2582..6d54148 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -2167,11 +2167,21 @@
         expectTransfer(expectedPayloadMatcher, nullValue(), false, false, null, false);
     }
 
+    public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher)
+    {
+        expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), false, false, null, false, 0, 0);
+    }
+
     public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
     {
         expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true);
     }
 
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher)
+    {
+        expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), false, true, new Accepted(), true, 0, 0);
+    }
+
     public void expectTransfer(int frameSize)
     {
         expectTransfer(null, nullValue(), false, true, new Accepted(), true, frameSize, 0);
@@ -2189,9 +2199,17 @@
         expectTransfer(expectedPayloadMatcher, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, 0, 0);
     }
 
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher,
+            Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState,
+            boolean responseSettled, int frameSize, long dispositionDelay)
+    {
+        expectTransfer(expectedPayloadMatcher, null, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, frameSize, dispositionDelay);
+    }
+
     //TODO: fix responseState to only admit applicable types.
-    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
-            boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled, int frameSize, long dispositionDelay)
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher,
+            Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState,
+            boolean responseSettled, int frameSize, long dispositionDelay)
     {
         Matcher<Boolean> settledMatcher = null;
         if(settled)
@@ -2207,6 +2225,9 @@
         transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
         transferMatcher.withSettled(settledMatcher);
         transferMatcher.withState(stateMatcher);
+        if (deliveryTagMatcher != null) {
+            transferMatcher.withDeliveryTag(deliveryTagMatcher);
+        }
 
         if(sendResponseDisposition) {
             final DispositionFrame dispositionResponse = new DispositionFrame()