AMQNET-637 Implement async send for AmqpProducer

diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 4ccbdae..a6e8206 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -29,6 +29,8 @@
 {
     public class AmqpProducer
     {
+        private static readonly OutcomeCallback _onOutcome = OnOutcome;
+        
         private readonly AmqpSession session;
         private readonly NmsProducerInfo info;
         private SenderLink senderLink;
@@ -117,7 +119,7 @@
             return target;
         }
 
-        public void Send(OutboundMessageDispatch envelope)
+        public Task Send(OutboundMessageDispatch envelope)
         {
             if (envelope.Message.Facade is AmqpNmsMessageFacade facade)
             {
@@ -130,14 +132,17 @@
                     // If the transaction has failed due to remote termination etc then we just indicate
                     // the send has succeeded until the a new transaction is started.
                     if (session.IsTransacted && session.IsTransactionFailed)
-                        return;
+                        return Task.CompletedTask;
 
                     var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
 
                     if (envelope.SendAsync)
-                        SendAsync(message, transactionalState);
+                        return SendAsync(message, transactionalState);
                     else
+                    {
                         SendSync(message, transactionalState);
+                        return Task.CompletedTask;
+                    }
                 }
                 catch (TimeoutException tex)
                 {
@@ -157,39 +162,39 @@
             }
         }
 
-        private void SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
+        private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
         {
             senderLink.Send(message, deliveryState, null, null);
         }
-
-        private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
+        
+        private Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
         {
-            ManualResetEvent manualResetEvent = new ManualResetEvent(false);
-            Outcome outcome = null;
-
-            senderLink.Send(message, deliveryState, Callback, manualResetEvent);
-            if (!manualResetEvent.WaitOne((int) session.Connection.Provider.SendTimeout))
+            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+            senderLink.Send(message, deliveryState, _onOutcome, tcs);
+            return tcs.Task;
+        }
+        
+        private static void OnOutcome(ILink sender, global::Amqp.Message message, Outcome outcome, object state)
+        {
+            var tcs = (TaskCompletionSource<bool>) state;
+            if (outcome.Descriptor.Code == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
             {
-                throw new TimeoutException(Fx.Format(SRAmqp.AmqpTimeout, "send", session.Connection.Provider.SendTimeout, nameof(message)));
+                tcs.TrySetResult(true);
             }
-            if (outcome == null)
-                return;
-            
-            if (outcome.Descriptor.Name.Equals(MessageSupport.RELEASED_INSTANCE.Descriptor.Name))
-            {
-                Error error = new Error(ErrorCode.MessageReleased);
-                throw ExceptionSupport.GetException(error, $"Message {message.Properties.GetMessageId()} released");
-            }
-            if (outcome.Descriptor.Name.Equals(MessageSupport.REJECTED_INSTANCE.Descriptor.Name))
+            else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
             {
                 Rejected rejected = (Rejected) outcome;
-                throw ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected");
+                tcs.TrySetException(ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected"));
             }
-            
-            void Callback(ILink l, global::Amqp.Message m, Outcome o, object s)
+            else if (outcome.Descriptor.Code == MessageSupport.RELEASED_INSTANCE.Descriptor.Code)
             {
-                outcome = o;
-                manualResetEvent.Set();
+                Error error = new Error(ErrorCode.MessageReleased);
+                tcs.TrySetException(ExceptionSupport.GetException(error, $"Message {message.Properties.GetMessageId()} released"));
+            }
+            else
+            {
+                Error error = new Error(ErrorCode.InternalError);
+                tcs.TrySetException(ExceptionSupport.GetException(error, outcome.ToString()));
             }
         }
 
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index e908fcd..c31bdeb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -266,13 +266,12 @@
 
         public INmsMessageFactory MessageFactory => connection.MessageFactory;
 
-        public Task Send(OutboundMessageDispatch envelope)
+        public async Task Send(OutboundMessageDispatch envelope)
         {
             AmqpSession session = connection.GetSession(envelope.ProducerInfo.SessionId);
             AmqpProducer producer = session.GetProducer(envelope.ProducerId);
-            producer.Send(envelope);
+            await producer.Send(envelope).ConfigureAwait(false);
             envelope.Message.IsReadOnly = false;
-            return Task.CompletedTask;
         }
 
         public Task Unsubscribe(string subscriptionName)