Merge pull request #62 from lukeabsent/AMQNET-637
AMQNET-637 Move LangVersion back to 7.3 in interop test
diff --git a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
index e167532..abbd2fc 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
+++ b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
@@ -20,7 +20,7 @@
<TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
<RootNamespace>NMS.AMQP.Test</RootNamespace>
<AssemblyName>NMS.AMQP.Interop.Test</AssemblyName>
- <LangVersion>8</LangVersion>
+ <LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index 9f174d4..42e74eb 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -33,7 +33,7 @@
public void TestSelectors()
{
PurgeQueue(TimeSpan.FromMilliseconds(500));
-
+
Connection = CreateAmqpConnection();
Connection.Start();
@@ -47,7 +47,7 @@
string text = "Hello + 9";
message = session.CreateTextMessage(text);
producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
-
+
producer.Close();
IMessageConsumer messageConsumer = session.CreateConsumer(queue, "JMSPriority > 8");
@@ -62,7 +62,7 @@
public void TestSelectorsWithJMSType()
{
PurgeQueue(TimeSpan.FromMilliseconds(500));
-
+
Connection = CreateAmqpConnection();
Connection.Start();
@@ -78,7 +78,7 @@
ITextMessage message2 = session.CreateTextMessage(text);
message2.NMSType = type;
producer.Send(message2, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
-
+
producer.Close();
IMessageConsumer messageConsumer = session.CreateConsumer(queue, $"JMSType = '{type}'");
@@ -99,82 +99,84 @@
int counter = 0;
- using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-
- string subscriptionName = "mySubscriptionName";
- ITopic topicProducer = sessionProducer.GetTopic(TestName);
- using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
-
- // First durable consumer, reads message but does not unsubscribe
- using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
- using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
- using ITopic topic = session.GetTopic(TestName);
- using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ string subscriptionName = "mySubscriptionName";
+ ITopic topicProducer = sessionProducer.GetTopic(TestName);
+ using (IMessageProducer producer = sessionProducer.CreateProducer(topicProducer))
{
- // Purge topic
- PurgeConsumer(messageConsumer, TimeSpan.FromSeconds(0.5));
-
- ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
- producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
-
- var message = messageConsumer.Receive();
- Assert.AreEqual("text0", message.Body<string>());
- }
- }
-
- // Write some more messages while subscription is closed
- for (int t = 0; t < 3; t++)
- {
- ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
- producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
- }
-
- // Second durable consumer, reads message that were send during no-subscription and unsubscribe
- using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
- using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
- {
- using ITopic topic = session.GetTopic(TestName);
- using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
- {
- for (int t = 1; t <= 3; t++)
+ // First durable consumer, reads message but does not unsubscribe
+ using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+ using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
- var message = messageConsumer.Receive();
- Assert.AreEqual("text" + t, message.Body<string>());
+ using (ITopic topic = session.GetTopic(TestName))
+ using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ {
+ // Purge topic
+ PurgeConsumer(messageConsumer, TimeSpan.FromSeconds(0.5));
+
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+ var message = messageConsumer.Receive();
+ Assert.AreEqual("text0", message.Body<string>());
+ }
}
- // Assert topic is empty after those msgs
- var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
- Assert.IsNull(msgAtTheEnd);
+ // Write some more messages while subscription is closed
+ for (int t = 0; t < 3; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
- Assert.Throws<IllegalStateException>(() => session.Unsubscribe(subscriptionName)); // Error unsubscribing while consumer is on
+ // Second durable consumer, reads message that were send during no-subscription and unsubscribe
+ using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+ using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ using (ITopic topic = session.GetTopic(TestName))
+ using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ {
+ for (int t = 1; t <= 3; t++)
+ {
+ var message = messageConsumer.Receive();
+ Assert.AreEqual("text" + t, message.Body<string>());
+ }
+
+ // Assert topic is empty after those msgs
+ var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+ Assert.IsNull(msgAtTheEnd);
+
+ Assert.Throws<IllegalStateException>(() => session.Unsubscribe(subscriptionName)); // Error unsubscribing while consumer is on
+ }
+
+ session.Unsubscribe(subscriptionName);
+ }
+
+
+ // Send some messages again to verify we will not get them when create durable subscription
+ for (int t = 0; t < 3; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+
+ // Third durable subscriber, expect NOT to read messages during no-subscription period
+ using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+ using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ using (ITopic topic = session.GetTopic(TestName))
+ using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+ {
+ // Assert topic is empty
+ var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+ Assert.IsNull(msgAtTheEnd);
+ }
+
+ // And unsubscribe again
+ session.Unsubscribe(subscriptionName);
+ }
}
-
- session.Unsubscribe(subscriptionName);
- }
-
-
- // Send some messages again to verify we will not get them when create durable subscription
- for (int t = 0; t < 3; t++)
- {
- ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
- producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
- }
-
- // Third durable subscriber, expect NOT to read messages during no-subscription period
- using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
- using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
- {
- using ITopic topic = session.GetTopic(TestName);
- using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
- {
- // Assert topic is empty
- var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
- Assert.IsNull(msgAtTheEnd);
- }
-
- // And unsubscribe again
- session.Unsubscribe(subscriptionName);
}
}
@@ -182,12 +184,12 @@
[Test, Timeout(60_000)]
public void TestSharedSubscription()
{
- IMessageConsumer GetConsumer(string subscriptionName, String clientId)
+ IMessageConsumer GetConsumer(string subName, String clientId)
{
var connection = CreateAmqpConnectionStarted(clientId);
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var topic = session.GetTopic(TestName);
- var messageConsumer = session.CreateSharedConsumer(topic, subscriptionName);
+ var messageConsumer = session.CreateSharedConsumer(topic, subName);
return messageConsumer;
}
@@ -195,7 +197,7 @@
Connection.Start();
string subscriptionName = "mySubscriptionName";
-
+
var receivedMessages = new List<int>();
@@ -211,34 +213,38 @@
receivedMessages.Add(2);
msg.Acknowledge();
};
-
+
// Now send some messages
- using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
- ITopic topicProducer = sessionProducer.GetTopic(TestName);
- using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
- for (int t = 0; t < 10; t++)
+ using (ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
- ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
- producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ ITopic topicProducer = sessionProducer.GetTopic(TestName);
+ using (IMessageProducer producer = sessionProducer.CreateProducer(topicProducer))
+ {
+ for (int t = 0; t < 10; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+ }
}
// Give it some time to process
Thread.Sleep(TimeSpan.FromSeconds(2));
-
+
// Assert message was routed to multiple consumers
Assert.AreEqual(2, receivedMessages.Distinct().Count());
Assert.AreEqual(10, receivedMessages.Count);
}
-
+
[Test, Timeout(60_000)]
public void TestSharedDurableSubscription()
{
- (IMessageConsumer,ISession,IConnection) GetConsumer(string subscriptionName, String clientId)
+ (IMessageConsumer, ISession, IConnection) GetConsumer(string subName, String clientId)
{
var connection = CreateAmqpConnection(clientId);
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var topic = session.GetTopic(TestName);
- var messageConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName);
+ var messageConsumer = session.CreateSharedDurableConsumer(topic, subName);
return (messageConsumer, session, connection);
}
@@ -246,63 +252,67 @@
Connection.Start();
string subscriptionName = "mySubscriptionName";
- int messageSendCount = 1099;
+ int messageSendCount = 1099;
var receivedMessages = new ConcurrentBag<int>();
-
+
IConnection connectionConsumer1, connectionConsumer2;
IMessageConsumer messageConsumer1, messageConsumer2;
-
+
(messageConsumer1, _, connectionConsumer1) = GetConsumer(subscriptionName, null);
(messageConsumer2, _, connectionConsumer2) = GetConsumer(subscriptionName, null);
connectionConsumer1.Start();
connectionConsumer2.Start();
-
+
messageConsumer1.Close();
messageConsumer2.Close();
connectionConsumer1.Close();
connectionConsumer2.Close();
-
- // Now send some messages
- using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
- ITopic topicProducer = sessionProducer.GetTopic(TestName);
- using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
- for (int t = 0; t < messageSendCount; t++)
- {
- ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
- producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
- }
- // Create consumers again and expect messages to be delivered to them
- ISession sessionConsumer1, sessionConsumer2;
- (messageConsumer1, sessionConsumer1, connectionConsumer1) = GetConsumer(subscriptionName, null);
- (messageConsumer2, sessionConsumer2, connectionConsumer2) = GetConsumer(subscriptionName, null);
- messageConsumer1.Listener += (msg) =>
+ // Now send some messages
+ using (ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
- receivedMessages.Add(1);
- msg.Acknowledge();
- };
- messageConsumer2.Listener += (msg) =>
- {
- receivedMessages.Add(2);
- msg.Acknowledge();
- };
- Task.Run(() => connectionConsumer1.Start()); // parallel to give both consumers chance to start at the same time
- Task.Run(() => connectionConsumer2.Start());
-
- // Give it some time to process
- Thread.Sleep(TimeSpan.FromSeconds(5));
-
- // Assert message was routed to multiple consumers
- Assert.AreEqual(2, receivedMessages.Distinct().Count());
- Assert.AreEqual(messageSendCount, receivedMessages.Count);
-
- messageConsumer1.Close();
- messageConsumer2.Close();
- sessionConsumer1.Unsubscribe(subscriptionName);
- sessionConsumer2.Unsubscribe(subscriptionName);
-
+ ITopic topicProducer = sessionProducer.GetTopic(TestName);
+ using (IMessageProducer producer = sessionProducer.CreateProducer(topicProducer))
+ {
+ for (int t = 0; t < messageSendCount; t++)
+ {
+ ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+ producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ }
+ }
+
+ // Create consumers again and expect messages to be delivered to them
+ ISession sessionConsumer1, sessionConsumer2;
+ (messageConsumer1, sessionConsumer1, connectionConsumer1) = GetConsumer(subscriptionName, null);
+ (messageConsumer2, sessionConsumer2, connectionConsumer2) = GetConsumer(subscriptionName, null);
+ messageConsumer1.Listener += (msg) =>
+ {
+ receivedMessages.Add(1);
+ msg.Acknowledge();
+ };
+ messageConsumer2.Listener += (msg) =>
+ {
+ receivedMessages.Add(2);
+ msg.Acknowledge();
+ };
+ Task.Run(() => connectionConsumer1.Start()); // parallel to give both consumers chance to start at the same time
+ Task.Run(() => connectionConsumer2.Start());
+
+
+ // Give it some time to process
+ Thread.Sleep(TimeSpan.FromSeconds(5));
+
+ // Assert message was routed to multiple consumers
+ Assert.AreEqual(2, receivedMessages.Distinct().Count());
+ Assert.AreEqual(messageSendCount, receivedMessages.Count);
+
+ messageConsumer1.Close();
+ messageConsumer2.Close();
+ sessionConsumer1.Unsubscribe(subscriptionName);
+ sessionConsumer2.Unsubscribe(subscriptionName);
+ }
}
@@ -310,13 +320,13 @@
public void TestSelectNoLocal()
{
PurgeTopic(TimeSpan.FromMilliseconds(500));
-
+
Connection = CreateAmqpConnection();
Connection.Start();
-
+
ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
ITopic topic = session.GetTopic(TestName);
- IMessageProducer producer = session.CreateProducer(topic);
+ IMessageProducer producer = session.CreateProducer(topic);
ITextMessage message = session.CreateTextMessage("text");
producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
IMessageConsumer messageConsumer = session.CreateConsumer(topic, null, noLocal: true);
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
index 5bd182d..596e200 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
@@ -58,8 +58,8 @@
DateTime sendTime = DateTime.Now;
- ITextMessage message = session.CreateTextMessage("Hello");
- producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+ ITextMessage messageToSend = session.CreateTextMessage("Hello");
+ producer.Send(messageToSend, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
// Wait that delivery delay
Thread.Sleep(deliveryDelay);