// blob: 473b0f79e4cec8bc7771aaba0303c7d3f7e1edf7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Specialized;
using System.Collections.Generic;
using NUnit.Framework;
using NUnit.Framework.Interfaces;
using Apache.NMS;
using Apache.NMS.AMQP.Test.Util;
using Apache.NMS.AMQP.Test.Attribute;
using System.Text;
namespace Apache.NMS.AMQP.Test.TestCase
{
[TestFixture]
class ConsumerTest : BaseTestCase
{
const string CUSTOM_CLIENT_ID = "foobar";
/*
 * Runs the base fixture setup, then resets the per-test state used by the
 * tests in this fixture: the completion event, the last asynchronous
 * exception and the received-message counter.
 */
public override void Setup()
{
    base.Setup();

    // A new, unsignalled event for every test.
    waiter = new System.Threading.ManualResetEvent(false);
    asyncEx = null;
    msgCount = 0;
}
/*
 * Sends messageCount text messages through the given producer. A single
 * message instance is reused; before each send its body is set to "num:"
 * followed by the zero-based send index. Every send is non-persistent,
 * normal priority, with a zero time-to-live.
 */
protected void SendMessages(IMessageProducer producer, int messageCount)
{
    ITextMessage outgoing = producer.CreateTextMessage();
    int sent = 0;
    while (sent < messageCount)
    {
        outgoing.Text = "num:" + sent;
        producer.Send(outgoing, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.Zero);
        sent++;
    }
}
/*
 * Sends a batch of messages to a topic and pulls them back with synchronous
 * Receive calls, asserting that they arrive in the order they were sent.
 *
 * The consumer is drained with a short receive timeout. Each time it runs dry
 * before the whole batch has arrived the test sleeps and tries again, giving
 * up after three drain passes.
 */
[Test]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "default", AckMode = AcknowledgementMode.AutoAcknowledge)]
[TopicSetup("default", "testdest1", Name = "nms.test")]
[ConsumerSetup("default", "testdest1", "con1")]
[ProducerSetup("default", "testdest1", "pro1")]
public void TestReceiveMessageSync()
{
    const int NUM_MSGS = 100;
    const int CONSUMER_TIMEOUT = 25; // 25 ms to pull from consumer.
    const int BACKOFF_SLEEP = 500; // 0.5 secs to sleep when consumer pull times out.
    using (IConnection connection = this.GetConnection())
    using (IMessageConsumer consumer = this.GetConsumer("con1"))
    using (IMessageProducer producer = this.GetProducer("pro1"))
    {
        try
        {
            connection.ExceptionListener += DefaultExceptionListener;
            connection.Start();
            SendMessages(producer, NUM_MSGS);

            IMessage message = null;
            int backoffCount = 0;
            DateTime start = DateTime.Now;
            while (msgCount < NUM_MSGS && backoffCount < 3)
            {
                // Drain everything currently available.
                while ((message = consumer.Receive(TimeSpan.FromMilliseconds(CONSUMER_TIMEOUT))) != null)
                {
                    ITextMessage textMessage = message as ITextMessage;
                    // Report a wrong message type as an assertion failure instead of
                    // a NullReferenceException on the next line.
                    Assert.NotNull(textMessage, "Received message is not an ITextMessage.");
                    Assert.AreEqual("num:" + msgCount, textMessage.Text, "Received message out of order.");
                    msgCount++;
                }

                backoffCount++;
                // Only sleep when another drain pass will actually follow.
                if (backoffCount < 3 && msgCount < NUM_MSGS)
                {
                    System.Threading.Thread.Sleep(BACKOFF_SLEEP);
                }
            }

            TimeSpan duration = DateTime.Now - start;
            int milis = Convert.ToInt32(duration.TotalMilliseconds);
            // {2} is the elapsed time; the message previously repeated {0} here and
            // so printed the message count in place of the milliseconds.
            Assert.AreEqual(NUM_MSGS, msgCount, "Received {0} messages out of {1} in {2}ms.", msgCount, NUM_MSGS, milis);
        }
        catch (Exception ex)
        {
            this.PrintTestFailureAndAssert(this.GetMethodName(), "Unexpected Exception.", ex);
        }
    }
}
/*
 * Verifies that a message listener is not invoked while the connection is
 * stopped.
 *
 * The connection is stopped after the message at index NUM_MSGS / 2 has been
 * sent and restarted after the last message has been sent. The listener fails
 * the test (by signalling the waiter and throwing) if it is ever called while
 * connection.IsStarted is false. After the restart all NUM_MSGS messages are
 * expected within MESSAGE_RECEIVE_TIMEOUT.
 */
