/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using Amqp.Framing;
using Amqp.Types;
using Apache.NMS;
using Apache.NMS.AMQP;
using Apache.NMS.AMQP.Util;
using Moq;
using NMS.AMQP.Test.TestAmqp;
using NMS.AMQP.Test.TestAmqp.BasicTypes;
using NUnit.Framework;

namespace NMS.AMQP.Test.Integration.Async
{
    [TestFixture]
    public class ProducerIntegrationTestAsync : IntegrationTestFixture
    {
        private const long TICKS_PER_MILLISECOND = 10000;

        [Test, Timeout(20_000)]
        public async Task TestCloseSender()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await base.EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync();

                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                testPeer.ExpectClose();

                await producer.CloseAsync();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSentTextMessageCanBeModified()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await base.EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(queue);

                // Create and transfer a new message
                String text = "myMessage";
                testPeer.ExpectTransfer(x => Assert.AreEqual(text, (x.BodySection as AmqpValue).Value));
                testPeer.ExpectClose();

                ITextMessage message = await session.CreateTextMessageAsync(text);
                await producer.SendAsync(message);

                Assert.AreEqual(text, message.Text);
                message.Text = text + text;
                Assert.AreEqual(text + text, message.Text);

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestDefaultDeliveryModeProducesDurableMessages()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await base.EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(queue);

                // Create and transfer a new message
                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
                testPeer.ExpectClose();

                ITextMessage textMessage = await session.CreateTextMessageAsync();

                await producer.SendAsync(textMessage);
                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestProducerOverridesMessageDeliveryMode()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await base.EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(queue);

                // Create and transfer a new message, explicitly setting the deliveryMode on the
                // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
                // that the producer ignores this value and sends the message as PERSISTENT(/durable)
                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
                testPeer.ExpectClose();

                ITextMessage textMessage = await session.CreateTextMessageAsync();
                textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);

