AMQNET-637 Handle async send timeout properly
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 4cf72fa..d52a278 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -119,7 +119,7 @@
return target;
}
- public Task Send(OutboundMessageDispatch envelope)
+ public async Task Send(OutboundMessageDispatch envelope)
{
if (envelope.Message.Facade is AmqpNmsMessageFacade facade)
{
@@ -132,20 +132,16 @@
// If the transaction has failed due to remote termination etc then we just indicate
// the send has succeeded until the a new transaction is started.
if (session.IsTransacted && session.IsTransactionFailed)
- return Task.CompletedTask;
+ return;
var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
if (envelope.FireAndForget)
{
SendSync(message, transactionalState);
- return Task.CompletedTask;
+ return;
}
- return SendAsync(message, transactionalState);
- }
- catch (TimeoutException tex)
- {
- throw ExceptionSupport.GetTimeoutException(this.senderLink, tex.Message);
+ await SendAsync(message, transactionalState).ConfigureAwait(false);
}
catch (AmqpException amqpEx)
{
@@ -159,7 +155,6 @@
throw ExceptionSupport.Wrap(ex);
}
}
- throw ExceptionSupport.GetException(this.senderLink, "unexpected enveloper");
}
private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
@@ -174,7 +169,11 @@
if (session.Connection.Provider.SendTimeout != NmsConnectionInfo.INFINITE)
{
cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(session.Connection.Provider.SendTimeout));
- cts.Token.Register(_ => tcs.TrySetCanceled(), null);
+ cts.Token.Register(_ =>
+ {
+ var timeoutException = ExceptionSupport.GetTimeoutException(this.senderLink, $"The operation did not complete within the allocated time {session.Connection.Provider.SendTimeout}ms.");
+ tcs.TrySetException(timeoutException);
+ }, null);
}
try
{