[Test]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "default", AckMode = AcknowledgementMode.AutoAcknowledge)]
[TopicSetup("default", "testdest1", Name = "nms.test")]
[ConsumerSetup("default", "testdest1", "con1")]
[ProducerSetup("default", "testdest1", "pro1", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestReceiveStopStartMessageAsync()
{
    const int NUM_MSGS = 100;
    const int MESSAGE_RECEIVE_TIMEOUT = 1000; // 1.0s
    using (IConnection connection = this.GetConnection())
    using (IMessageConsumer consumer = this.GetConsumer("con1"))
    using (IMessageProducer producer = this.GetProducer("pro1"))
    {
        try
        {
            MessageListener ackcallback = CreateListener(NUM_MSGS);
            MessageListener callback = (m) =>
            {
                if (!connection.IsStarted)
                {
                    // Release the waiting test thread before failing the delivery.
                    waiter.Set();
                    throw new Exception("Received Message " + msgCount + " on stopped connection.");
                }
                ackcallback(m);
                Logger.Info("MsgCount : " + msgCount);
            };
            consumer.Listener += callback;
            connection.ExceptionListener += DefaultExceptionListener;
            connection.Start();

            ITextMessage message = producer.CreateTextMessage();
            for (int i = 0; i < NUM_MSGS; i++)
            {
                message.Text = "num:" + i;
                producer.Send(message);
                if (i == NUM_MSGS / 2)
                {
                    connection.Stop();
                    Logger.Info("Stopping Connection. At message " + i + ".");
                }
                else if (i == NUM_MSGS - 1)
                {
                    Logger.Info("Starting Connection. At message " + i + ".");
                    connection.Start();
                }
            }

            if (!waiter.WaitOne(MESSAGE_RECEIVE_TIMEOUT))
            {
                // Prefer reporting an asynchronous exception over the plain timeout.
                Assert.IsNull(asyncEx, "OnExceptionListener Event when raised. Message : {0}.", asyncEx?.Message);
                Assert.Fail("Received {0} of {1} messages in {2}ms.", msgCount, NUM_MSGS, MESSAGE_RECEIVE_TIMEOUT);
            }
            Assert.IsNull(asyncEx, "OnExceptionListener Event when raised. Message : {0}.", asyncEx?.Message);
            Assert.AreEqual(NUM_MSGS, msgCount, "Failed to Received All Messages sent.");
        }
        catch (Exception ex)
        {
            // Only report the asynchronous exception when one was actually captured,
            // as TestReceiveMessageAsyncClientAck does.
            if (asyncEx != null)
            {
                Logger.Warn("Async execption: " + this.GetTestException(asyncEx));
            }
            this.PrintTestFailureAndAssert(this.GetMethodName(), "Unexpected Exception.", ex);
        }
    }
}
/*
 * Receives a batch of messages through a MessageListener on a session in
 * ClientAcknowledge mode. The listener tags each message with the ack type
 * property, acknowledges it, and then hands it to the counting listener.
 * All NUM_MSGS messages are expected within MESSAGE_RECEIVE_TIMEOUT.
 *
 * NOTE(review): the ProducerSetup attribute asks for Persistent delivery, but
 * every send below passes NonPersistent explicitly - confirm which is intended.
 */
[Test]
//[Repeat(1000)]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "default", AckMode = AcknowledgementMode.ClientAcknowledge)]
[TopicSetup("default", "testdest1", Name = "nms.test")]
[ConsumerSetup("default", "testdest1", "con1")]
[ProducerSetup("default", "testdest1", "pro1", DeliveryMode = MsgDeliveryMode.Persistent)]
public void TestReceiveMessageAsyncClientAck()
{
    const int NUM_MSGS = 100;
    const int MESSAGE_RECEIVE_TIMEOUT = 1000; // 1.0s
    using (IConnection connection = this.GetConnection())
    using (ISession session = this.GetSession("default"))
    using (IMessageConsumer consumer = this.GetConsumer("con1"))
    using (IMessageProducer producer = this.GetProducer("pro1"))
    {
        try
        {
            Assert.AreEqual(AcknowledgementMode.ClientAcknowledge, session.AcknowledgementMode, "Session acknowkedgement mode is not Client mode.");

            MessageListener countingListener = CreateListener(NUM_MSGS);
            MessageListener ackingListener = (received) =>
            {
                if (!connection.IsStarted)
                {
                    // Wake the test thread, then fail this delivery.
                    waiter.Set();
                    throw new Exception("Received Message " + msgCount + " on stopped connection.");
                }
                received.Properties["NMS.AMQP.ACK.TYPE"] = 0; // AMQP Accepted
                received.Acknowledge();
                countingListener(received);
                Logger.Info("MsgCount : " + msgCount);
            };
            consumer.Listener += ackingListener;
            connection.ExceptionListener += DefaultExceptionListener;
            connection.Start();

            ITextMessage outgoing = producer.CreateTextMessage();
            int sent = 0;
            while (sent < NUM_MSGS)
            {
                outgoing.Text = "num:" + sent;
                producer.Send(outgoing, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.Zero);
                sent++;
            }

            bool completed = waiter.WaitOne(MESSAGE_RECEIVE_TIMEOUT);
            if (!completed)
            {
                Assert.IsNull(asyncEx, "OnExceptionListener Event when raised. Message : {0}.", asyncEx?.Message);
                Assert.Fail("Received {0} of {1} messages in {2}ms.", msgCount, NUM_MSGS, MESSAGE_RECEIVE_TIMEOUT);
            }
            Assert.IsNull(asyncEx, "OnExceptionListener Event when raised. Message : {0}.", asyncEx?.Message);
            Assert.AreEqual(NUM_MSGS, msgCount, "Failed to Received All Messages sent.");
        }
        catch (Exception failure)
        {
            if (asyncEx != null)
            {
                Logger.Warn("Async execption: " + this.GetTestException(asyncEx));
            }
            this.PrintTestFailureAndAssert(this.GetMethodName(), "Unexpected Exception.", failure);
        }
    }
}
/*
 * Receives a batch of messages through a MessageListener on an
 * AutoAcknowledge session. The listener fails the test (by signalling the
 * waiter and throwing) if it is called while connection.IsStarted is false;
 * otherwise it forwards the message to the counting listener. All NUM_MSGS
 * messages are expected within MESSAGE_RECEIVE_TIMEOUT.
 */
[Test]
//[Repeat(1000)]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "default", AckMode = AcknowledgementMode.AutoAcknowledge)]
[TopicSetup("default", "testdest1", Name = "nms.test")]
[ConsumerSetup("default", "testdest1", "con1")]
[ProducerSetup("default", "testdest1", "pro1", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestReceiveMessageAsync()
{
    const int NUM_MSGS = 100;
    const int MESSAGE_RECEIVE_TIMEOUT = 1000; // 1.0s
    using (IConnection connection = this.GetConnection())
    using (IMessageConsumer consumer = this.GetConsumer("con1"))
    using (IMessageProducer producer = this.GetProducer("pro1"))
    {
        try
        {
            MessageListener ackcallback = CreateListener(NUM_MSGS);
            MessageListener callback = (m) =>
            {
                if (!connection.IsStarted)
                {
                    // Release the waiting test thread before failing the delivery.
                    waiter.Set();
                    throw new Exception("Received Message " + msgCount + " on stopped connection.");
                }
                ackcallback(m);
                Logger.Info("MsgCount : " + msgCount);
            };
            consumer.Listener += callback;
            connection.ExceptionListener += DefaultExceptionListener;
            connection.Start();

            ITextMessage message = producer.CreateTextMessage();
            for (int i = 0; i < NUM_MSGS; i++)
            {
                message.Text = "num:" + i;
                producer.Send(message);
            }

            if (!waiter.WaitOne(MESSAGE_RECEIVE_TIMEOUT))
            {
                // Prefer reporting an asynchronous exception over the plain timeout.
                Assert.IsNull(asyncEx, "OnExceptionListener Event when raised. Message : {0}.", asyncEx?.Message);
                Assert.Fail("Received {0} of {1} messages in {2}ms.", msgCount, NUM_MSGS, MESSAGE_RECEIVE_TIMEOUT);
            }
            Assert.IsNull(asyncEx, "OnExceptionListener Event when raised. Message : {0}.", asyncEx?.Message);
            Assert.AreEqual(NUM_MSGS, msgCount, "Failed to Received All Messages sent.");
        }
        catch (Exception ex)
        {
            // Only report the asynchronous exception when one was actually captured,
            // as TestReceiveMessageAsyncClientAck does.
            if (asyncEx != null)
            {
                Logger.Warn("Async execption: " + this.GetTestException(asyncEx));
            }
            this.PrintTestFailureAndAssert(this.GetMethodName(), "Unexpected Exception.", ex);
        }
    }
}
/*
 * Logs, at warning level, a tally of received, lost and not-yet-delivered
 * messages.
 *
 *  missingCount - per-consumer number of ids counted as missing.
 *  lastId       - per-consumer last id seen (the caller initialises entries
 *                 to -1); indexed in step with missingCount.
 *  Msgbatch     - batch size used to work out how many ids after lastId are
 *                 still outstanding for each consumer.
 */
private void LogSummary(long[] missingCount, long[] lastId, long Msgbatch)
{
    long lost = 0;
    long pending = 0;
    int slot = 0;
    foreach (long gap in missingCount)
    {
        lost += gap;
        // Ids after the last one seen have not been delivered yet.
        pending += Msgbatch - lastId[slot] - 1;
        slot++;
    }

    long accountedFor = msgCount + lost + pending;
    string summary = string.Format(
        "Msgs Received: {0}, Msgs Lost: {1}, Msgs Remaining Delivery: {2} TotalMsgs: {3}",
        msgCount, lost, pending, accountedFor);
    Logger.Warn(summary);
}
/*
 * Sends NUM_MSGS messages on each of four producers (one topic per producer) and
 * receives them through four listener-driven consumers on the same connection.
 *
 * Each consumer's listener tracks the last message id it has seen and counts id
 * gaps as lost messages. The waiter is signalled either when delivered + lost
 * messages add up to the total, or when every consumer has seen the final id.
 *
 * The delivery mode is hard-coded to NonPersistent below, so a timeout only logs a
 * summary and checks that no asynchronous exception occurred; it does not fail the
 * test for missing messages.
 */
[Test]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "default", AckMode = AcknowledgementMode.AutoAcknowledge)]
[TopicSetup("default", "testdest1", Name = "nms.test.1")]
[TopicSetup("default", "testdest2", Name = "nms.test.2")]
[TopicSetup("default", "testdest3", Name = "nms.test.3")]
[TopicSetup("default", "testdest4", Name = "nms.test.4")]
[ConsumerSetup("default", "testdest1", "con1")]
[ConsumerSetup("default", "testdest2", "con2")]
[ConsumerSetup("default", "testdest3", "con3")]
[ConsumerSetup("default", "testdest4", "con4")]//*/
[ProducerSetup("default", "testdest1", "pro1")]
[ProducerSetup("default", "testdest2", "pro2")]
[ProducerSetup("default", "testdest3", "pro3")]
[ProducerSetup("default", "testdest4", "pro4")]//*/
public void TestMultipleConsumerMessageListenerReceive()
{
const int NUM_MSGS = 100000;
string[] conIds = new string[] { "con1", "con2", "con3", "con4" };
string[] proIds = new string[] { "pro1", "pro2", "pro3", "pro4" };
IList<IMessageConsumer> consumers = this.GetConsumers(conIds);
IList<IMessageProducer> producers = this.GetProducers(proIds);
int ProducerCount = producers.Count;
int ConsumerCount = consumers.Count;
// Per-consumer bookkeeping, indexed by the consumer's position in `consumers`.
long[] lastMsgId = new long[ConsumerCount];
long[] MissingMsgCount = new long[ConsumerCount];
long lostMsgCount = 0;
int TotalMsgs = NUM_MSGS * ProducerCount;
long undeliveredMsgCount = TotalMsgs;
// 1100 ms allowed per 1000 messages, with a floor of 1100 ms.
int timeout = Math.Max(TotalMsgs / 1000, 1) * 1100;
MsgDeliveryMode mode = MsgDeliveryMode.NonPersistent;
using (IConnection connection = this.GetConnection("default"))
{
connection.ExceptionListener += DefaultExceptionListener;
for (int i = 0; i < ConsumerCount; i++)
{
// -1 marks "no message seen yet" for this consumer.
lastMsgId[i] = -1;
MissingMsgCount[i] = 0;
}
try
{
MessageListener callback = CreateListener(TotalMsgs);
int index = 0;
foreach (IMessageConsumer consumer in consumers)
{
// Copy the loop counter so each lambda captures its own consumer index.
int num = index;
MessageListener countingCallback = (m) =>
{
long lastId = lastMsgId[num];
long msgId = ExtractMsgId(m.NMSMessageId);
if (msgId > lastId)
{
lastMsgId[num] = msgId;
// Gaps are only counted once a first message has been seen, so ids
// skipped before the first delivery are not counted as lost here.
if (lastId != -1)
{
MissingMsgCount[num] += (msgId - (lastId + 1));
lostMsgCount += (msgId - (lastId + 1));
}
}
callback(m);
// signal event waiter when the last expected msg is delivered on all consumers
if (lostMsgCount + msgCount == TotalMsgs)
{
// signal if detected lost msgs from id gap and delivered msgs make the total msgs.
waiter?.Set();
}
else if (lastMsgId[num] == NUM_MSGS - 1)
{
// signal if final msg id on every consumer is detected.
undeliveredMsgCount -= NUM_MSGS;
if (undeliveredMsgCount <= 0)
waiter?.Set();
}
};
consumer.Listener += countingCallback;
index++;
}
connection.Start();
// send messages to Destinations
// One message instance (created from the first producer) is reused for every send.
ITextMessage sendMsg = producers[0].CreateTextMessage();
for (int i = 0; i < NUM_MSGS; i++)
{
int link = 0;
foreach (IMessageProducer producer in producers)
{
sendMsg.Text = "Link: " + link + ", num:" + i;
link++;
producer.Send(sendMsg, mode, MsgPriority.Normal, TimeSpan.Zero);
}
}
if (!waiter.WaitOne(timeout))
{
// Timed out waiting for the listeners to signal completion.
if (mode.Equals(MsgDeliveryMode.NonPersistent))
{
// Non-persistent delivery: missing messages are reported but tolerated.
if (msgCount != TotalMsgs)
{
Logger.Warn(string.Format("Only received {0} of {1} messages in {2}ms.", msgCount, TotalMsgs, timeout));
LogSummary(MissingMsgCount, lastMsgId, NUM_MSGS);
}
Assert.IsNull(asyncEx, "Received unexpected asynchronous exception. Message : {0}", asyncEx?.Message);
}
else
{
// Not reachable while `mode` is hard-coded to NonPersistent above.
Assert.Fail("Only received {0} of {1} messages in {2}ms.", msgCount, TotalMsgs, timeout);
}
}
else
{
if (msgCount != TotalMsgs)
{
Logger.Warn(string.Format("Only received {0} of {1} messages in {2}ms.", msgCount, TotalMsgs, timeout));
LogSummary(MissingMsgCount, lastMsgId, NUM_MSGS);
}
Assert.IsNull(asyncEx, "Received unexpected asynchronous exception. Message : {0}", asyncEx?.Message);
// For non-persistent delivery, messages detected as lost count towards the total.
long ActualMsgs = mode.Equals(MsgDeliveryMode.NonPersistent) ? msgCount + lostMsgCount : msgCount;
Assert.AreEqual(TotalMsgs, ActualMsgs, "Only received {0} (delivered = {1}, lost = {2}) of {3} messages in {4}ms.", ActualMsgs, msgCount, lostMsgCount, TotalMsgs, timeout);
}
}
catch (Exception ex)
{
this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected Exception.", ex);
}
finally
{
// sleep for 2 seconds to allow for pending producer acknowledgements to be received from the broker.
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));
}
}
}
/*
 * Verifies that calling Close() on the connection from inside a message
 * listener is rejected. The listener attempts the close, records any
 * NMSException in asyncEx, and then forwards the message to the counting
 * listener. After both messages have been delivered, the recorded exception
 * must be an IllegalStateException.
 *
 * The producer lives on a separate connection ("sender").
 */
