Merge pull request #20 from Havret/set_batchable_flag_properly

AMQNET-602: Set batchable flag to false for amqp transfer
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index 5f67b68..dfb16a4 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -49,8 +49,8 @@
     </PropertyGroup>
 
     <ItemGroup>
-        <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt" />
-        <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt" />
+        <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt"/>
+        <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt"/>
     </ItemGroup>
 
     <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 0ca5b55..38f16d4 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -44,9 +44,10 @@
         public AmqpProvider Provider { get; }
         private readonly ITransportContext transport;
         private readonly Uri remoteUri;
-        private global::Amqp.Connection underlyingConnection;
+        private Connection underlyingConnection;
         private readonly AmqpMessageFactory messageFactory;
         private AmqpConnectionSession connectionSession;
+        private TaskCompletionSource<bool> tsc;
 
         public AmqpConnection(AmqpProvider provider, ITransportContext transport, ConnectionInfo info)
         {
@@ -68,39 +69,39 @@
         internal async Task Start()
         {
             Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
-            underlyingConnection = await transport.CreateAsync(address, CreateOpenFrame(Info), OnOpened);
+            this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+            underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
             underlyingConnection.AddClosedCallback(Provider.OnInternalClosed);
+            
+            // Wait for connection to be opened
+            await tsc.Task;
 
             // Create a Session for this connection that is used for Temporary Destinations
             // and perhaps later on management and advisory monitoring.
-
             // TODO: change the way how connection session id is obtained
             SessionInfo sessionInfo = new SessionInfo(Info.Id);
             sessionInfo.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 
             connectionSession = new AmqpConnectionSession(this, sessionInfo);
-            await connectionSession.Start();
+            await connectionSession.Start().ConfigureAwait(false);
         }
 
-        private Open CreateOpenFrame(ConnectionInfo connInfo)
+        internal void OnLocalOpen(Open open)
         {
-            return new Open
+            open.ContainerId = Info.ClientId;
+            open.ChannelMax = Info.channelMax;
+            open.MaxFrameSize = Convert.ToUInt32(Info.maxFrameSize);
+            open.HostName = remoteUri.Host;
+            open.IdleTimeOut = Convert.ToUInt32(Info.idleTimout);
+            open.DesiredCapabilities = new[]
             {
-                ContainerId = connInfo.ClientId,
-                ChannelMax = connInfo.channelMax,
-                MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize),
-                HostName = remoteUri.Host,
-                IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout),
-                DesiredCapabilities = new[]
-                {
-                    SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
-                    SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
-                    SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
-                }
+                SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
+                SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
+                SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
             };
         }
 