                await producer.SendAsync(textMessage);

                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);

                await connection.CloseAsync();
                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentProducerSetDurableFalse()
        {
            await DoSendingMessageNonPersistentTestImpl(false, true, true);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentProducerSetDurableFalseAnonymousProducer()
        {
            await DoSendingMessageNonPersistentTestImpl(true, true, true);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentSendSetDurableFalse()
        {
            await DoSendingMessageNonPersistentTestImpl(false, true, false);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentSendSetDurableFalseAnonymousProducer()
        {
            await DoSendingMessageNonPersistentTestImpl(true, true, false);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentProducerOmitsHeader()
        {
            await DoSendingMessageNonPersistentTestImpl(false, false, true);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentProducerOmitsHeaderAnonymousProducer()
        {
            await DoSendingMessageNonPersistentTestImpl(true, false, true);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentSendOmitsHeader()
        {
            await DoSendingMessageNonPersistentTestImpl(false, false, false);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentSendOmitsHeaderAnonymousProducer()
        {
            await DoSendingMessageNonPersistentTestImpl(true, false, false);
        }

        private async Task DoSendingMessageNonPersistentTestImpl(bool anonymousProducer, bool setPriority, bool setOnProducer)
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                //Add capability to indicate support for ANONYMOUS-RELAY
                Symbol[] serverCapabilities = { SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY };
                IConnection connection = await EstablishConnectionAsync(testPeer, serverCapabilities: serverCapabilities);
                testPeer.ExpectBegin();

                string queueName = "myQueue";
                Action<object> targetMatcher = t =>
                {
                    var target = t as Target;
                    Assert.IsNotNull(target);
                    if (anonymousProducer)
                        Assert.IsNull(target.Address);
                    else
                        Assert.AreEqual(queueName, target.Address);
                };
                

                testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false);

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync(queueName);
                IMessageProducer producer = null;
                if (anonymousProducer)
                    producer = await session.CreateProducerAsync();
                else
                    producer = await session.CreateProducerAsync(queue);

                byte priority = 5;
                String text = "myMessage";
                testPeer.ExpectTransfer(messageMatcher: message =>
                    {
                        if (setPriority)
                        {
                            Assert.IsFalse(message.Header.Durable);
                            Assert.AreEqual(5, message.Header.Priority);
                        }

                        Assert.AreEqual(text, (message.BodySection as AmqpValue).Value);
                    }, stateMatcher: Assert.IsNull,
                    settled: false,
                    sendResponseDisposition: true,
                    responseState: new Accepted(),
                    responseSettled: true);

                ITextMessage textMessage = await session.CreateTextMessageAsync(text);

                if (setOnProducer)
                {
                    producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
                    if (setPriority)
                        producer.Priority = (MsgPriority) 5;

                    if (anonymousProducer)
                        await producer.SendAsync(queue, textMessage);
                    else
                        await producer.SendAsync(textMessage);
                }
                else
                {
                    if (anonymousProducer)
                    {
                        await producer.SendAsync(destination: queue,
                            message: textMessage,
                            deliveryMode: MsgDeliveryMode.NonPersistent,
                            priority: setPriority ? (MsgPriority) priority : NMSConstants.defaultPriority,
                            timeToLive: NMSConstants.defaultTimeToLive);
                    }
                    else
                    {
                        await producer.SendAsync(message: textMessage,
                            deliveryMode: MsgDeliveryMode.NonPersistent,
                            priority: setPriority ? (MsgPriority) priority : NMSConstants.defaultPriority,
                            timeToLive: NMSConstants.defaultTimeToLive);
                    }
                }

                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode, "Should have NonPersistent delivery mode set");

                testPeer.WaitForAllMatchersToComplete(1000);

                testPeer.ExpectClose();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageSetsNMSDestination()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                string text = "myMessage";
                ITextMessage message = await session.CreateTextMessageAsync(text);

                testPeer.ExpectTransfer(m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value));
                testPeer.ExpectClose();

                Assert.IsNull(message.NMSDestination, "Should not yet have a NMSDestination");

                await producer.SendAsync(message);

                Assert.AreEqual(destination, message.NMSDestination, "Should have had NMSDestination set");

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageSetsNMSTimestamp()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                // Create matcher to expect the absolute-expiry-time field of the properties section to
                // be set to a value greater than 'now'+ttl, within a delta.

                DateTime creationLower = DateTime.UtcNow;
                DateTime creationUpper = creationLower + TimeSpan.FromMilliseconds(3000);

                var text = "myMessage";
                testPeer.ExpectTransfer(m =>
                {
                    Assert.IsTrue(m.Header.Durable);
                    Assert.That(m.Properties.CreationTime.Ticks, Is.GreaterThanOrEqualTo(creationLower.Ticks).Within(TICKS_PER_MILLISECOND));
                    Assert.That(m.Properties.CreationTime.Ticks, Is.LessThanOrEqualTo(creationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
                });

                ITextMessage message = await session.CreateTextMessageAsync(text);
                await producer.SendAsync(message);

                testPeer.WaitForAllMatchersToComplete(1000);

                testPeer.ExpectClose();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageSetsNMSExpirationRelatedAbsoluteExpiryAndTtlFields()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                uint ttl = 100_000;
                DateTime currentTime = DateTime.UtcNow;
                DateTime expirationLower = currentTime + TimeSpan.FromMilliseconds(ttl);
                DateTime expirationUpper = currentTime + TimeSpan.FromMilliseconds(ttl) + TimeSpan.FromMilliseconds(5000);

                // Create matcher to expect the absolute-expiry-time field of the properties section to
                // be set to a value greater than 'now'+ttl, within a delta.
                string text = "myMessage";
                testPeer.ExpectTransfer(m =>
                {
                    Assert.IsTrue(m.Header.Durable);
                    Assert.AreEqual(ttl, m.Header.Ttl);
                    Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.GreaterThanOrEqualTo(expirationLower.Ticks).Within(TICKS_PER_MILLISECOND));
                    Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.LessThanOrEqualTo(expirationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
                });

                ITextMessage message = await session.CreateTextMessageAsync(text);
                await producer.SendAsync(message, NMSConstants.defaultDeliveryMode, NMSConstants.defaultPriority, TimeSpan.FromMilliseconds(ttl));

                testPeer.WaitForAllMatchersToComplete(1000);

                testPeer.ExpectClose();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }
        
        [Test, Timeout(20_000)]
        public async Task TestMessagesAreProducedWithProperDefaultPriorityWhenNoPrioritySpecified()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                byte priority = 4;

                testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
                testPeer.ExpectClose();

                ITextMessage message = await session.CreateTextMessageAsync();
                Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);

                await producer.SendAsync(message);

                Assert.AreEqual((MsgPriority) priority, message.NMSPriority);

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsNMSPriority()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                byte priority = 9;

                testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
                testPeer.ExpectClose();

                ITextMessage message = await session.CreateTextMessageAsync();
                Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);

                await producer.SendAsync(message, MsgDeliveryMode.Persistent, (MsgPriority) priority, NMSConstants.defaultTimeToLive);

                Assert.AreEqual((MsgPriority) priority, message.NMSPriority);

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageSetsNMSMessageId()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                string text = "myMessage";
                string actualMessageId = null;
                testPeer.ExpectTransfer(m =>
                {
                    Assert.IsTrue(m.Header.Durable);
                    Assert.IsNotEmpty(m.Properties.MessageId);
                    actualMessageId = m.Properties.MessageId;
                });
                testPeer.ExpectClose();

                ITextMessage message = await session.CreateTextMessageAsync(text);
                Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");

                await producer.SendAsync(message);

                Assert.IsNotNull(message.NMSMessageId);
                Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set");
                Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found");

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
                // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
                Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message");
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageWithDisableMessageIdHint()
        {
            await DoSendingMessageWithDisableMessageIdHintTestImpl(false);
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageWithDisableMessageIdHintAndExistingMessageId()
        {
            await DoSendingMessageWithDisableMessageIdHintTestImpl(true);
        }

        private async Task DoSendingMessageWithDisableMessageIdHintTestImpl(bool existingId)
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                string text = "myMessage";
                testPeer.ExpectTransfer(m =>
                {
                    Assert.IsTrue(m.Header.Durable);
                    Assert.IsNull(m.Properties.MessageId); // Check there is no message-id value;
                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
                });
                testPeer.ExpectClose();

                ITextMessage message = await session.CreateTextMessageAsync(text);

                Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");

                if (existingId)
                {
                    string existingMessageId = "ID:this-should-be-overwritten-in-send";
                    message.NMSMessageId = existingMessageId;
                    Assert.AreEqual(existingMessageId, message.NMSMessageId, "NMSMessageId should now be se");
                }

                producer.DisableMessageID = true;

                await producer.SendAsync(message);

                Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null");

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(2000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestRemotelyCloseProducer()
        {
            string breadCrumb = "ErrorMessageBreadCrumb";

            ManualResetEvent producerClosed = new ManualResetEvent(false);
            Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
            mockConnectionListener
                .Setup(listener => listener.OnProducerClosed(It.IsAny<NmsMessageProducer>(), It.IsAny<Exception>()))
                .Callback(() => { producerClosed.Set(); });

            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                NmsConnection connection = (NmsConnection) await EstablishConnectionAsync(testPeer);
                connection.AddConnectionListener(mockConnectionListener.Object);

                testPeer.ExpectBegin();
                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);

                // Create a producer, then remotely end it afterwards.
                testPeer.ExpectSenderAttach();
                testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb, delayBeforeSend: 10);

                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                // Verify the producer gets marked closed
                testPeer.WaitForAllMatchersToComplete(1000);

                Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger");
                Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed");

                // Try closing it explicitly, should effectively no-op in client.
                // The test peer will throw during close if it sends anything.
                await producer.CloseAsync();
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendWhenLinkCreditIsZeroAndTimeout()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer, optionsString: "nms.sendTimeout=500");
                testPeer.ExpectBegin();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync("myQueue");

                ITextMessage message = await session.CreateTextMessageAsync("text");

                // Expect the producer to attach. Don't send any credit so that the client will
                // block on a send and we can test our timeouts.
                testPeer.ExpectSenderAttachWithoutGrantingCredit();
                testPeer.ExpectClose();

                IMessageProducer producer = await session.CreateProducerAsync(queue);

                Assert.CatchAsync<Exception>(async () => await producer.SendAsync(message), "Send should time out.");

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendTimesOutWhenNoDispositionArrives()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer, optionsString: "nms.sendTimeout=500");
                testPeer.ExpectBegin();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue queue = await session.GetQueueAsync("myQueue");

                ITextMessage message = await session.CreateTextMessageAsync("text");

                // Expect the producer to attach and grant it some credit, it should send
                // a transfer which we will not send any response for which should cause the
                // send operation to time out.
                testPeer.ExpectSenderAttach();
                testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
                testPeer.ExpectClose();

                IMessageProducer producer = await session.CreateProducerAsync(queue);

                Assert.CatchAsync<Exception>(async () => await producer.SendAsync(message), "Send should time out.");

                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendWorksWhenConnectionNotStarted()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);

                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                testPeer.ExpectTransfer(Assert.IsNotNull);

                await producer.SendAsync(await session.CreateMessageAsync());

                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                await producer.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendWorksAfterConnectionStopped()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                await connection.StartAsync();

                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);

                testPeer.ExpectTransfer(Assert.IsNotNull);

                await connection.StopAsync();

                await producer.SendAsync(await session.CreateMessageAsync());

                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                testPeer.ExpectClose();

                await producer.CloseAsync();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }
  
        [Test, Timeout(20_000)]
        public async Task TestSendingMessagePersistentSetsBatchableFalse()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                await connection.StartAsync();

                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);
                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
                    stateMatcher: Assert.IsNull,
                    settled: false,
                    sendResponseDisposition: true,
                    responseState: new Accepted(),
                    responseSettled: true,
                    batchable: false);

                IMessage message = await session.CreateMessageAsync();
                await producer.SendAsync(message: message, deliveryMode: MsgDeliveryMode.Persistent, MsgPriority.Normal, NMSConstants.defaultTimeToLive);
                
                testPeer.WaitForAllMatchersToComplete(1000);

                testPeer.ExpectClose();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }

        [Test, Timeout(20_000)]
        public async Task TestSendingMessageNonPersistentSetsBatchableFalse()
        {
            using (TestAmqpPeer testPeer = new TestAmqpPeer())
            {
                IConnection connection = await EstablishConnectionAsync(testPeer);
                await connection.StartAsync();

                testPeer.ExpectBegin();
                testPeer.ExpectSenderAttach();

                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
                IQueue destination = await session.GetQueueAsync("myQueue");
                IMessageProducer producer = await session.CreateProducerAsync(destination);
                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
                    stateMatcher: Assert.IsNull,
                    settled: false,
                    sendResponseDisposition: true,
                    responseState: new Accepted(),
                    responseSettled: true,
                    batchable: false);

                IMessage message = await session.CreateMessageAsync();
                await producer.SendAsync(message: message, deliveryMode: MsgDeliveryMode.NonPersistent, MsgPriority.Normal, NMSConstants.defaultTimeToLive);
                
                testPeer.WaitForAllMatchersToComplete(1000);

                testPeer.ExpectClose();
                await connection.CloseAsync();

                testPeer.WaitForAllMatchersToComplete(1000);
            }
        }
    }
}