Merge pull request #20 from Havret/set_batchable_flag_properly
AMQNET-602: Set batchable flag to false for amqp transfer
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index 5f67b68..dfb16a4 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -49,8 +49,8 @@
</PropertyGroup>
<ItemGroup>
- <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt" />
- <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt" />
+ <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt"/>
+ <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt"/>
</ItemGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 0ca5b55..38f16d4 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -44,9 +44,10 @@
public AmqpProvider Provider { get; }
private readonly ITransportContext transport;
private readonly Uri remoteUri;
- private global::Amqp.Connection underlyingConnection;
+ private Connection underlyingConnection;
private readonly AmqpMessageFactory messageFactory;
private AmqpConnectionSession connectionSession;
+ private TaskCompletionSource<bool> tsc;
public AmqpConnection(AmqpProvider provider, ITransportContext transport, ConnectionInfo info)
{
@@ -68,39 +69,39 @@
internal async Task Start()
{
Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
- underlyingConnection = await transport.CreateAsync(address, CreateOpenFrame(Info), OnOpened);
+ this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+ underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
underlyingConnection.AddClosedCallback(Provider.OnInternalClosed);
+
+ // Wait for connection to be opened
+ await tsc.Task;
// Create a Session for this connection that is used for Temporary Destinations
// and perhaps later on management and advisory monitoring.
-
// TODO: change the way how connection session id is obtained
SessionInfo sessionInfo = new SessionInfo(Info.Id);
sessionInfo.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
connectionSession = new AmqpConnectionSession(this, sessionInfo);
- await connectionSession.Start();
+ await connectionSession.Start().ConfigureAwait(false);
}
- private Open CreateOpenFrame(ConnectionInfo connInfo)
+ internal void OnLocalOpen(Open open)
{
- return new Open
+ open.ContainerId = Info.ClientId;
+ open.ChannelMax = Info.channelMax;
+ open.MaxFrameSize = Convert.ToUInt32(Info.maxFrameSize);
+ open.HostName = remoteUri.Host;
+ open.IdleTimeOut = Convert.ToUInt32(Info.idleTimout);
+ open.DesiredCapabilities = new[]
{
- ContainerId = connInfo.ClientId,
- ChannelMax = connInfo.channelMax,
- MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize),
- HostName = remoteUri.Host,
- IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout),
- DesiredCapabilities = new[]
- {
- SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
- SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
- SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
- }
+ SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
+ SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
+ SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
};
}
- private void OnOpened(global::Amqp.IConnection connection, Open open)
+ internal void OnRemoteOpened(Open open)
{
if (SymbolUtil.CheckAndCompareFields(open.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
{
@@ -121,6 +122,7 @@
Info.QueuePrefix = queuePrefix;
}
+ this.tsc.SetResult(true);
Provider.FireConnectionEstablished();
}
}
@@ -172,7 +174,7 @@
{
return temporaryDestinations.TryGetValue(destination.Id, out AmqpTemporaryDestination amqpTemporaryDestination) ? amqpTemporaryDestination : null;
}
-
+
public void RemoveTemporaryDestination(Id destinationId)
{
temporaryDestinations.TryRemove(destinationId, out _);
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpHandler.cs b/src/NMS.AMQP/Provider/Amqp/AmqpHandler.cs
new file mode 100644
index 0000000..6841d78
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpHandler.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Amqp.Framing;
+using Amqp.Handler;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+ internal class AmqpHandler : IHandler
+ {
+ private readonly AmqpConnection connection;
+
+ public AmqpHandler(AmqpConnection connection)
+ {
+ this.connection = connection;
+ }
+
+ public bool CanHandle(EventId id)
+ {
+ switch (id)
+ {
+ case EventId.SendDelivery:
+ case EventId.ConnectionRemoteOpen:
+ case EventId.ConnectionLocalOpen:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public void Handle(Event protocolEvent)
+ {
+ switch (protocolEvent.Id)
+ {
+ case EventId.SendDelivery when protocolEvent.Context is IDelivery delivery:
+ delivery.Batchable = false;
+ break;
+ case EventId.ConnectionRemoteOpen when protocolEvent.Context is Open open:
+ this.connection.OnRemoteOpened(open);
+ break;
+ case EventId.ConnectionLocalOpen when protocolEvent.Context is Open open:
+ this.connection.OnLocalOpen(open);
+ break;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 9fcaa6e..d767577 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -121,13 +121,7 @@
if (envelope.SendAsync)
SendAsync(message, transactionalState);
else
- {
- // TODO: Should be unified after https://github.com/Azure/amqpnetlite/pull/374 is sorted out
- if (transactionalState != null)
- SendSync(message, transactionalState);
- else
- senderLink.Send(message, TimeSpan.FromMilliseconds(session.Connection.Provider.SendTimeout));
- }
+ SendSync(message, transactionalState);
}
catch (TimeoutException tex)
{
diff --git a/src/NMS.AMQP/Transport/ITransportContext.cs b/src/NMS.AMQP/Transport/ITransportContext.cs
index d3132a9..ad225be 100644
--- a/src/NMS.AMQP/Transport/ITransportContext.cs
+++ b/src/NMS.AMQP/Transport/ITransportContext.cs
@@ -18,6 +18,7 @@
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
+using Amqp.Handler;
namespace Apache.NMS.AMQP.Transport
{
@@ -37,6 +38,6 @@
ITransportContext Copy();
- Task<Amqp.Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null);
+ Task<Connection> CreateAsync(Address address, IHandler handler);
}
}
diff --git a/src/NMS.AMQP/Transport/SecureTransportContext.cs b/src/NMS.AMQP/Transport/SecureTransportContext.cs
index a4a3eb8..cc3913c 100644
--- a/src/NMS.AMQP/Transport/SecureTransportContext.cs
+++ b/src/NMS.AMQP/Transport/SecureTransportContext.cs
@@ -24,6 +24,7 @@
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
+using Amqp.Handler;
using Apache.NMS.AMQP.Util;
namespace Apache.NMS.AMQP.Transport
@@ -228,7 +229,7 @@
#region IProviderSecureTransportContext Methods
- public override Task<Amqp.Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null)
+ public override Task<Amqp.Connection> CreateAsync(Address address, IHandler handler)
{
// Load local certificates
this.connectionBuilder.SSL.ClientCertificates.AddRange(LoadClientCertificates());
@@ -242,7 +243,7 @@
throw new NMSSecurityException(string.Format("Invalid SSL Protocol {0} selected from system supported protocols {1}", this.SSLProtocol, PropertyUtil.ToString(SupportedProtocols)));
}
- return base.CreateAsync(address, open, onOpened);
+ return base.CreateAsync(address, handler);
}
#endregion
diff --git a/src/NMS.AMQP/Transport/TransportContext.cs b/src/NMS.AMQP/Transport/TransportContext.cs
index 5e1666f..229d8e7 100644
--- a/src/NMS.AMQP/Transport/TransportContext.cs
+++ b/src/NMS.AMQP/Transport/TransportContext.cs
@@ -20,6 +20,8 @@
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
+using Amqp.Handler;
+using Apache.NMS.AMQP.Provider.Amqp;
using Apache.NMS.AMQP.Util;
namespace Apache.NMS.AMQP.Transport
@@ -38,7 +40,7 @@
internal TransportContext()
{
connectionBuilder = new Amqp.ConnectionFactory();
- connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
+ connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
}
static TransportContext()
@@ -156,9 +158,9 @@
return copy;
}
- public virtual Task<Amqp.Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null)
+ public virtual Task<Connection> CreateAsync(Address address, IHandler handler)
{
- return connectionBuilder.CreateAsync(address, open, onOpened);
+ return connectionBuilder.CreateAsync(address, handler);
}
protected virtual void CopyInto(TransportContext copy)
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
index 110f65d..d34d529 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
@@ -419,9 +419,9 @@
producer.Send(message, MsgDeliveryMode.Persistent, (MsgPriority) priority, NMSConstants.defaultTimeToLive);
Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
-
+
connection.Close();
-
+
testPeer.WaitForAllMatchersToComplete(1000);
}
}
@@ -438,7 +438,7 @@
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue destination = session.GetQueue("myQueue");
IMessageProducer producer = session.CreateProducer(destination);
-
+
string text = "myMessage";
string actualMessageId = null;
testPeer.ExpectTransfer(m =>
@@ -448,18 +448,18 @@
actualMessageId = m.Properties.MessageId;
});
testPeer.ExpectClose();
-
+
ITextMessage message = session.CreateTextMessage(text);
Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
-
+
producer.Send(message);
-
+
Assert.IsNotNull(message.NMSMessageId);
Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set");
Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found");
-
+
connection.Close();
-
+
testPeer.WaitForAllMatchersToComplete(1000);
// Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message");
@@ -489,7 +489,7 @@
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue destination = session.GetQueue("myQueue");
IMessageProducer producer = session.CreateProducer(destination);
-
+
string text = "myMessage";
testPeer.ExpectTransfer(m =>
{
@@ -498,11 +498,11 @@
Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
});
testPeer.ExpectClose();
-
+
ITextMessage message = session.CreateTextMessage(text);
-
+
Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
-
+
if (existingId)
{
string existingMessageId = "ID:this-should-be-overwritten-in-send";
@@ -511,13 +511,13 @@
}
producer.DisableMessageID = true;
-
+
producer.Send(message);
Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null");
-
+
connection.Close();
-
+
testPeer.WaitForAllMatchersToComplete(2000);
}
}
@@ -526,7 +526,7 @@
public void TestRemotelyCloseProducer()
{
string breadCrumb = "ErrorMessageBreadCrumb";
-
+
ManualResetEvent producerClosed = new ManualResetEvent(false);
Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
mockConnectionListener
@@ -537,23 +537,23 @@
{
NmsConnection connection = (NmsConnection) EstablishConnection(testPeer);
connection.AddConnectionListener(mockConnectionListener.Object);
-
+
testPeer.ExpectBegin();
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-
+
// Create a producer, then remotely end it afterwards.
testPeer.ExpectSenderAttach();
testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb);
-
+
IQueue destination = session.GetQueue("myQueue");
IMessageProducer producer = session.CreateProducer(destination);
-
+
// Verify the producer gets marked closed
testPeer.WaitForAllMatchersToComplete(1000);
-
+
Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger");
Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed");
-
+
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
producer.Close();
@@ -567,23 +567,23 @@
{
IConnection connection = EstablishConnection(testPeer, optionsString: "nms.sendTimeout=500");
testPeer.ExpectBegin();
-
+
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue queue = session.GetQueue("myQueue");
-
+
ITextMessage message = session.CreateTextMessage("text");
-
+
// Expect the producer to attach. Don't send any credit so that the client will
// block on a send and we can test our timeouts.
testPeer.ExpectSenderAttachWithoutGrantingCredit();
testPeer.ExpectClose();
-
+
IMessageProducer producer = session.CreateProducer(queue);
-
+
Assert.Catch<Exception>(() => producer.Send(message), "Send should time out.");
-
+
connection.Close();
-
+
testPeer.WaitForAllMatchersToComplete(1000);
}
}
@@ -600,23 +600,20 @@
IQueue queue = session.GetQueue("myQueue");
ITextMessage message = session.CreateTextMessage("text");
-
+
// Expect the producer to attach and grant it some credit, it should send
// a transfer which we will not send any response for which should cause the
// send operation to time out.
testPeer.ExpectSenderAttach();
testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
-
- // When send operation timed out, released and settled disposition is issued by the provider
- testPeer.ExpectDispositionThatIsReleasedAndSettled();
testPeer.ExpectClose();
-
+
IMessageProducer producer = session.CreateProducer(queue);
-
+
Assert.Catch<Exception>(() => producer.Send(message), "Send should time out.");
-
+
connection.Close();
-
+
testPeer.WaitForAllMatchersToComplete(1000);
}
}
@@ -627,22 +624,22 @@
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
-
+
testPeer.ExpectBegin();
testPeer.ExpectSenderAttach();
-
+
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue destination = session.GetQueue("myQueue");
IMessageProducer producer = session.CreateProducer(destination);
-
+
testPeer.ExpectTransfer(Assert.IsNotNull);
-
-
+
+
producer.Send(session.CreateMessage());
-
+
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
producer.Close();
-
+
testPeer.WaitForAllMatchersToComplete(1000);
}
}
@@ -657,24 +654,92 @@
testPeer.ExpectBegin();
testPeer.ExpectSenderAttach();
-
+
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue destination = session.GetQueue("myQueue");
IMessageProducer producer = session.CreateProducer(destination);
-
+
testPeer.ExpectTransfer(Assert.IsNotNull);
-
+
connection.Stop();
-
+
producer.Send(session.CreateMessage());
-
+
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
testPeer.ExpectClose();
-
+
producer.Close();
connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessagePersistentSetsBatchableFalse()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue destination = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(destination);
+ testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+ stateMatcher: Assert.IsNull,
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true,
+ batchable: false);
+
+ IMessage message = session.CreateMessage();
+ producer.Send(message: message, deliveryMode: MsgDeliveryMode.Persistent, MsgPriority.Normal, NMSConstants.defaultTimeToLive);
testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageNonPersistentSetsBatchableFalse()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue destination = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(destination);
+ testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+ stateMatcher: Assert.IsNull,
+ settled: true,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true,
+ batchable: false);
+
+ IMessage message = session.CreateMessage();
+ producer.Send(message: message, deliveryMode: MsgDeliveryMode.NonPersistent, MsgPriority.Normal, NMSConstants.defaultTimeToLive);
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
}
}
}
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 0b67c9c..9a519f2 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -675,12 +675,14 @@
bool sendResponseDisposition,
DeliveryState responseState,
bool responseSettled,
- int dispositionDelay = 0
+ int dispositionDelay = 0,
+ bool batchable = false
)
{
var transferMatcher = new FrameMatcher<Transfer>()
.WithAssertion(transfer => Assert.AreEqual(settled, transfer.Settled))
.WithAssertion(transfer => stateMatcher(transfer.State))
+ .WithAssertion(transfer => Assert.AreEqual(batchable, transfer.Batchable))
.WithAssertion(messageMatcher);
if (sendResponseDisposition)