-        private void OnOpened(global::Amqp.IConnection connection, Open open)
+        internal void OnRemoteOpened(Open open)
         {
             if (SymbolUtil.CheckAndCompareFields(open.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
             {
@@ -121,6 +122,7 @@
                     Info.QueuePrefix = queuePrefix;
                 }
 
+                this.tsc.SetResult(true);
                 Provider.FireConnectionEstablished();
             }
         }
@@ -172,7 +174,7 @@
         {
             return temporaryDestinations.TryGetValue(destination.Id, out AmqpTemporaryDestination amqpTemporaryDestination) ? amqpTemporaryDestination : null;
         }
-        
+
         public void RemoveTemporaryDestination(Id destinationId)
         {
             temporaryDestinations.TryRemove(destinationId, out _);
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpHandler.cs b/src/NMS.AMQP/Provider/Amqp/AmqpHandler.cs
new file mode 100644
index 0000000..6841d78
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpHandler.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Amqp.Framing;
+using Amqp.Handler;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    internal class AmqpHandler : IHandler
+    {
+        private readonly AmqpConnection connection;
+
+        public AmqpHandler(AmqpConnection connection)
+        {
+            this.connection = connection;
+        }
+
+        public bool CanHandle(EventId id)
+        {
+            switch (id)
+            {
+                case EventId.SendDelivery:
+                case EventId.ConnectionRemoteOpen:
+                case EventId.ConnectionLocalOpen:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        public void Handle(Event protocolEvent)
+        {
+            switch (protocolEvent.Id)
+            {
+                case EventId.SendDelivery when protocolEvent.Context is IDelivery delivery:
+                    delivery.Batchable = false;
+                    break;
+                case EventId.ConnectionRemoteOpen when protocolEvent.Context is Open open:
+                    this.connection.OnRemoteOpened(open);
+                    break;
+                case EventId.ConnectionLocalOpen when protocolEvent.Context is Open open:
+                    this.connection.OnLocalOpen(open);
+                    break;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 9fcaa6e..d767577 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -121,13 +121,7 @@
                     if (envelope.SendAsync)
                         SendAsync(message, transactionalState);
                     else
-                    {
-                        // TODO: Should be unified after https://github.com/Azure/amqpnetlite/pull/374 is sorted out
-                        if (transactionalState != null)
-                            SendSync(message, transactionalState);
-                        else
-                            senderLink.Send(message, TimeSpan.FromMilliseconds(session.Connection.Provider.SendTimeout));
-                    }
+                        SendSync(message, transactionalState);
                 }
                 catch (TimeoutException tex)
                 {
diff --git a/src/NMS.AMQP/Transport/ITransportContext.cs b/src/NMS.AMQP/Transport/ITransportContext.cs
index d3132a9..ad225be 100644
--- a/src/NMS.AMQP/Transport/ITransportContext.cs
+++ b/src/NMS.AMQP/Transport/ITransportContext.cs
@@ -18,6 +18,7 @@
 using System.Threading.Tasks;
 using Amqp;
 using Amqp.Framing;
+using Amqp.Handler;
 
 namespace Apache.NMS.AMQP.Transport
 {
@@ -37,6 +38,6 @@
 
         ITransportContext Copy();
 
-        Task<Amqp.Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null);
+        Task<Connection> CreateAsync(Address address, IHandler handler);
     }
 }
diff --git a/src/NMS.AMQP/Transport/SecureTransportContext.cs b/src/NMS.AMQP/Transport/SecureTransportContext.cs
index a4a3eb8..cc3913c 100644
--- a/src/NMS.AMQP/Transport/SecureTransportContext.cs
+++ b/src/NMS.AMQP/Transport/SecureTransportContext.cs
@@ -24,6 +24,7 @@
 using System.Threading.Tasks;
 using Amqp;
 using Amqp.Framing;
+using Amqp.Handler;
 using Apache.NMS.AMQP.Util;
 
 namespace Apache.NMS.AMQP.Transport
@@ -228,7 +229,7 @@
 
         #region IProviderSecureTransportContext Methods
 
-        public override Task<Amqp.Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null)
+        public override Task<Amqp.Connection> CreateAsync(Address address, IHandler handler)
         {
             // Load local certificates
             this.connectionBuilder.SSL.ClientCertificates.AddRange(LoadClientCertificates());
@@ -242,7 +243,7 @@
                 throw new NMSSecurityException(string.Format("Invalid SSL Protocol {0} selected from system supported protocols {1}", this.SSLProtocol, PropertyUtil.ToString(SupportedProtocols)));
             }
             
-            return base.CreateAsync(address, open, onOpened);
+            return base.CreateAsync(address, handler);
         }
 
         #endregion
diff --git a/src/NMS.AMQP/Transport/TransportContext.cs b/src/NMS.AMQP/Transport/TransportContext.cs
index 5e1666f..229d8e7 100644
--- a/src/NMS.AMQP/Transport/TransportContext.cs
+++ b/src/NMS.AMQP/Transport/TransportContext.cs
@@ -20,6 +20,8 @@
 using System.Threading.Tasks;
 using Amqp;
 using Amqp.Framing;
+using Amqp.Handler;
+using Apache.NMS.AMQP.Provider.Amqp;
 using Apache.NMS.AMQP.Util;
 
 namespace Apache.NMS.AMQP.Transport
@@ -38,7 +40,7 @@
         internal TransportContext()
         {
             connectionBuilder = new Amqp.ConnectionFactory();
-            connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
+            connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;            
         }
 
         static TransportContext()
@@ -156,9 +158,9 @@
             return copy;
         }
 
-        public virtual Task<Amqp.Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null)
+        public virtual Task<Connection> CreateAsync(Address address, IHandler handler)
         {
-            return connectionBuilder.CreateAsync(address, open, onOpened);
+            return connectionBuilder.CreateAsync(address, handler);    
         }
 
         protected virtual void CopyInto(TransportContext copy)
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
index 110f65d..d34d529 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
@@ -419,9 +419,9 @@
                 producer.Send(message, MsgDeliveryMode.Persistent, (MsgPriority) priority, NMSConstants.defaultTimeToLive);
 
                 Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
-                
+
                 connection.Close();
-                
+
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
@@ -438,7 +438,7 @@
                 ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                 IQueue destination = session.GetQueue("myQueue");
                 IMessageProducer producer = session.CreateProducer(destination);
-                
+
                 string text = "myMessage";
                 string actualMessageId = null;
                 testPeer.ExpectTransfer(m =>
@@ -448,18 +448,18 @@
                     actualMessageId = m.Properties.MessageId;
                 });
                 testPeer.ExpectClose();