[Test]
[ConnectionSetup(null, "default")]
[ConnectionSetup(null, "sender")]
[SessionSetup("sender", "s1", AckMode = AcknowledgementMode.AutoAcknowledge)]
[SessionSetup("default", "default", AckMode = AcknowledgementMode.AutoAcknowledge)]
[TopicSetup("default", "testdest1", Name = "nms.test")]
[ConsumerSetup("default", "testdest1", "con1")]
[ProducerSetup("s1", "testdest1", "pro1")]
public void TestMessageListenerThrowsOnCloseInMessageCallback()
{
    const int NUM_MSGS = 2;
    const int MESSAGE_RECEIVE_TIMEOUT = 5000; // 5.0s
    using (IConnection connection = this.GetConnection("default"))
    using (IMessageConsumer consumer = this.GetConsumer("con1"))
    using (IMessageProducer producer = this.GetProducer("pro1"))
    {
        try
        {
            MessageListener countingListener = CreateListener(NUM_MSGS);
            MessageListener closingListener = (delivered) =>
            {
                try
                {
                    connection.Close();
                }
                catch (NMSException closeFailure)
                {
                    // Keep the failure so the test thread can inspect it.
                    asyncEx = closeFailure;
                }
                countingListener(delivered);
            };
            consumer.Listener += closingListener;
            connection.ExceptionListener += DefaultExceptionListener;
            connection.Start();

            // Send a couple of messages and verify that an exception is
            // thrown if the listener tries to Close() the connection.
            ITextMessage outgoing = producer.CreateTextMessage();
            producer.Send(outgoing);
            producer.Send(outgoing);

            bool allDelivered = waiter.WaitOne(MESSAGE_RECEIVE_TIMEOUT);
            if (!allDelivered)
            {
                Assert.Fail("Received {0} of {1} messages in {2}ms.", msgCount, NUM_MSGS, MESSAGE_RECEIVE_TIMEOUT);
            }

            connection.Stop();
            Assert.NotNull(asyncEx, "Failed to receive Exception on MessageListener after {0} messages.", msgCount);
            Assert.True(asyncEx is IllegalStateException, "Failed to Recieve the correct IllegalStateException Exception, received : {0}", asyncEx);
        }
        catch (Exception failure)
        {
            this.PrintTestFailureAndAssert(this.GetMethodName(), "Unexpected Exception.", failure);
        }
    }
}
/*
 * Checks the rules around attaching and detaching a MessageListener:
 *  - a consumer that has a listener attached rejects a synchronous receive;
 *  - once the connection is started, adding a listener is rejected;
 *  - once the connection is started, removing a listener is rejected.
 * Each rejection is expected to be an IllegalStateException whose message
 * starts with a specific prefix.
 */
