AMQNET-718 Cancel inflight messages after send timeout
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index 90f8c20..5af3318 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -94,7 +94,7 @@
<ItemGroup>
<!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
- <PackageReference Include="AMQPNetLite.Core" Version="2.4.0" />
+ <PackageReference Include="AMQPNetLite.Core" Version="2.4.3" />
<PackageReference Include="Apache.NMS" Version="1.8.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
</ItemGroup>
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 4ccbdae..7d92c61 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -170,6 +170,7 @@
senderLink.Send(message, deliveryState, Callback, manualResetEvent);
if (!manualResetEvent.WaitOne((int) session.Connection.Provider.SendTimeout))
{
+ senderLink.Cancel(message);
throw new TimeoutException(Fx.Format(SRAmqp.AmqpTimeout, "send", session.Connection.Provider.SendTimeout, nameof(message)));
}
if (outcome == null)
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
index 18e0b9e..47076aa 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
@@ -642,6 +642,10 @@
// send operation to time out.
testPeer.ExpectSenderAttach();
testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ Assert.AreEqual(state.Descriptor.Code, MessageSupport.RELEASED_INSTANCE.Descriptor.Code);
+ });
testPeer.ExpectClose();
IMessageProducer producer = session.CreateProducer(queue);