AMQNET-637 Implement async send for AmqpProducer
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 4ccbdae..a6e8206 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -29,6 +29,8 @@
{
public class AmqpProducer
{
+ private static readonly OutcomeCallback _onOutcome = OnOutcome;
+
private readonly AmqpSession session;
private readonly NmsProducerInfo info;
private SenderLink senderLink;
@@ -117,7 +119,7 @@
return target;
}
- public void Send(OutboundMessageDispatch envelope)
+ public Task Send(OutboundMessageDispatch envelope)
{
if (envelope.Message.Facade is AmqpNmsMessageFacade facade)
{
@@ -130,14 +132,17 @@
// If the transaction has failed due to remote termination etc then we just indicate
// the send has succeeded until the a new transaction is started.
if (session.IsTransacted && session.IsTransactionFailed)
- return;
+ return Task.CompletedTask;
var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
if (envelope.SendAsync)
- SendAsync(message, transactionalState);
+ return SendAsync(message, transactionalState);
else
+ {
SendSync(message, transactionalState);
+ return Task.CompletedTask;
+ }
}
catch (TimeoutException tex)
{
@@ -157,39 +162,39 @@
}
}
- private void SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
+ private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
{
senderLink.Send(message, deliveryState, null, null);
}
-
- private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
+
+ private Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
{
- ManualResetEvent manualResetEvent = new ManualResetEvent(false);
- Outcome outcome = null;
-
- senderLink.Send(message, deliveryState, Callback, manualResetEvent);
- if (!manualResetEvent.WaitOne((int) session.Connection.Provider.SendTimeout))
+ var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+ senderLink.Send(message, deliveryState, _onOutcome, tcs);
+ return tcs.Task;
+ }
+
+ private static void OnOutcome(ILink sender, global::Amqp.Message message, Outcome outcome, object state)
+ {
+ var tcs = (TaskCompletionSource<bool>) state;
+ if (outcome.Descriptor.Code == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
{
- throw new TimeoutException(Fx.Format(SRAmqp.AmqpTimeout, "send", session.Connection.Provider.SendTimeout, nameof(message)));
+ tcs.TrySetResult(true);
}
- if (outcome == null)
- return;
-
- if (outcome.Descriptor.Name.Equals(MessageSupport.RELEASED_INSTANCE.Descriptor.Name))
- {
- Error error = new Error(ErrorCode.MessageReleased);
- throw ExceptionSupport.GetException(error, $"Message {message.Properties.GetMessageId()} released");
- }
- if (outcome.Descriptor.Name.Equals(MessageSupport.REJECTED_INSTANCE.Descriptor.Name))
+ else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
{
Rejected rejected = (Rejected) outcome;
- throw ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected");
+ tcs.TrySetException(ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected"));
}
-
- void Callback(ILink l, global::Amqp.Message m, Outcome o, object s)
+ else if (outcome.Descriptor.Code == MessageSupport.RELEASED_INSTANCE.Descriptor.Code)
{
- outcome = o;
- manualResetEvent.Set();
+ Error error = new Error(ErrorCode.MessageReleased);
+ tcs.TrySetException(ExceptionSupport.GetException(error, $"Message {message.Properties.GetMessageId()} released"));
+ }
+ else
+ {
+ Error error = new Error(ErrorCode.InternalError);
+ tcs.TrySetException(ExceptionSupport.GetException(error, outcome.ToString()));
}
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index e908fcd..c31bdeb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -266,13 +266,12 @@
public INmsMessageFactory MessageFactory => connection.MessageFactory;
- public Task Send(OutboundMessageDispatch envelope)
+ public async Task Send(OutboundMessageDispatch envelope)
{
AmqpSession session = connection.GetSession(envelope.ProducerInfo.SessionId);
AmqpProducer producer = session.GetProducer(envelope.ProducerId);
- producer.Send(envelope);
+ await producer.Send(envelope).ConfigureAwait(false);
envelope.Message.IsReadOnly = false;
- return Task.CompletedTask;
}
public Task Unsubscribe(string subscriptionName)