-                
+
                 ITextMessage message = session.CreateTextMessage(text);
                 Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
-                
+
                 producer.Send(message);
-                
+
                 Assert.IsNotNull(message.NMSMessageId);
                 Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set");
                 Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found");
-                
+
                 connection.Close();
-                
+
                 testPeer.WaitForAllMatchersToComplete(1000);
                 // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
                 Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message");
@@ -489,7 +489,7 @@
                 ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                 IQueue destination = session.GetQueue("myQueue");
                 IMessageProducer producer = session.CreateProducer(destination);
-                
+
                 string text = "myMessage";
                 testPeer.ExpectTransfer(m =>
                 {
@@ -498,11 +498,11 @@
                     Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
                 });
                 testPeer.ExpectClose();
-                
+
                 ITextMessage message = session.CreateTextMessage(text);
-                
+
                 Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
-                
+
                 if (existingId)
                 {
                     string existingMessageId = "ID:this-should-be-overwritten-in-send";
@@ -511,13 +511,13 @@
                 }
 
                 producer.DisableMessageID = true;
-                
+
                 producer.Send(message);
 
                 Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null");
-                
+
                 connection.Close();
-                
+
                 testPeer.WaitForAllMatchersToComplete(2000);
             }
         }
@@ -526,7 +526,7 @@
         public void TestRemotelyCloseProducer()
         {
             string breadCrumb = "ErrorMessageBreadCrumb";
-            
+
             ManualResetEvent producerClosed = new ManualResetEvent(false);
             Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
             mockConnectionListener
@@ -537,23 +537,23 @@
             {
                 NmsConnection connection = (NmsConnection) EstablishConnection(testPeer);
                 connection.AddConnectionListener(mockConnectionListener.Object);
-                
+
                 testPeer.ExpectBegin();
                 ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-                
+
                 // Create a producer, then remotely end it afterwards.
                 testPeer.ExpectSenderAttach();
                 testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb);
-                
+
                 IQueue destination = session.GetQueue("myQueue");
                 IMessageProducer producer = session.CreateProducer(destination);
-                
+
                 // Verify the producer gets marked closed
                 testPeer.WaitForAllMatchersToComplete(1000);
-                
+
                 Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger");
                 Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed");
-                
+
                 // Try closing it explicitly, should effectively no-op in client.
                 // The test peer will throw during close if it sends anything.
                 producer.Close();
@@ -567,23 +567,23 @@
             {
                 IConnection connection = EstablishConnection(testPeer, optionsString: "nms.sendTimeout=500");
                 testPeer.ExpectBegin();
-                
+
                 ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                 IQueue queue = session.GetQueue("myQueue");
-                
+
                 ITextMessage message = session.CreateTextMessage("text");
-                
+
                 // Expect the producer to attach. Don't send any credit so that the client will
                 // block on a send and we can test our timeouts.
                 testPeer.ExpectSenderAttachWithoutGrantingCredit();
                 testPeer.ExpectClose();
-                
+
                 IMessageProducer producer = session.CreateProducer(queue);
-                
+
                 Assert.Catch<Exception>(() => producer.Send(message), "Send should time out.");
-                
+
                 connection.Close();
-                
+
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
@@ -600,23 +600,20 @@
                 IQueue queue = session.GetQueue("myQueue");
 
                 ITextMessage message = session.CreateTextMessage("text");
-                
+
                 // Expect the producer to attach and grant it some credit, it should send
                 // a transfer which we will not send any response for which should cause the
                 // send operation to time out.
                 testPeer.ExpectSenderAttach();
                 testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
-                
-                // When send operation timed out, released and settled disposition is issued by the provider
-                testPeer.ExpectDispositionThatIsReleasedAndSettled();
                 testPeer.ExpectClose();
-                
+
                 IMessageProducer producer = session.CreateProducer(queue);
-                
+
                 Assert.Catch<Exception>(() => producer.Send(message), "Send should time out.");
-                
+
                 connection.Close();
-                
+
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
@@ -627,22 +624,22 @@
             using (TestAmqpPeer testPeer = new TestAmqpPeer())
             {
                 IConnection connection = EstablishConnection(testPeer);
-                
+
                 testPeer.ExpectBegin();
                 testPeer.ExpectSenderAttach();
-                
+
                 ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                 IQueue destination = session.GetQueue("myQueue");
                 IMessageProducer producer = session.CreateProducer(destination);
-                
+
                 testPeer.ExpectTransfer(Assert.IsNotNull);
-                
-                
+
+
                 producer.Send(session.CreateMessage());
-                
+
                 testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                 producer.Close();
-                
+
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
@@ -657,24 +654,92 @@
 
                 testPeer.ExpectBegin();
                 testPeer.ExpectSenderAttach();
-                
+
                 ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                 IQueue destination = session.GetQueue("myQueue");
                 IMessageProducer producer = session.CreateProducer(destination);
-                
+
                 testPeer.ExpectTransfer(Assert.IsNotNull);
-                
+
                 connection.Stop();
-                
+
                 producer.Send(session.CreateMessage());
-                
+
                 testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                 testPeer.ExpectClose();
-                
+
                 producer.Close();
                 connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+  
+        [Test, Timeout(20_000)]
+        public void TestSendingMessagePersistentSetsBatchableFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue destination = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(destination);
+                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+                    stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    batchable: false);
+
+                IMessage message = session.CreateMessage();
+                producer.Send(message: message, deliveryMode: MsgDeliveryMode.Persistent, MsgPriority.Normal, NMSConstants.defaultTimeToLive);
                 
                 testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentSetsBatchableFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue destination = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(destination);
+                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+                    stateMatcher: Assert.IsNull,
+                    settled: true,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    batchable: false);
+
+                IMessage message = session.CreateMessage();
+                producer.Send(message: message, deliveryMode: MsgDeliveryMode.NonPersistent, MsgPriority.Normal, NMSConstants.defaultTimeToLive);
+                
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
     }
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 0b67c9c..9a519f2 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -675,12 +675,14 @@
             bool sendResponseDisposition,
             DeliveryState responseState,
             bool responseSettled,
-            int dispositionDelay = 0
+            int dispositionDelay = 0,
+            bool batchable = false
         )
         {
             var transferMatcher = new FrameMatcher<Transfer>()
                 .WithAssertion(transfer => Assert.AreEqual(settled, transfer.Settled))
                 .WithAssertion(transfer => stateMatcher(transfer.State))
+                .WithAssertion(transfer => Assert.AreEqual(batchable, transfer.Batchable))
                 .WithAssertion(messageMatcher);
 
             if (sendResponseDisposition)