Merge pull request #3 from lukeabsent/AMQNET-637
Mostly work on tests and some fixes
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index 6257177..3f7e3db 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -263,6 +263,11 @@
}
set
{
+ if (!session.Connection.ConnectionInfo.DelayedDeliverySupported)
+ {
+ throw new NotSupportedException("Delayed Delivery is not supported");
+ }
+
CheckClosed();
deliveryDelay = value;
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 1e86ced..6b40430 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -147,9 +147,9 @@
}
if (Array.Exists(capabilities,
- symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+ symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS)))
{
- Info.DelayedDeliverySupported = true;
+ Info.SharedSubsSupported = true;
}
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 7670d2f..d4ceeac 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -130,7 +130,7 @@
bool supported = false;
if (remoteOfferedCapabilities != null)
{
- if (Array.Exists(remoteOfferedCapabilities, symbol => symbol == SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS))
+ if (Array.Exists(remoteOfferedCapabilities, symbol => SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS.Equals(symbol)))
{
supported = true;
}
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
index d5332f1..4799ada 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
@@ -269,15 +269,16 @@
case ulong _:
case int _:
case uint _:
- return new DateTime(621355968000000000L + (long) deliveryTime * 10000L, DateTimeKind.Utc);
+ return new DateTime(621355968000000000L + Convert.ToInt64(deliveryTime) * 10000L, DateTimeKind.Utc);
default:
return syntheticDeliveryTime;
}
}
set
{
+ // Assumption that if it is being set through property, then it is with purpose of send out this value
syntheticDeliveryTime = value;
- RemoveMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME);
+ SetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME, new DateTimeOffset(value).ToUnixTimeMilliseconds());
}
}
@@ -488,6 +489,7 @@
target.connection = connection;
target.consumerDestination = consumerDestination;
target.syntheticExpiration = syntheticExpiration;
+ target.syntheticDeliveryTime = syntheticDeliveryTime;
target.amqpTimeToLiveOverride = amqpTimeToLiveOverride;
target.destination = destination;
target.replyTo = replyTo;
@@ -508,7 +510,7 @@
return MessageAnnotations != null && MessageAnnotations.Map.ContainsKey(annotationName);
}
- public void SetMessageAnnotation(Symbol symbolKeyName, string value)
+ public void SetMessageAnnotation(Symbol symbolKeyName, object value)
{
LazyCreateMessageAnnotations();
MessageAnnotations.Map.Add(symbolKeyName, value);
diff --git a/src/NMS.AMQP/Util/SymbolUtil.cs b/src/NMS.AMQP/Util/SymbolUtil.cs
index bb64387..e3fe2c3 100644
--- a/src/NMS.AMQP/Util/SymbolUtil.cs
+++ b/src/NMS.AMQP/Util/SymbolUtil.cs
@@ -39,7 +39,7 @@
public static readonly Symbol OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER = new Symbol("sole-connection-for-container");
public static readonly Symbol OPEN_CAPABILITY_ANONYMOUS_RELAY = new Symbol("ANONYMOUS-RELAY");
public static readonly Symbol OPEN_CAPABILITY_DELAYED_DELIVERY = new Symbol("DELAYED_DELIVERY");
- public static readonly Symbol OPEN_CAPABILITY_SHARED_SUBS = new Symbol("SHARED_SUBS");
+ public static readonly Symbol OPEN_CAPABILITY_SHARED_SUBS = new Symbol("SHARED-SUBS");
// Attach Frame
public readonly static Symbol ATTACH_EXPIRY_POLICY_LINK_DETACH = new Symbol("link-detach");
diff --git a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
index 2155d86..bf309c4 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
@@ -18,6 +18,7 @@
using System;
using Apache.NMS;
using Apache.NMS.AMQP;
+using NMS.AMQP.Test.TestAmqp;
using NUnit.Framework;
namespace NMS.AMQP.Test
@@ -30,19 +31,32 @@
protected string TestName => TestContext.CurrentContext.Test.Name;
+ static AmqpTestSupport()
+ {
+ Tracer.Trace = new NLogAdapter();
+ }
+
[TearDown]
public void TearDown()
{
Connection?.Close();
}
- protected IConnection CreateAmqpConnection()
+ protected IConnection CreateAmqpConnectionStarted(string clientId = null)
+ {
+ var connection = CreateAmqpConnection(clientId);
+ connection.Start();
+ return connection;
+ }
+
+ protected IConnection CreateAmqpConnection(string clientId = null)
{
string brokerUri = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_URI") ?? "amqp://127.0.0.1:5672";
string userName = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CU") ?? "admin";
string password = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CPWD") ?? "admin";
NmsConnectionFactory factory = new NmsConnectionFactory(brokerUri);
+ factory.ClientId = clientId;
return factory.CreateConnection(userName, password);
}
@@ -108,11 +122,7 @@
IQueue queue = session.GetQueue(TestName);
IMessageConsumer consumer = session.CreateConsumer(queue);
- IMessage message;
- do
- {
- message = consumer.Receive(timeout);
- } while (message != null);
+ PurgeConsumer(consumer, timeout);
amqpConnection.Close();
}
@@ -125,13 +135,18 @@
ITopic queue = session.GetTopic(TestName);
IMessageConsumer consumer = session.CreateConsumer(queue);
+ PurgeConsumer(consumer, timeout);
+
+ amqpConnection.Close();
+ }
+
+ protected void PurgeConsumer(IMessageConsumer consumer, TimeSpan timeout)
+ {
IMessage message;
do
{
message = consumer.Receive(timeout);
} while (message != null);
-
- amqpConnection.Close();
}
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
index f7ce890..e167532 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
+++ b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
@@ -16,11 +16,11 @@
-->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
- <TargetFrameworks>net462;netcoreapp2.2</TargetFrameworks>
+ <TargetFrameworks>net462;netcoreapp3.1</TargetFrameworks>
<TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
<RootNamespace>NMS.AMQP.Test</RootNamespace>
<AssemblyName>NMS.AMQP.Interop.Test</AssemblyName>
- <LangVersion>7.3</LangVersion>
+ <LangVersion>8</LangVersion>
</PropertyGroup>
<ItemGroup>
@@ -33,5 +33,12 @@
<ItemGroup>
<ProjectReference Include="..\..\src\NMS.AMQP\Apache-NMS-AMQP.csproj" />
+ <ProjectReference Include="..\Apache-NMS-AMQP-Test\Apache-NMS-AMQP-Test.csproj" />
+ </ItemGroup>
+
+ <ItemGroup>
+ <None Update="NLog.config">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </None>
</ItemGroup>
</Project>
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NLog.config b/test/Apache-NMS-AMQP-Interop-Test/NLog.config
new file mode 100644
index 0000000..c0f1581
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NLog.config
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <targets>
+ <target name="logconsole" xsi:type="Console" layout="${time} | ${level} | ${callsite:includeNamespace=false:methodName=false} | ${message}" />
+ </targets>
+
+ <rules>
+ <logger name="*" minlevel="Debug" writeTo="logconsole" />
+ </rules>
+</nlog>
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index 4529783..9f174d4 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -16,6 +16,11 @@
*/
using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
using Apache.NMS;
using NUnit.Framework;
@@ -84,6 +89,223 @@
Assert.IsNull(messageConsumer.Receive(TimeSpan.FromSeconds(1)));
}
+
+ [Test, Timeout(60_000)]
+ public void TestDurableSubscription()
+ {
+ Connection = CreateAmqpConnection();
+ Connection.Start();
+
+ int counter = 0;
+
+
+ using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+ string subscriptionName = "mySubscriptionName";
+ ITopic topicProducer = sessionProducer.GetTopic(TestName);
+ using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+
+ // First durable consumer, reads message but does not unsubscribe
+ using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+ using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ using ITopic topic = session.GetTopic(TestName);
+ using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ {
+ // Purge topic
+ PurgeConsumer(messageConsumer, TimeSpan.FromSeconds(0.5));
+
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+ var message = messageConsumer.Receive();
+ Assert.AreEqual("text0", message.Body<string>());
+ }
+ }
+
+ // Write some more messages while subscription is closed
+ for (int t = 0; t < 3; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+
+ // Second durable consumer, reads message that were send during no-subscription and unsubscribe
+ using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+ using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ using ITopic topic = session.GetTopic(TestName);
+ using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ {
+ for (int t = 1; t <= 3; t++)
+ {
+ var message = messageConsumer.Receive();
+ Assert.AreEqual("text" + t, message.Body<string>());
+ }
+
+ // Assert topic is empty after those msgs
+ var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+ Assert.IsNull(msgAtTheEnd);
+
+ Assert.Throws<IllegalStateException>(() => session.Unsubscribe(subscriptionName)); // Error unsubscribing while consumer is on
+ }
+
+ session.Unsubscribe(subscriptionName);
+ }
+
+
+ // Send some messages again to verify we will not get them when create durable subscription
+ for (int t = 0; t < 3; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+
+ // Third durable subscriber, expect NOT to read messages during no-subscription period
+ using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+ using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ using ITopic topic = session.GetTopic(TestName);
+ using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ {
+ // Assert topic is empty
+ var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+ Assert.IsNull(msgAtTheEnd);
+ }
+
+ // And unsubscribe again
+ session.Unsubscribe(subscriptionName);
+ }
+ }
+
+
+ [Test, Timeout(60_000)]
+ public void TestSharedSubscription()
+ {
+ IMessageConsumer GetConsumer(string subscriptionName, String clientId)
+ {
+ var connection = CreateAmqpConnectionStarted(clientId);
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ var topic = session.GetTopic(TestName);
+ var messageConsumer = session.CreateSharedConsumer(topic, subscriptionName);
+ return messageConsumer;
+ }
+
+ Connection = CreateAmqpConnection();
+ Connection.Start();
+
+ string subscriptionName = "mySubscriptionName";
+
+
+ var receivedMessages = new List<int>();
+
+ var messageConsumer1 = GetConsumer(subscriptionName, null);
+ var messageConsumer2 = GetConsumer(subscriptionName, null);
+ messageConsumer1.Listener += (msg) =>
+ {
+ receivedMessages.Add(1);
+ msg.Acknowledge();
+ };
+ messageConsumer2.Listener += (msg) =>
+ {
+ receivedMessages.Add(2);
+ msg.Acknowledge();
+ };
+
+ // Now send some messages
+ using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ ITopic topicProducer = sessionProducer.GetTopic(TestName);
+ using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+ for (int t = 0; t < 10; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+
+ // Give it some time to process
+ Thread.Sleep(TimeSpan.FromSeconds(2));
+
+ // Assert message was routed to multiple consumers
+ Assert.AreEqual(2, receivedMessages.Distinct().Count());
+ Assert.AreEqual(10, receivedMessages.Count);
+ }
+
+ [Test, Timeout(60_000)]
+ public void TestSharedDurableSubscription()
+ {
+ (IMessageConsumer,ISession,IConnection) GetConsumer(string subscriptionName, String clientId)
+ {
+ var connection = CreateAmqpConnection(clientId);
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ var topic = session.GetTopic(TestName);
+ var messageConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName);
+ return (messageConsumer, session, connection);
+ }
+
+ Connection = CreateAmqpConnection();
+ Connection.Start();
+
+ string subscriptionName = "mySubscriptionName";
+ int messageSendCount = 1099;
+
+ var receivedMessages = new ConcurrentBag<int>();
+
+
+ IConnection connectionConsumer1, connectionConsumer2;
+ IMessageConsumer messageConsumer1, messageConsumer2;
+
+ (messageConsumer1, _, connectionConsumer1) = GetConsumer(subscriptionName, null);
+ (messageConsumer2, _, connectionConsumer2) = GetConsumer(subscriptionName, null);
+ connectionConsumer1.Start();
+ connectionConsumer2.Start();
+
+ messageConsumer1.Close();
+ messageConsumer2.Close();
+ connectionConsumer1.Close();
+ connectionConsumer2.Close();
+
+ // Now send some messages
+ using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ ITopic topicProducer = sessionProducer.GetTopic(TestName);
+ using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+ for (int t = 0; t < messageSendCount; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+
+ // Create consumers again and expect messages to be delivered to them
+ ISession sessionConsumer1, sessionConsumer2;
+ (messageConsumer1, sessionConsumer1, connectionConsumer1) = GetConsumer(subscriptionName, null);
+ (messageConsumer2, sessionConsumer2, connectionConsumer2) = GetConsumer(subscriptionName, null);
+ messageConsumer1.Listener += (msg) =>
+ {
+ receivedMessages.Add(1);
+ msg.Acknowledge();
+ };
+ messageConsumer2.Listener += (msg) =>
+ {
+ receivedMessages.Add(2);
+ msg.Acknowledge();
+ };
+ Task.Run(() => connectionConsumer1.Start()); // parallel to give both consumers chance to start at the same time
+ Task.Run(() => connectionConsumer2.Start());
+
+ // Give it some time to process
+ Thread.Sleep(TimeSpan.FromSeconds(5));
+
+ // Assert message was routed to multiple consumers
+ Assert.AreEqual(2, receivedMessages.Distinct().Count());
+ Assert.AreEqual(messageSendCount, receivedMessages.Count);
+
+ messageConsumer1.Close();
+ messageConsumer2.Close();
+ sessionConsumer1.Unsubscribe(subscriptionName);
+ sessionConsumer2.Unsubscribe(subscriptionName);
+
+ }
+
+
[Test, Timeout(60_000)]
public void TestSelectNoLocal()
{
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
new file mode 100644
index 0000000..5bd182d
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test
+{
+ [TestFixture]
+ public class NmsMessageProducerTest : AmqpTestSupport
+ {
+ [Test, Timeout(60_000)]
+ public void TestDeliveryDelay()
+ {
+ PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+ var deliveryDelay = TimeSpan.FromSeconds(7);
+
+ Connection = CreateAmqpConnection();
+ Connection.Start();
+
+ ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue(TestName);
+ IMessageProducer producer = session.CreateProducer(queue);
+ producer.DeliveryDelay = deliveryDelay;
+
+ DateTime? receivingTime = null;
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ var receivingTask = Task.Run(() =>
+ {
+ while (true)
+ {
+ var message = consumer.Receive(TimeSpan.FromMilliseconds(100));
+ if (message != null && message.Body<string>() == "Hello")
+ {
+ receivingTime = DateTime.Now;
+ return;
+ }
+ }
+ });
+
+
+ DateTime sendTime = DateTime.Now;
+ ITextMessage message = session.CreateTextMessage("Hello");
+ producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+ // Wait that delivery delay
+ Thread.Sleep(deliveryDelay);
+
+ receivingTask.Wait(TimeSpan.FromSeconds(20)); // make sure its done
+
+ var measuredDelay = (receivingTime.Value - sendTime);
+
+ Assert.Greater(measuredDelay.TotalMilliseconds, deliveryDelay.TotalMilliseconds* 0.5);
+ Assert.Less(measuredDelay.TotalMilliseconds, deliveryDelay.TotalMilliseconds*1.5);
+ }
+
+
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
index 68c6937..2de2f86 100644
--- a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
+++ b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
@@ -16,7 +16,7 @@
-->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
- <TargetFrameworks>net462;netcoreapp2.2</TargetFrameworks>
+ <TargetFrameworks>net462;netcoreapp3.1</TargetFrameworks>
<TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
<RootNamespace>NMS.AMQP.Test</RootNamespace>
<AssemblyName>NMS.AMQP.Test</AssemblyName>
diff --git a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
index 9a13133..fac9499 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
@@ -50,6 +50,26 @@
return connection;
}
+ protected INMSContext EstablishNMSContext(TestAmqpPeer testPeer, string optionsString = null, Symbol[] serverCapabilities = null, Fields serverProperties = null, bool setClientId = true, AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge)
+ {
+ testPeer.ExpectSaslPlain("guest", "guest");
+ testPeer.ExpectOpen(serverCapabilities: serverCapabilities, serverProperties: serverProperties);
+
+ // Each connection creates a session for managing temporary destinations etc.
+ testPeer.ExpectBegin();
+
+ var remoteUri = BuildUri(testPeer, optionsString);
+ var connectionFactory = new NmsConnectionFactory(remoteUri);
+ var context = connectionFactory.CreateContext("guest", "guest", acknowledgementMode);
+ if (setClientId)
+ {
+ // Set a clientId to provoke the actual AMQP connection process to occur.
+ context.ClientId = "ClientName";
+ }
+
+ return context;
+ }
+
private static string BuildUri(TestAmqpPeer testPeer, string optionsString)
{
string baseUri = "amqp://127.0.0.1:" + testPeer.ServerPort.ToString();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
new file mode 100644
index 0000000..dbe936a
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ [TestFixture]
+ public class MessageDeliveryTimeTest : IntegrationTestFixture
+ {
+ [Test, Timeout(20000)]
+ public void TestReceiveMessageWithoutDeliveryTimeSet()
+ {
+ DoReceiveMessageDeliveryTime(null, null);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsDateTime()
+ {
+ DateTime deliveryTime = DateTimeOffset.FromUnixTimeMilliseconds(CurrentTimeInMillis() + 12345).DateTime.ToUniversalTime();
+ DoReceiveMessageDeliveryTime(deliveryTime, deliveryTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsULong()
+ {
+ ulong deliveryTime = (ulong) (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds((long) deliveryTime).DateTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsLong()
+ {
+ long deliveryTime = (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsInt()
+ {
+ int deliveryTime = (int) (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsUInt()
+ {
+ uint deliveryTime = (uint) (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+ }
+
+ private long CurrentTimeInMillis()
+ {
+ return new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
+ }
+
+ private void DoReceiveMessageDeliveryTime(object setDeliveryTimeAnnotation, DateTime? expectedDeliveryTime)
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer, "amqp.traceFrames=true");
+ connection.Start();
+ testPeer.ExpectBegin();
+ var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ var message = CreateMessageWithNullContent();
+ if (setDeliveryTimeAnnotation != null)
+ {
+ message.MessageAnnotations = message.MessageAnnotations ?? new MessageAnnotations();
+ message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME] = setDeliveryTimeAnnotation;
+ }
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message);
+ testPeer.ExpectDisposition(true, (deliveryState) => { });
+
+ DateTime startingTimeFrom = DateTime.UtcNow;
+ var messageConsumer = session.CreateConsumer(queue);
+ var receivedMessage = messageConsumer.Receive(TimeSpan.FromMilliseconds(3000));
+ DateTime receivingTime = DateTime.UtcNow;
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+
+ Assert.IsNotNull(receivedMessage);
+ if (expectedDeliveryTime != null)
+ {
+ Assert.AreEqual(receivedMessage.NMSDeliveryTime, expectedDeliveryTime.Value);
+ }
+ else
+ {
+ Assert.LessOrEqual(receivedMessage.NMSDeliveryTime, receivingTime);
+ Assert.GreaterOrEqual(receivedMessage.NMSDeliveryTime, startingTimeFrom);
+ }
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestDeliveryDelayNotSupportedThrowsException()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = base.EstablishConnection(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+ Assert.Throws<NotSupportedException>(() => producer.DeliveryDelay = TimeSpan.FromMinutes(17));
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestDeliveryDelayHasItsReflectionInAmqpAnnotations()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ // Determine current time
+ TimeSpan deliveryDelay = TimeSpan.FromMinutes(17);
+ long currentUnixEpochTime = new DateTimeOffset(DateTime.UtcNow + deliveryDelay).ToUnixTimeMilliseconds();
+ long currentUnixEpochTime2 = new DateTimeOffset(DateTime.UtcNow + deliveryDelay + deliveryDelay).ToUnixTimeMilliseconds();
+
+ IConnection connection = base.EstablishConnection(testPeer,
+ serverCapabilities: new Symbol[] {SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY, SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER});
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+ producer.DeliveryDelay = deliveryDelay;
+
+ // Create and transfer a new message
+ testPeer.ExpectTransfer(message =>
+ {
+ Assert.GreaterOrEqual((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime);
+ Assert.Less((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime2);
+
+ Assert.IsTrue(message.Header.Durable);
+ });
+ testPeer.ExpectClose();
+
+ ITextMessage textMessage = session.CreateTextMessage();
+
+ producer.Send(textMessage);
+ Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
new file mode 100644
index 0000000..c1bb33b
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
@@ -0,0 +1,975 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ // Adapted from ConsumerIntegrationTest to use NMSContext
+ [TestFixture]
+ public class NMSConsumerIntegrationTest : IntegrationTestFixture
+ {
+ [Test, Timeout(20_000)]
+ public void TestCloseConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+
+ IQueue queue = context.GetQueue("myQueue");
+ var consumer = context.CreateConsumer(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ // TODO No connection Listener in context
+ // [Test, Timeout(20_000)]
+ // public void TestRemotelyCloseConsumer()
+ // {
+ // Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+ // string errorMessage = "buba";
+ //
+ // using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ // {
+ // ManualResetEvent consumerClosed = new ManualResetEvent(false);
+ // ManualResetEvent exceptionFired = new ManualResetEvent(false);
+ //
+ // mockConnectionListener
+ // .Setup(listener => listener.OnConsumerClosed(It.IsAny<IMessageConsumer>(), It.IsAny<Exception>()))
+ // .Callback(() => consumerClosed.Set());
+ //
+ // var context = (NmsContext) EstablishNMSContext(testPeer, "amqp.traceFrames=true");
+ // context.ConnectionInterruptedListener += () => { consumerClosed.Set(); };// AddConnectionListener(mockConnectionListener.Object);}
+ // // context.list ConnectionInterruptedListener += () => { consumerClosed.Set(); };// AddConnectionListener(mockConnectionListener.Object);}
+ // context.ExceptionListener += exception => { exceptionFired.Set(); };
+ //
+ // testPeer.ExpectBegin();
+ // // ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ //
+ // // Create a consumer, then remotely end it afterwards.
+ // testPeer.ExpectReceiverAttach();
+ // testPeer.ExpectLinkFlow();
+ // testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, errorMessage: errorMessage, delayBeforeSend: 400);
+ //
+ // IQueue queue = context.GetQueue("myQueue");
+ // var consumer = context.CreateConsumer(queue);
+ //
+ //
+ // // Verify the consumer gets marked closed
+ // testPeer.WaitForAllMatchersToComplete(1000);
+ //
+ // Assert.True(consumerClosed.WaitOne(2000), "Consumer closed callback didn't trigger");
+ // Assert.False(exceptionFired.WaitOne(20), "Exception listener shouldn't fire with no MessageListener");
+ //
+ // // Try closing it explicitly, should effectively no-op in client.
+ // // The test peer will throw during close if it sends anything.
+ // consumer.Close();
+ // }
+ // }
+
+ // [Test, Timeout(20_000)]
+ // public void TestRemotelyCloseConsumerWithMessageListenerFiresExceptionListener()
+ // {
+ // Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+ // string errorMessage = "buba";
+ //
+ // using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ // {
+ // ManualResetEvent consumerClosed = new ManualResetEvent(false);
+ // ManualResetEvent exceptionFired = new ManualResetEvent(false);
+ //
+ // mockConnectionListener
+ // .Setup(listener => listener.OnConsumerClosed(It.IsAny<IMessageConsumer>(), It.IsAny<Exception>()))
+ // .Callback(() => consumerClosed.Set());
+ //
+ // NmsConnection connection = (NmsConnection) EstablishConnection(testPeer);
+ // connection.AddConnectionListener(mockConnectionListener.Object);
+ // connection.ExceptionListener += exception => { exceptionFired.Set(); };
+ //
+ // testPeer.ExpectBegin();
+ // ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ //
+ // // Create a consumer, then remotely end it afterwards.
+ // testPeer.ExpectReceiverAttach();
+ // testPeer.ExpectLinkFlow();
+ // testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, errorMessage: errorMessage, 10);
+ //
+ // IQueue queue = session.GetQueue("myQueue");
+ // IMessageConsumer consumer = session.CreateConsumer(queue);
+ //
+ // consumer.Listener += message => { };
+ //
+ // // Verify the consumer gets marked closed
+ // testPeer.WaitForAllMatchersToComplete(1000);
+ //
+ // Assert.True(consumerClosed.WaitOne(2000), "Consumer closed callback didn't trigger");
+ // Assert.True(exceptionFired.WaitOne(2000), "Exception listener should have fired with a MessageListener");
+ //
+ // // Try closing it explicitly, should effectively no-op in client.
+ // // The test peer will throw during close if it sends anything.
+ // consumer.Close();
+ // }
+ // }
+
+ [Test, Timeout(20_000)]
+ public void TestReceiveMessageWithReceiveZeroTimeout()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue queue = context.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ var consumer = context.CreateConsumer(queue);
+ IMessage message = consumer.Receive();
+ Assert.NotNull(message, "A message should have been received");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(10000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestExceptionInOnMessageReleasesInAutoAckMode()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue queue = context.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
+ testPeer.ExpectDispositionThatIsReleasedAndSettled();
+
+ var consumer = context.CreateConsumer(queue);
+ consumer.Listener += message => throw new Exception();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(10000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCloseDurableTopicSubscriberDetachesWithCloseFalse()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ string topicName = "myTopic";
+ string subscriptionName = "mySubscription";
+ ITopic topic = context.GetTopic(topicName);
+
+ testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
+ testPeer.ExpectLinkFlow();
+
+ var durableConsumer = context.CreateDurableConsumer(topic, subscriptionName, null, false);
+
+ testPeer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+ durableConsumer.Close();
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerReceiveThrowsIfConnectionLost()
+ {
+ DoTestConsumerReceiveThrowsIfConnectionLost(false);
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerTimedReceiveThrowsIfConnectionLost()
+ {
+ DoTestConsumerReceiveThrowsIfConnectionLost(true);
+ }
+
+ private void DoTestConsumerReceiveThrowsIfConnectionLost(bool useTimeout)
+ {
+ ManualResetEvent consumerReady = new ManualResetEvent(false);
+
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+
+ testPeer.ExpectBegin();
+
+ IQueue queue = context.GetQueue("queue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+ testPeer.RunAfterLastHandler(() => { consumerReady.WaitOne(2000); });
+ testPeer.DropAfterLastMatcher(delay: 10);
+
+ var consumer = context.CreateConsumer(queue);
+ consumerReady.Set();
+
+ try
+ {
+ if (useTimeout)
+ {
+ consumer.Receive(TimeSpan.FromMilliseconds(10_0000));
+ }
+ else
+ {
+ consumer.Receive();
+ }
+
+
+ Assert.Fail("An exception should have been thrown");
+ }
+ catch (NMSException)
+ {
+ // Expected
+ }
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ // TODO No connection Listener in context
+ // [Test, Timeout(20_000)]
+ // public void TestConsumerReceiveNoWaitThrowsIfConnectionLost()
+ // {
+ // ManualResetEvent disconnected = new ManualResetEvent(false);
+ //
+ // using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ // {
+ // NmsContext context = (NmsContext) EstablishNMSContext(testPeer);
+ // Mock<INmsConnectionListener> connectionListener = new Mock<INmsConnectionListener>();
+ //
+ // connectionListener
+ // .Setup(listener => listener.OnConnectionFailure(It.IsAny<NMSException>()))
+ // .Callback(() => { disconnected.Set(); });
+ //
+ // context.AddConnectionListener(connectionListener.Object);
+ //
+ // context.Start();
+ //
+ // testPeer.ExpectBegin();
+ //
+ // IQueue queue = context.GetQueue("queue");
+ //
+ // testPeer.ExpectReceiverAttach();
+ // testPeer.ExpectLinkFlow();
+ // testPeer.RemotelyCloseConnection(expectCloseResponse: true, errorCondition: ConnectionError.CONNECTION_FORCED, errorMessage: "buba");
+ //
+ // var consumer = context.CreateConsumer(queue);
+ //
+ // Assert.True(disconnected.WaitOne(), "Connection should be disconnected");
+ //
+ // try
+ // {
+ // consumer.ReceiveNoWait();
+ // Assert.Fail("An exception should have been thrown");
+ // }
+ // catch (NMSException)
+ // {
+ // // Expected
+ // }
+ // }
+ // }
+
+ [Test, Timeout(20_000)]
+ public void TestSetMessageListenerAfterStartAndSend()
+ {
+ int messageCount = 4;
+ CountdownEvent latch = new CountdownEvent(messageCount);
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), messageCount);
+
+ var consumer = context.CreateConsumer(destination);
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+ }
+
+ consumer.Listener += message => latch.Signal();
+
+ Assert.True(latch.Wait(4000), "Messages not received within given timeout. Count remaining: " + latch.CurrentCount);
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ consumer.Close();
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ // TODO Connection is started anyway when creating consumer
+ // [Test, Timeout(20_000)]
+ // public void TestNoReceivedMessagesWhenConnectionNotStarted()
+ // {
+ // using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ // {
+ // var context = EstablishNMSContext(testPeer);
+ //
+ // testPeer.ExpectBegin();
+ //
+ // IQueue destination = context.GetQueue("myQueue");
+ //
+ // testPeer.ExpectReceiverAttach();
+ // testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 3);
+ //
+ // // cREATING CONSUMER STARTS CONNECTION
+ // var consumer = context.CreateConsumer(destination);
+ //
+ // // Wait for a message to arrive then try and receive it, which should not happen
+ // // since the connection is not started.
+ // Assert.Null(consumer.Receive(TimeSpan.FromMilliseconds(100)));
+ //
+ // testPeer.ExpectEnd();
+ // testPeer.ExpectClose();
+ // context.Close();
+ //
+ // testPeer.WaitForAllMatchersToComplete(2000);
+ // }
+ // }
+
+ // TODO Connection is started anyway when creating consumer
+ // [Test, Timeout(20_000)]
+ // public void TestNoReceivedNoWaitMessagesWhenConnectionNotStarted()
+ // {
+ // using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ // {
+ // var context = EstablishNMSContext(testPeer);
+ //
+ // testPeer.ExpectBegin();
+ //
+ // IQueue destination = context.GetQueue("myQueue");
+ //
+ // testPeer.ExpectReceiverAttach();
+ // testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 3);
+ //
+ // var consumer = context.CreateConsumer(destination);
+ //
+ // // Wait for a message to arrive then try and receive it, which should not happen
+ // // since the connection is not started.
+ // Assert.Null(consumer.ReceiveNoWait());
+ //
+ // testPeer.ExpectEnd();
+ // testPeer.ExpectClose();
+ // context.Close();
+ //
+ // testPeer.WaitForAllMatchersToComplete(2000);
+ // }
+ // }
+
+ [Test, Timeout(20_000)]
+ public void TestSyncReceiveFailsWhenListenerSet()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+
+ var consumer = context.CreateConsumer(destination);
+
+ consumer.Listener += message => { };
+
+ Assert.Catch<NMSException>(() => consumer.Receive(), "Should have thrown an exception.");
+ Assert.Catch<NMSException>(() => consumer.Receive(TimeSpan.FromMilliseconds(1000)), "Should have thrown an exception.");
+ Assert.Catch<NMSException>(() => consumer.ReceiveNoWait(), "Should have thrown an exception.");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateProducerInOnMessage()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ IQueue outbound = context.GetQueue("ForwardDest");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+ testPeer.ExpectSenderAttach();
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull);
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ var consumer = context.CreateConsumer(destination);
+
+ consumer.Listener += message =>
+ {
+ var producer = context.CreateProducer();
+ producer.Send(outbound, message);
+ producer.Close();
+ };
+
+ testPeer.WaitForAllMatchersToComplete(10_000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageListenerCallsConnectionCloseThrowsIllegalStateException()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ ManualResetEvent latch = new ManualResetEvent(false);
+ Exception exception = null;
+ consumer.Listener += message =>
+ {
+ try
+ {
+ context.Close();
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+
+ latch.Set();
+ };
+
+ Assert.True(latch.WaitOne(4000), "Messages not received within given timeout.");
+ Assert.IsNotNull(exception);
+ Assert.IsInstanceOf<IllegalStateException>(exception);
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectEnd();
+ // testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageListenerCallsConnectionStopThrowsIllegalStateException()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ ManualResetEvent latch = new ManualResetEvent(false);
+ Exception exception = null;
+ consumer.Listener += message =>
+ {
+ try
+ {
+ context.Stop();
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+
+ latch.Set();
+ };
+
+ Assert.True(latch.WaitOne(3000), "Messages not received within given timeout.");
+ Assert.IsNotNull(exception);
+ Assert.IsInstanceOf<IllegalStateException>(exception);
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageListenerCallsSessionCloseThrowsIllegalStateException()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ ManualResetEvent latch = new ManualResetEvent(false);
+ Exception exception = null;
+ consumer.Listener += message =>
+ {
+ try
+ {
+ context.Close();
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+
+ latch.Set();
+ };
+
+ Assert.True(latch.WaitOne(3000), "Messages not received within given timeout.");
+ Assert.IsNotNull(exception);
+ Assert.IsInstanceOf<IllegalStateException>(exception);
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectEnd();
+ // testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ // TODO: To be fixed
+ [Test, Timeout(20_000), Ignore("Ignore")]
+ public void TestMessageListenerClosesItsConsumer()
+ {
+ var latch = new ManualResetEvent(false);
+ var exceptionListenerFired = new ManualResetEvent(false);
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ context.ExceptionListener += _ => exceptionListenerFired.Set();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true, creditMatcher: credit => Assert.AreEqual(99, credit)); // Not sure if expected credit is right
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ Exception exception = null;
+ consumer.Listener += message =>
+ {
+ try
+ {
+ consumer.Close();
+ latch.Set();
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+ };
+
+ Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(1000)), "Process not completed within given timeout");
+ Assert.IsNull(exception, "No error expected during close");
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ Assert.False(exceptionListenerFired.WaitOne(20), "Exception listener shouldn't have fired");
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRecoverOrderingWithAsyncConsumer()
+ {
+ ManualResetEvent latch = new ManualResetEvent(false);
+ Exception asyncError = null;
+
+ int recoverCount = 5;
+ int messageCount = 8;
+ int testPayloadLength = 255;
+ string payload = Encoding.UTF8.GetString(new byte[testPayloadLength]);
+
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer, acknowledgementMode:AcknowledgementMode.ClientAcknowledge);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: new Amqp.Message() { BodySection = new AmqpValue() { Value = payload } },
+ count: messageCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, messageCount)
+ );
+
+ var consumer = context.CreateConsumer(destination);
+
+ bool complete = false;
+ int messageSeen = 0;
+ int expectedIndex = 0;
+ consumer.Listener += message =>
+ {
+ if (complete)
+ {
+ return;
+ }
+
+ try
+ {
+ int actualIndex = message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER);
+ Assert.AreEqual(expectedIndex, actualIndex, "Received Message Out Of Order");
+
+ // don't ack the message until we receive it X times
+ if (messageSeen < recoverCount)
+ {
+ context.Recover();
+ messageSeen++;
+ }
+ else
+ {
+ messageSeen = 0;
+ expectedIndex++;
+
+ // Have the peer expect the accept the disposition (1-based, hence pre-incremented).
+ testPeer.ExpectDisposition(settled: true,
+ stateMatcher: state => Assert.AreEqual(state.Descriptor.Code, MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
+ ));
+
+ message.Acknowledge();
+
+ if (expectedIndex == messageCount)
+ {
+ complete = true;
+ latch.Set();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ complete = true;
+ asyncError = e;
+ latch.Set();
+ }
+ };
+
+ Assert.True(latch.WaitOne(TimeSpan.FromSeconds(15)), "Messages not received within given timeout.");
+ Assert.IsNull(asyncError, "Unexpected exception");
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCloseWaitsForAsyncDeliveryToComplete()
+ {
+ ManualResetEvent latch = new ManualResetEvent(false);
+
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ consumer.Listener += _ =>
+ {
+ latch.Set();
+ Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ };
+
+ Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ consumer.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSessionCloseWaitsForAsyncDeliveryToComplete()
+ {
+ ManualResetEvent latch = new ManualResetEvent(false);
+
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ consumer.Listener += _ =>
+ {
+ latch.Set();
+ Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ };
+
+ Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConnectionCloseWaitsForAsyncDeliveryToComplete()
+ {
+ ManualResetEvent latch = new ManualResetEvent(false);
+
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ IQueue destination = context.GetQueue("myQueue");
+ context.Start();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+ var consumer = context.CreateConsumer(destination);
+
+ testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ consumer.Listener += _ =>
+ {
+ latch.Set();
+ Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ };
+
+ Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRecoveredMessageShouldNotBeMutated()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer, acknowledgementMode:AcknowledgementMode.ClientAcknowledge);
+ context.Start();
+
+ testPeer.ExpectBegin();
+ IQueue destination = context.GetQueue("myQueue");
+ string originalPayload = "testMessage";
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue() { Value = originalPayload } }, count: 1);
+
+ var consumer = context.CreateConsumer(destination);
+ NmsTextMessage message = consumer.Receive() as NmsTextMessage;
+ Assert.NotNull(message);
+ message.IsReadOnlyBody = false;
+ message.Text = message.Text + "Received";
+ context.Recover();
+
+ ITextMessage recoveredMessage = consumer.Receive() as ITextMessage;
+ Assert.IsNotNull(recoveredMessage);
+ Assert.AreNotEqual(message.Text, recoveredMessage.Text);
+ Assert.AreEqual(originalPayload, recoveredMessage.Text);
+ Assert.AreNotSame(message, recoveredMessage);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs
new file mode 100644
index 0000000..a1b43e8
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ // Adapted from SessionIntegrationTest to use NMSContext
+ [TestFixture]
+ public class NMSContextIntegrationTest : IntegrationTestFixture
+ {
+ [Test, Timeout(20_000)]
+ public void TestClose()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = EstablishNMSContext(testPeer);
+
+ testPeer.ExpectClose();
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateProducer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+
+ testPeer.ExpectSenderAttach();
+
+ var producer = context.CreateProducer();
+
+ testPeer.ExpectDetach(true, true, true);
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ producer.Close();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ var consumer = context.CreateConsumer(context.GetQueue("myQueue"));
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateConsumerWithEmptySelector()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ IQueue queue = context.GetQueue("myQueue");
+ context.CreateConsumer(queue, "");
+ context.CreateConsumer(queue, "", noLocal: false);
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateConsumerWithNullSelector()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ IQueue queue = context.GetQueue("myQueue");
+ context.CreateConsumer(queue, null);
+ context.CreateConsumer(queue, null, noLocal: false);
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateDurableConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ string topicName = "myTopic";
+ ITopic topic = context.GetTopic(topicName);
+ string subscriptionName = "mySubscription";
+
+ testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
+ testPeer.ExpectLinkFlow();
+
+ var durableConsumer = context.CreateDurableConsumer(topic, subscriptionName, null, false);
+ Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+
+ [Test, Timeout(20_000)]
+ public void TestCreateTemporaryQueue()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+
+ testPeer.ExpectBegin();
+
+ string dynamicAddress = "myTempQueueAddress";
+ testPeer.ExpectTempQueueCreationAttach(dynamicAddress);
+
+ ITemporaryQueue temporaryQueue = context.CreateTemporaryQueue();
+ Assert.NotNull(temporaryQueue, "TemporaryQueue object was null");
+ Assert.NotNull(temporaryQueue.QueueName, "TemporaryQueue queue name was null");
+ Assert.AreEqual(dynamicAddress, temporaryQueue.QueueName, "TemporaryQueue name not as expected");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateTemporaryTopic()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+
+ testPeer.ExpectBegin();
+
+ string dynamicAddress = "myTempTopicAddress";
+ testPeer.ExpectTempTopicCreationAttach(dynamicAddress);
+
+ ITemporaryTopic temporaryTopic = context.CreateTemporaryTopic();
+ Assert.NotNull(temporaryTopic, "TemporaryTopic object was null");
+ Assert.NotNull(temporaryTopic.TopicName, "TemporaryTopic name was null");
+ Assert.AreEqual(dynamicAddress, temporaryTopic.TopicName, "TemporaryTopic name not as expected");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateSharedConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ string topicName = "myTopic";
+ ITopic topic = context.GetTopic(topicName);
+ string subscriptionName = "mySubscription";
+
+ testPeer.ExpectSharedSubscriberAttach(topicName, subscriptionName);
+ testPeer.ExpectLinkFlow();
+
+ var durableConsumer = context.CreateSharedConsumer(topic, subscriptionName, null); //, false);
+ Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(20000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateSharedDurableConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+
+ string topicName = "myTopic";
+ ITopic topic = context.GetTopic(topicName);
+ string subscriptionName = "mySubscription";
+
+ testPeer.ExpectSharedDurableSubscriberAttach(topicName, subscriptionName);
+ testPeer.ExpectLinkFlow();
+
+ var durableConsumer = context.CreateSharedDurableConsumer(topic, subscriptionName, null); //, false);
+ Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs
new file mode 100644
index 0000000..22563a0
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ // Adapted from ProducerIntegrationTest to use NMSContext
+ [TestFixture]
+ public class NMSProducerIntegrationTest : IntegrationTestFixture
+ {
+ private const long TICKS_PER_MILLISECOND = 10000;
+
+ [Test, Timeout(20_000)]
+ public void TestCloseSender()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = base.EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue queue = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ producer.Close();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSentTextMessageCanBeModified()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = base.EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue queue = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.ExpectTransfer(x => Assert.AreEqual(text, (x.BodySection as AmqpValue).Value));
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage message = context.CreateTextMessage(text);
+ producer.Send(queue, message);
+
+ Assert.AreEqual(text, message.Text);
+ message.Text = text + text;
+ Assert.AreEqual(text + text, message.Text);
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestDefaultDeliveryModeProducesDurableMessages()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = base.EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue queue = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ // Create and transfer a new message
+ testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage textMessage = context.CreateTextMessage();
+
+ producer.Send(queue, textMessage);
+ Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestProducerOverridesMessageDeliveryMode()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = base.EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue queue = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ // Create and transfer a new message, explicitly setting the deliveryMode on the
+ // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+ // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+ testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage textMessage = context.CreateTextMessage();
+ textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
+ Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);
+
+ producer.Send(queue, textMessage);
+
+ Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+ context.Close();
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageNonPersistentProducerSetDurableFalse()
+ {
+ DoSendingMessageNonPersistentTestImpl(true);
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageNonPersistentProducerOmitsHeader()
+ {
+ DoSendingMessageNonPersistentTestImpl(false);
+ }
+
+ private void DoSendingMessageNonPersistentTestImpl(bool setPriority)
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ //Add capability to indicate support for ANONYMOUS-RELAY
+ Symbol[] serverCapabilities = {SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY};
+ var context = EstablishNMSContext(testPeer, serverCapabilities: serverCapabilities);
+ testPeer.ExpectBegin();
+
+ string queueName = "myQueue";
+ Action<object> targetMatcher = t =>
+ {
+ var target = t as Target;
+ Assert.IsNotNull(target);
+ };
+
+
+ testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false);
+
+ IQueue queue = context.GetQueue(queueName);
+ INMSProducer producer = context.CreateProducer();
+
+ byte priority = 5;
+ String text = "myMessage";
+ testPeer.ExpectTransfer(messageMatcher: message =>
+ {
+ if (setPriority)
+ {
+ Assert.IsFalse(message.Header.Durable);
+ Assert.AreEqual(priority, message.Header.Priority);
+ }
+
+ Assert.AreEqual(text, (message.BodySection as AmqpValue).Value);
+ }, stateMatcher: Assert.IsNull,
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true);
+
+ ITextMessage textMessage = context.CreateTextMessage(text);
+
+ producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+ if (setPriority)
+ producer.Priority = (MsgPriority) priority;
+
+ producer.Send(queue, textMessage);
+
+ Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode, "Should have NonPersistent delivery mode set");
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageSetsNMSDestination()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ string text = "myMessage";
+ ITextMessage message = context.CreateTextMessage(text);
+
+ testPeer.ExpectTransfer(m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value));
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ Assert.IsNull(message.NMSDestination, "Should not yet have a NMSDestination");
+
+ producer.Send(destination, message);
+
+ Assert.AreEqual(destination, message.NMSDestination, "Should have had NMSDestination set");
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageSetsNMSTimestamp()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ // Create matcher to expect the absolute-expiry-time field of the properties section to
+ // be set to a value greater than 'now'+ttl, within a delta.
+
+ DateTime creationLower = DateTime.UtcNow;
+ DateTime creationUpper = creationLower + TimeSpan.FromMilliseconds(3000);
+
+ var text = "myMessage";
+ testPeer.ExpectTransfer(m =>
+ {
+ Assert.IsTrue(m.Header.Durable);
+ Assert.That(m.Properties.CreationTime.Ticks, Is.GreaterThanOrEqualTo(creationLower.Ticks).Within(TICKS_PER_MILLISECOND));
+ Assert.That(m.Properties.CreationTime.Ticks, Is.LessThanOrEqualTo(creationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
+ Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+ });
+
+ ITextMessage message = context.CreateTextMessage(text);
+ producer.Send(destination, message);
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageSetsNMSExpirationRelatedAbsoluteExpiryAndTtlFields()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ uint ttl = 100_000;
+ DateTime currentTime = DateTime.UtcNow;
+ DateTime expirationLower = currentTime + TimeSpan.FromMilliseconds(ttl);
+ DateTime expirationUpper = currentTime + TimeSpan.FromMilliseconds(ttl) + TimeSpan.FromMilliseconds(5000);
+
+ // Create matcher to expect the absolute-expiry-time field of the properties section to
+ // be set to a value greater than 'now'+ttl, within a delta.
+ string text = "myMessage";
+ testPeer.ExpectTransfer(m =>
+ {
+ Assert.IsTrue(m.Header.Durable);
+ Assert.AreEqual(ttl, m.Header.Ttl);
+ Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.GreaterThanOrEqualTo(expirationLower.Ticks).Within(TICKS_PER_MILLISECOND));
+ Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.LessThanOrEqualTo(expirationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
+ Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+ });
+
+ ITextMessage message = context.CreateTextMessage(text);
+ producer.TimeToLive = TimeSpan.FromMilliseconds(ttl);
+ producer.Priority = NMSConstants.defaultPriority;
+ producer.DeliveryMode = NMSConstants.defaultDeliveryMode;
+ producer.Send(destination, message);
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessagesAreProducedWithProperDefaultPriorityWhenNoPrioritySpecified()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ byte priority = 4;
+
+ testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage message = context.CreateTextMessage();
+ Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);
+
+ producer.Send(destination, message);
+
+ Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsNMSPriority()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ byte priority = 9;
+
+ testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage message = context.CreateTextMessage();
+ Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);
+
+ producer.DeliveryMode = MsgDeliveryMode.Persistent;
+ producer.Priority = (MsgPriority) priority;
+ producer.TimeToLive = NMSConstants.defaultTimeToLive;
+ producer.Send(destination, message);
+
+ Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageSetsNMSMessageId()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ string text = "myMessage";
+ string actualMessageId = null;
+ testPeer.ExpectTransfer(m =>
+ {
+ Assert.IsTrue(m.Header.Durable);
+ Assert.IsNotEmpty(m.Properties.MessageId);
+ actualMessageId = m.Properties.MessageId;
+ });
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage message = context.CreateTextMessage(text);
+ Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
+
+ producer.Send(destination, message);
+
+ Assert.IsNotNull(message.NMSMessageId);
+ Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set");
+ Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found");
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
+ Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message");
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageWithDisableMessageIdHint()
+ {
+ DoSendingMessageWithDisableMessageIdHintTestImpl(false);
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageWithDisableMessageIdHintAndExistingMessageId()
+ {
+ DoSendingMessageWithDisableMessageIdHintTestImpl(true);
+ }
+
+ private void DoSendingMessageWithDisableMessageIdHintTestImpl(bool existingId)
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ string text = "myMessage";
+ testPeer.ExpectTransfer(m =>
+ {
+ Assert.IsTrue(m.Header.Durable);
+ Assert.IsNull(m.Properties.MessageId); // Check there is no message-id value;
+ Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+ });
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ ITextMessage message = context.CreateTextMessage(text);
+
+ Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
+
+ if (existingId)
+ {
+ string existingMessageId = "ID:this-should-be-overwritten-in-send";
+ message.NMSMessageId = existingMessageId;
+ Assert.AreEqual(existingMessageId, message.NMSMessageId, "NMSMessageId should now be se");
+ }
+
+ producer.DisableMessageID = true;
+
+ producer.Send(destination, message);
+
+ Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null");
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+ }
+ }
+
+ // TODO No connection listener in nms context
+ // [Test, Timeout(20_000)]
+ // public void TestRemotelyCloseProducer()
+ // {
+ // string breadCrumb = "ErrorMessageBreadCrumb";
+ //
+ // ManualResetEvent producerClosed = new ManualResetEvent(false);
+ // Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+ // mockConnectionListener
+ // .Setup(listener => listener.OnProducerClosed(It.IsAny<NmsMessageProducer>(), It.IsAny<Exception>()))
+ // .Callback(() => { producerClosed.Set(); });
+ //
+ // using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ // {
+ // NmsContext context = (NmsContext) EstablishNMSContext(testPeer);
+ // context.AddConnectionListener(mockConnectionListener.Object);
+ //
+ // testPeer.ExpectBegin();
+ // ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ //
+ // // Create a producer, then remotely end it afterwards.
+ // testPeer.ExpectSenderAttach();
+ // testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb, delayBeforeSend: 10);
+ //
+ // IQueue destination = session.GetQueue("myQueue");
+ // IMessageProducer producer = session.CreateProducer(destination);
+ //
+ // // Verify the producer gets marked closed
+ // testPeer.WaitForAllMatchersToComplete(1000);
+ //
+ // Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger");
+ // Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed");
+ //
+ // // Try closing it explicitly, should effectively no-op in client.
+ // // The test peer will throw during close if it sends anything.
+ // producer.Close();
+ // }
+ // }
+
+ [Test, Timeout(20_000)]
+ public void TestSendWhenLinkCreditIsZeroAndTimeout()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500");
+ testPeer.ExpectBegin();
+
+ IQueue queue = context.GetQueue("myQueue");
+
+ ITextMessage message = context.CreateTextMessage("text");
+
+ // Expect the producer to attach. Don't send any credit so that the client will
+ // block on a send and we can test our timeouts.
+ testPeer.ExpectSenderAttachWithoutGrantingCredit();
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ var producer = context.CreateProducer();
+
+ Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out.");
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendTimesOutWhenNoDispositionArrives()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500");
+ testPeer.ExpectBegin();
+
+ IQueue queue = context.GetQueue("myQueue");
+
+ ITextMessage message = context.CreateTextMessage("text");
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer which we will not send any response for which should cause the
+ // send operation to time out.
+ testPeer.ExpectSenderAttach();
+ testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ var producer = context.CreateProducer();
+
+ Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out.");
+
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendWorksWhenConnectionNotStarted()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ testPeer.ExpectTransfer(Assert.IsNotNull);
+
+ producer.Send(destination, context.CreateMessage());
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ producer.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendWorksAfterConnectionStopped()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+
+ testPeer.ExpectTransfer(Assert.IsNotNull);
+
+ context.Stop();
+
+ producer.Send(destination, context.CreateMessage());
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+
+ producer.Close();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessagePersistentSetsBatchableFalse()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+ testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+ stateMatcher: Assert.IsNull,
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true,
+ batchable: false);
+
+ IMessage message = context.CreateMessage();
+ producer.DeliveryMode = MsgDeliveryMode.Persistent;
+ producer.Priority = MsgPriority.Normal;
+ producer.TimeToLive = NMSConstants.defaultTimeToLive;
+ producer.Send(destination, message);
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendingMessageNonPersistentSetsBatchableFalse()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var context = EstablishNMSContext(testPeer);
+ context.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ IQueue destination = context.GetQueue("myQueue");
+ var producer = context.CreateProducer();
+ testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+ stateMatcher: Assert.IsNull,
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true,
+ batchable: false);
+
+ IMessage message = context.CreateMessage();
+ producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+ producer.Priority = MsgPriority.Normal;
+ producer.TimeToLive = NMSConstants.defaultTimeToLive;
+ producer.Send(destination, message);
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs
new file mode 100644
index 0000000..88d05df
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Apache.NMS;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ [TestFixture]
+ public class ProducerIntegrationAsyncTest : IntegrationTestFixture
+ {
+ [Test, Timeout(20_000)]
+ public void TestSentAsyncIsAsynchronous()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = base.EstablishConnection(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.ExpectTransfer(messageMatcher: m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value),
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true,
+ stateMatcher: Assert.IsNull,
+ dispositionDelay: 10); // 10ms should be enough
+ testPeer.ExpectClose();
+
+ ITextMessage message = session.CreateTextMessage(text);
+ var sendTask = producer.SendAsync(message);
+ // Instantly check if its not completed yet, we want async, so it should not be completed right after
+ Assert.AreEqual(false, sendTask.IsCompleted);
+
+ // And now wait for task to complete
+ sendTask.Wait(20_000);
+
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestSentAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = base.EstablishConnection(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.ExpectTransfer(messageMatcher: m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value),
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: new Accepted(),
+ responseSettled: true,
+ stateMatcher: Assert.IsNull,
+ dispositionDelay: 10); // 10ms should be enough
+ testPeer.ExpectClose();
+
+ ITextMessage message = session.CreateTextMessage(text);
+ await producer.SendAsync(message);
+
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+
+ [Test, Timeout(20_000)]
+ public async Task TestProducerWorkWithAsyncAwait()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = base.EstablishConnection(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Create and transfer a new message, explicitly setting the deliveryMode on the
+ // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+ // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+ testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+ testPeer.ExpectClose();
+
+ ITextMessage textMessage = session.CreateTextMessage();
+ textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
+ Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);
+
+ await producer.SendAsync(textMessage);
+
+ Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+ connection.Close();
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
index 4fbdc29..da336e8 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
@@ -227,5 +227,62 @@
testPeer.WaitForAllMatchersToComplete(1000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestCreateSharedConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+ string topicName = "myTopic";
+ ITopic topic = session.GetTopic(topicName);
+ string subscriptionName = "mySubscription";
+
+ testPeer.ExpectSharedSubscriberAttach(topicName, subscriptionName);
+ testPeer.ExpectLinkFlow();
+
+ IMessageConsumer durableConsumer = session.CreateSharedConsumer(topic, subscriptionName, null);//, false);
+ // IMessageConsumer durableConsumer = session.CreateDurableConsumer(topic, subscriptionName, null, false);
+ Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(20000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCreateSharedDurableConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+ string topicName = "myTopic";
+ ITopic topic = session.GetTopic(topicName);
+ string subscriptionName = "mySubscription";
+
+ testPeer.ExpectSharedDurableSubscriberAttach(topicName, subscriptionName);
+ testPeer.ExpectLinkFlow();
+
+ IMessageConsumer durableConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName, null); //, false);
+ Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
index e8c3f6f..b5688d6 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
@@ -20,7 +20,7 @@
namespace NMS.AMQP.Test.TestAmqp
{
- class NLogAdapter : ITrace
+ public class NLogAdapter : ITrace
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 164f141..eccf3e9 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -555,6 +555,7 @@
Handle = context.Command.Handle,
LinkName = context.Command.LinkName,
Target = context.Command.Target,
+ OfferedCapabilities = new []{ SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS }
};
if (refuseLink)
@@ -605,7 +606,49 @@
ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
}
+
+ public void ExpectSharedDurableSubscriberAttach(string topicName, string subscriptionName)
+ {
+ Action<string> linkNameMatcher = linkName => Assert.AreEqual(subscriptionName, linkName);
+ Action<object> sourceMatcher = o =>
+ {
+ Assert.IsInstanceOf<Source>(o);
+ var source = (Source) o;
+ Assert.AreEqual(topicName, source.Address);
+ Assert.IsFalse(source.Dynamic);
+ Assert.AreEqual(TerminusDurability.UNSETTLED_STATE, (TerminusDurability) source.Durable);
+ Assert.AreEqual(TerminusExpiryPolicy.NEVER, source.ExpiryPolicy);
+ CollectionAssert.Contains(source.Capabilities, SymbolUtil.ATTACH_CAPABILITIES_TOPIC);
+ CollectionAssert.Contains(source.Capabilities, SymbolUtil.SHARED);
+ };
+
+ Action<Target> targetMatcher = Assert.IsNotNull;
+
+ ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
+ }
+
+ public void ExpectSharedSubscriberAttach(string topicName, string subscriptionName)
+ {
+ Action<string> linkNameMatcher = linkName => Assert.AreEqual(subscriptionName+"|volatile1", linkName);
+
+ Action<object> sourceMatcher = o =>
+ {
+ Assert.IsInstanceOf<Source>(o);
+ var source = (Source) o;
+ Assert.AreEqual(topicName, source.Address);
+ Assert.IsFalse(source.Dynamic);
+ // Assert.AreEqual(TerminusDurability.UNSETTLED_STATE, (TerminusDurability) source.Durable);
+ // Assert.AreEqual(TerminusExpiryPolicy.NEVER, source.ExpiryPolicy);
+ CollectionAssert.Contains(source.Capabilities, SymbolUtil.ATTACH_CAPABILITIES_TOPIC);
+ CollectionAssert.Contains(source.Capabilities, SymbolUtil.SHARED);
+ };
+
+ Action<Target> targetMatcher = Assert.IsNotNull;
+
+ ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
+ }
+
public void ExpectDetach(bool expectClosed, bool sendResponse, bool replyClosed, Symbol errorType = null, String errorMessage = null)
{
var detachMatcher = new FrameMatcher<Detach>()