[Test]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "default")]
[QueueSetup("default", "q1", Name = "nms.queue")]
[ConsumerSetup("default", "q1", new[] { "c1", "c2", "c3" })]
public void TestConsumerMessageListenerEventAddRemove()
{
    using (IConnection connection = GetConnection("default"))
    {
        // Connection not started yet: attaching a listener must succeed.
        IMessageConsumer listenerConsumer = GetConsumer("c1");
        try
        {
            listenerConsumer.Listener += DefaultMessageListener;
        }
        catch (NMSException addFailure)
        {
            this.PrintTestFailureAndAssert(this.GetTestMethodName(), "Unexpected Exception.", addFailure);
        }

        // A synchronous receive on the listener-driven consumer must be rejected.
        try
        {
            listenerConsumer.ReceiveNoWait();
            Assert.Fail("Expected Exception when receiving synchously on asynchrous consumer.");
        }
        catch (NMSException receiveFailure)
        {
            Assert.IsTrue(receiveFailure is IllegalStateException, "Expected exception was not IllealStateException. Exception: {0}", receiveFailure);
            Assert.IsTrue(
                receiveFailure.Message.StartsWith("Cannot synchronously receive message on a synchronous consumer"),
                "Exception Message does not match. Message: {0}", receiveFailure.Message
            );
        }

        connection.Start();

        // Connection started: both adding and removing a listener must be rejected.
        IMessageConsumer startedConsumer = GetConsumer("c2");
        try
        {
            startedConsumer.Listener += DefaultMessageListener;
            Assert.Fail("Expected Exception when adding callback event on started connection.");
        }
        catch (NMSException addOnStarted)
        {
            Assert.IsTrue(addOnStarted is IllegalStateException, "Expected exception was not IllealStateException. Exception: {0}", addOnStarted);
            Assert.IsTrue(
                addOnStarted.Message.StartsWith("Cannot add MessageListener to consumer"),
                "Exception Message does not match. Message: {0}", addOnStarted.Message
            );
        }

        try
        {
            startedConsumer.Listener -= DefaultMessageListener;
            Assert.Fail("Expected Exception when removing callback event on started connection.");
        }
        catch (NMSException removeOnStarted)
        {
            Assert.IsTrue(removeOnStarted is IllegalStateException, "Expected exception was not IllealStateException. Exception: {0}", removeOnStarted);
            Assert.IsTrue(
                removeOnStarted.Message.StartsWith("Cannot remove MessageListener to consumer"),
                "Exception Message does not match. Message: {0}", removeOnStarted.Message
            );
        }
    }
}
/*
 * Test Durable Consumer Create (subscribe) and Unsubscribe capabilities.
 *
 * Steps exercised, in order:
 *  1. create a durable consumer;
 *  2. creating a second one with the same subscription name must fail with an
 *     NMSException whose error code is "nms:internal";
 *  3. deleting the subscription while its consumer is still open must throw
 *     IllegalStateException;
 *  4. after the consumer is closed, deleting the subscription succeeds;
 *  5. deleting it a second time must throw InvalidDestinationException.
 *
 * The finally block closes the consumer and deletes the subscription if the
 * test did not get far enough to do so itself.
 */
[Test]
[ConnectionSetup(null,"c1")]
[SessionSetup("c1", "s1")]
[TopicSetup("s1", "dt1", Name = DURABLE_TOPIC_NAME)]
public void TestDurableConsumerCreateUnsubscribe()
{
    string name = DURABLE_SUBSRIPTION_NAME;
    bool cleaned = false;
    IMessageConsumer durableConsumer = null;
    using (IConnection connection = GetConnection("c1"))
    using (ISession session = GetSession("s1"))
    {
        try
        {
            ITopic topic = GetDestination("dt1") as ITopic;
            durableConsumer = session.CreateDurableConsumer(topic, name, null, false);

            try
            {
                session.CreateDurableConsumer(topic, name, null, false);
                Assert.Fail("Expected exception for using the same durable consumer name {0} on the same connection", name);
            }
            catch (NMSException nmse)
            {
                // NUnit takes (expected, actual); the arguments were previously
                // reversed, which would mislabel the values in a failure message.
                Assert.AreEqual("nms:internal", nmse.ErrorCode);
            }

            try
            {
                session.DeleteDurableConsumer(name);
                Assert.Fail("Expected exception for deleting active durable consumer {0}.", name);
            }
            catch (IllegalStateException)
            {
                // pass
            }

            // make durable consumer inactive
            durableConsumer.Close();
            durableConsumer = null;

            // unsubscribe the subscription
            session.DeleteDurableConsumer(name);
            cleaned = true;

            try
            {
                session.DeleteDurableConsumer(name);
                Assert.Fail("Expected InvalidDestinationException On Delete Consumer Operation for non-existent durable consumer with name {0}", name);
            }
            catch (InvalidDestinationException ide)
            {
                // pass
                Logger.Info(ide.Message);
            }
        }
        catch (Exception ex)
        {
            this.PrintTestFailureAndAssert(this.GetTestMethodName(), "Unexpected Exception.", ex);
        }
        finally
        {
            // Still non-null only if the test stopped before closing the consumer.
            if (durableConsumer != null)
            {
                durableConsumer.Close();
            }
            if (!cleaned)
            {
                try
                {
                    session.DeleteDurableConsumer(name);
                }
                catch (Exception ex)
                {
                    // Best-effort cleanup: log and carry on.
                    Logger.Warn(string.Format("Failed to clean up Durable Consumer {0}. Cause : {1}", name, ex));
                }
            }
        }
    }
}
/*
 * Tests the durability of a subscription across multiple connections.
 *
 * Three connections are set up: "c1" (with a custom client id) creates the durable
 * subscription and consumes the first batch; "c3" hosts the producer that sends both
 * batches; "c2" is given c1's client id, re-creates the durable consumer under the same
 * subscription name and consumes the batch that was sent while the subscription was
 * inactive.
 *
 * On the normal path the subscription is deleted through the session created on "c2".
 * The producer's session ("s2") is only used for deletion in the finally block, as a
 * fallback when the test did not get as far as cleaning up.
 */
[Test]
[ConnectionSetup(null, "c1", ClientId = CUSTOM_CLIENT_ID)]
[ConnectionSetup(null, "c2", "c3")]
[SessionSetup("c1", "s1")]
[SessionSetup("c3", "s2")]
[TopicSetup("s1", "t1", Name = DURABLE_TOPIC_NAME)]
[ProducerSetup("s2", "t1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestSubscriptionDurabilityOverConnections()
{
const int MSG_BATCH_SIZE = 100;
const int TIMEOUT = 1000 * MSG_BATCH_SIZE;
const string MSG_BATCH_ID_PROP_KEY = "batch_id";
string subName = DURABLE_SUBSRIPTION_NAME;
ITopic topic = this.GetDestination("t1") as ITopic;
// Running total of messages sent across both batches.
int msgSentCount = 0;
// Set once the subscription has been deleted on the normal path.
bool cleaned = false;
try
{
string clientId = null;
IMessageProducer producer = this.GetProducer("sender");
ITextMessage sendMsg = producer.CreateTextMessage();
using (IConnection initialConnection = this.GetConnection("c1"))
{
initialConnection.ExceptionListener += DefaultExceptionListener;
// Remember the client id so the second connection can reuse it.
clientId = initialConnection.ClientId;
// create subscription
ISession subFactory = this.GetSession("s1");
IMessageConsumer durableConsumer = subFactory.CreateDurableConsumer(topic, subName, null, false);
// presumably CreateListener(n) signals the waiter once msgCount reaches n - confirm in BaseTestCase
durableConsumer.Listener += CreateListener(MSG_BATCH_SIZE);
// send messages to subscription
// Note: the first batch is sent before initialConnection.Start() is called below.
for (int i = 0; i < MSG_BATCH_SIZE; i++)
{
sendMsg.Text = String.Format("msg : {0}", msgSentCount);
sendMsg.Properties.SetInt(MSG_BATCH_ID_PROP_KEY, i);
producer.Send(sendMsg);
msgSentCount++;
}
initialConnection.Start();
// assert messages are received
Assert.IsTrue(this.waiter.WaitOne(TIMEOUT),
"Timed out waiting to receive messages, received {0} of {1} in {2}ms",
msgCount, msgSentCount, TIMEOUT);
Assert.IsNull(asyncEx, "Caught asynchronous exception. {0}", asyncEx?.ToString());
Assert.AreEqual(msgSentCount, msgCount, "Messages sent do not match messages received");
}
// initial connection is closed and the subscription is inactive.
// send more messages to the subscription.
for (int i = 0; i < MSG_BATCH_SIZE; i++)
{
sendMsg.Text = String.Format("msg : {0}", msgSentCount);
sendMsg.Properties.SetInt(MSG_BATCH_ID_PROP_KEY, i);
producer.Send(sendMsg);
msgSentCount++;
}
// Re-arm the event that was signalled during the first phase.
this.waiter.Reset();
// re-activate the subscription
using (IConnection connection = this.GetConnection("c2"))
{
// Reuse the first connection's client id together with the same subscription name.
connection.ClientId = clientId;
connection.ExceptionListener += DefaultExceptionListener;
ISession subscriptionFactory = connection.CreateSession();
IMessageConsumer durableConsumer = subscriptionFactory.CreateDurableConsumer(topic, subName, null, false);
// msgCount is not reset between the two phases in this method, so the listener is
// created with the cumulative total of both batches.
durableConsumer.Listener += CreateListener(MSG_BATCH_SIZE * 2);
connection.Start();
// assert messages sent while inactive are received
Assert.IsTrue(this.waiter.WaitOne(TIMEOUT),
"Timed out waiting to receive messages, received {0} of {1} in {2}ms",
msgCount, msgSentCount, TIMEOUT);
Assert.IsNull(asyncEx, "Caught asynchronous exception. {0}", asyncEx?.ToString());
Assert.AreEqual(msgSentCount, msgCount, "Messages sent while inactive do not match messages received");
// unsubscribe
durableConsumer.Close();
subscriptionFactory.DeleteDurableConsumer(subName);
durableConsumer = null;
cleaned = true;
}
}
catch (Exception ex)
{
this.PrintTestFailureAndAssert(this.GetTestMethodName(), "Unexpected Exception.", ex);
}
finally
{
// clean up code to prevent leaving a durable subscription on the test broker.
if (!cleaned)
{
try
{
// Fallback deletion goes through the producer's session.
this.GetSession("s2").DeleteDurableConsumer(subName);
}
catch (InvalidDestinationException ide)
{
Logger.Info(string.Format("Unable to unsubscribe from {0}, Cause : {1}", subName, ide));
}
catch (Exception ex)
{
Logger.Warn(string.Format("Caught unexpected failure while unsubscribing from {0}. Failure : {1}", subName, ex));
}
}
}
}
// next test
}
}