// blob: 57388b02adaf95bde4fba0a3505f99bba877d48c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Text;
using System.Collections.Specialized;
using System.Collections.Generic;
using NUnit.Framework;
using NUnit.Framework.Interfaces;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.AMQP.Test.Util;
using Apache.NMS.AMQP.Test.Attribute;
namespace Apache.NMS.AMQP.Test.TestCase
{
[TestFixture]
class ProducerTest : BaseTestCase
{
// Default wait, in milliseconds, used by the tests in this fixture when blocking on the
// shared waiter and as the default receive timeout of DrainDestination.
const int TIMEOUT = 15000; // 15 secs
/*
 * Per-test setup: runs the base class setup first and then replaces the shared
 * waiter with a new, unsignaled ManualResetEvent.
 */
public override void Setup()
{
base.Setup();
// Created unsignaled (false) so WaitOne() blocks until something calls Set().
waiter = new System.Threading.ManualResetEvent(false);
}
/*
 * Sends a batch of text messages to a queue through the attribute-configured
 * "sender" producer and fails if the connection's exception listener recorded
 * an asynchronous exception. No consumer is involved in this test.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[QueueSetup("s1","q1", Name = "nms.unique.queue")]
[ProducerSetup("s1", "q1","sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestProducerSend()
{
    const int messageTotal = 100;
    const int settleMillis = 2000;

    try
    {
        using (IConnection conn = GetConnection("c1"))
        using (IMessageProducer sender = GetProducer("sender"))
        {
            conn.ExceptionListener += DefaultExceptionListener;

            // One message instance is reused; only its text changes between sends.
            ITextMessage outgoing = sender.CreateTextMessage();
            int sent = 0;
            while (sent < messageTotal)
            {
                outgoing.Text = "msg:" + sent;
                sender.Send(outgoing);
                sent++;
            }

            // Give the broker up to two seconds to take the messages before checking for errors.
            waiter.WaitOne(settleMillis);

            Assert.IsNull(asyncEx, "Received asynchronous exception. Message : {0}", asyncEx?.Message);
        }
    }
    catch (Exception failure)
    {
        this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected exception.", failure);
    }
}
/*
 * Sends NUM_MSGS text messages through an anonymous producer (one created
 * without a destination), cycling over two queues and two topics, and waits
 * for the attribute-configured consumers to signal that all were received.
 *
 * The anonymous producer is created by the test itself rather than by a setup
 * attribute, so the test releases it in a finally block.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[QueueSetup("s1", "q1", Name = "nmsQueue1")]
[QueueSetup("s1", "q2", Name = "nmsQueue2")]
[TopicSetup("s1", "t1", Name = "nmsTopic1")]
[TopicSetup("s1", "t2", Name = "nmsTopic2")]
[ConsumerSetup("s1", "q1", "cq1")]
[ConsumerSetup("s1", "q2", "cq2")]
[ConsumerSetup("s1", "t1", "ct1")]
[ConsumerSetup("s1", "t2", "ct2")]
public void TestAnonymousProducerSend()
{
    const int NUM_MSGS = 100;
    IList<IDestination> destinations = this.GetDestinations(new string[] { "q1", "q2", "t1", "t2" });
    IList<IMessageConsumer> consumers = this.GetConsumers(new string[] { "cq1", "cq2", "ct1", "ct2" });
    IMessageProducer producer = null;
    using (ISession session = GetSession("s1"))
    using (IConnection connection = GetConnection("c1"))
    {
        try
        {
            // Without this guard an empty destination list would surface as a divide-by-zero below.
            Assert.IsTrue(destinations.Count > 0, "No destinations available to send to.");

            connection.ExceptionListener += DefaultExceptionListener;
            foreach (IMessageConsumer c in consumers)
            {
                // NOTE(review): CreateListener is defined outside this file view; presumably every
                // listener counts into the shared msgCount and signals the waiter at NUM_MSGS.
                c.Listener += CreateListener(NUM_MSGS);
            }
            connection.Start();

            producer = session.CreateProducer();
            producer.DeliveryMode = MsgDeliveryMode.Persistent;
            ITextMessage textMessage = producer.CreateTextMessage();

            // Round-robin over the destinations; exactly NUM_MSGS messages are sent in total.
            for (int i = 0; i < NUM_MSGS; i++)
            {
                IDestination dest = destinations[i % destinations.Count];
                Logger.Info("Sending message " + i + " to destination " + dest.ToString());
                textMessage.Text = "Num:" + dest.ToString() + ":" + i;
                producer.Send(dest, textMessage);
            }

            Assert.IsTrue(waiter.WaitOne(TIMEOUT), "Failed to received all messages. Received {0} of {1} in {2}ms.", msgCount, NUM_MSGS, TIMEOUT);
        }
        catch (Exception ex)
        {
            this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected exception.", ex);
        }
        finally
        {
            if (producer != null)
            {
                try
                {
                    producer.Close();
                    producer.Dispose();
                }
                catch (Exception closeEx)
                {
                    // Best effort: a cleanup failure must not hide the test's own outcome.
                    Logger.Warn("Failed to close anonymous producer. Cause : " + closeEx);
                }
            }
        }
    }
}
/*
 * Creates several producers on one session for the same topic, sends messages
 * round-robin across them and checks, on a single consumer, that each
 * producer's messages arrive in order. The test runs once per delivery mode:
 *   - Persistent:     any gap in a producer's id sequence is a failure.
 *   - Non-persistent: gaps are logged and counted towards the id window; only
 *                     out-of-order delivery is a failure.
 *
 * mode: delivery mode applied to every producer the test creates.
 */
[Test]
//[Repeat(25)]
[ConnectionSetup(null,"c1")]
[SessionSetup("c1","s1")]
[TopicSetup("s1","t1",Name = "nms.topic.test")]
[ConsumerSetup("s1","t1","drain")]
public void TestMultipleProducerCreateAndSend(
    [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
    MsgDeliveryMode mode
    )
{
    const int ttlMillis = 8500; // 8.5 secs
    const int totalMessages = 200;
    const int producerTotal = 5;
    const string messageIndexKey = "MsgIndex";
    const string producerIndexKey = "ProducerIndex";
    const string perProducerIdKey = "ProducerIndexedMsgId";

    bool isPersistent = mode.Equals(MsgDeliveryMode.Persistent);
    // Sum of per-producer id advances seen so far; reaches totalMessages once every id is accounted for.
    int idWindow = 0;
    string listenerFailure = null;
    IMessageProducer current = null;
    IList<IMessageProducer> senders = null;
    IList<int> lastIdByProducer = null;

    try
    {
        using (IConnection conn = this.GetConnection("c1"))
        using (ISession sess = this.GetSession("s1"))
        using (IDestination topic = this.GetDestination("t1"))
        using (IMessageConsumer drain = this.GetConsumer("drain"))
        {
            lastIdByProducer = new List<int>();
            MessageListener countingListener = CreateListener(totalMessages);

            drain.Listener += (received) =>
            {
                // After the first recorded failure every further message is ignored.
                if (listenerFailure != null)
                {
                    return;
                }

                countingListener(received);
                int id = received.Properties.GetInt(perProducerIdKey);
                int from = received.Properties.GetInt(producerIndexKey);
                int previousId = lastIdByProducer[from];
                int step = id - previousId;

                if (id < previousId)
                {
                    listenerFailure = string.Format(
                        "Received message out of order." +
                        " Received, sent from producer {0} msg id {1} where last msg id {2}",
                        from,
                        id,
                        previousId
                        );
                    this.waiter.Set();
                    return;
                }

                if (isPersistent && step > 1)
                {
                    listenerFailure = string.Format(
                        "Persistent Messages where drop." +
                        " Received, sent from producer {0} msg id {1} where last msg id {2}",
                        from,
                        id,
                        previousId
                        );
                    this.waiter.Set();
                    return;
                }

                lastIdByProducer[from] = id;
                if (step > 1 && (Logger.IsInfoEnabled || Logger.IsDebugEnabled))
                {
                    Logger.Info(string.Format(
                        "{0} Messages dropped for producer {1} from message id {2}",
                        step, from, previousId
                        ));
                }
                idWindow += step;
                if (!isPersistent && idWindow == totalMessages)
                {
                    this.waiter.Set();
                }
            };

            conn.ExceptionListener += DefaultExceptionListener;

            senders = new List<IMessageProducer>();
            int created = 0;
            while (created < producerTotal)
            {
                try
                {
                    current = sess.CreateProducer(topic);
                }
                catch (Exception ex)
                {
                    this.PrintTestFailureAndAssert(this.GetMethodName(), "Failed to Created Producer " + created, ex);
                }
                current.DeliveryMode = mode;
                // Message ids are disabled exactly when the delivery mode is persistent.
                current.DisableMessageID = isPersistent;
                current.TimeToLive = TimeSpan.FromMilliseconds(ttlMillis);
                senders.Add(current);
                // -1 means "nothing received yet", so the first expected id (0) is an advance of one.
                lastIdByProducer.Add(-1);
                created++;
            }

            conn.Start();

            Assert.AreEqual(producerTotal, senders.Count, "Did not create all producers.");
            Assert.IsNull(asyncEx,
                "Exception Listener Called While creating producers. With exception {0}.",
                asyncEx);

            ITextMessage outgoing = sess.CreateTextMessage();
            for (int n = 0; n < totalMessages; n++)
            {
                int target = n % producerTotal;
                outgoing.Text = "Index:" + n;
                outgoing.Properties[messageIndexKey] = n;
                outgoing.Properties[perProducerIdKey] = n / producerTotal;
                outgoing.Properties[producerIndexKey] = target;
                senders[target].Send(outgoing);
            }

            Assert.IsNull(asyncEx, "Exception Listener Called While sending messages. With exception {0}.", asyncEx);
            Assert.IsTrue(waiter.WaitOne(TIMEOUT),
                "Failed to received all messages in {0}ms. Received {1} of {2} messages",
                TIMEOUT, msgCount, totalMessages);
            Assert.IsNull(listenerFailure,
                "Received assertion failure from IMessageConsumer message Listener. Failure : {0}",
                listenerFailure ?? "");

            if (!isPersistent)
            {
                int missed = idWindow - msgCount;
                Assert.AreEqual(totalMessages, idWindow,
                    "Failed to receive all messages." +
                    " Received {0} of {1} messages, with missed messages {2}, in {3}ms",
                    msgCount, totalMessages, missed, TIMEOUT
                    );
                if (missed > 0)
                {
                    // Summarise the last id seen from each producer for the warning below.
                    const string separator = ", ";
                    StringBuilder summary = new StringBuilder();
                    for (int p = 0; p < producerTotal; p++)
                    {
                        summary.AppendFormat("Last received Producer {0} message id {1}{2}", p, lastIdByProducer[p], separator);
                    }
                    summary.Length -= separator.Length;
                    Logger.Warn(string.Format("Did not receive all Non Persistent messages. Received {0} of {1} messages. Where last received message ids = [{2}]", msgCount, totalMessages, summary.ToString()));
                }
            }
            else
            {
                Assert.AreEqual(totalMessages, msgCount,
                    "Receive unexpected from messages sent. Message Window {0}", idWindow);
            }

            Assert.IsNull(asyncEx, "Exception Listener Called While receiveing messages. With exception {0}.", asyncEx);

            // Some brokers honor the batchable flag that AMQPnetLite sets on every
            // published message, so the messages can all be received long before the
            // publishes are acknowledged. Pausing a few seconds before the producers
            // are closed avoids a handful of amqp:message:released outcomes.
            System.Threading.Thread.Sleep(3000);
        }
    }
    catch (Exception e)
    {
        this.PrintTestFailureAndAssert(this.GetMethodName(), "Unexpected Exception.", e);
    }
    finally
    {
        if (senders != null)
        {
            foreach (IMessageProducer p in senders)
            {
                if (p == null)
                {
                    continue;
                }
                p.Close();
                p.Dispose();
            }
            senders.Clear();
        }
    }
}
#region Destination Tests
/*
 * Shared delivery check used by the queue, topic, durable topic and temporary
 * destination tests. It runs three send phases of msgPoolSize messages each:
 *   1. with an active consumer - every message must be received;
 *   2. with the consumer closed - queues and durable topics are expected to
 *      retain these messages, non-durable topics to drop them;
 *   3. with a new consumer - messages must arrive in send order, starting at
 *      the first retained message (queue/durable) or the first message of
 *      this phase (non-durable topic).
 *
 * connection/session/producer: NMS objects supplied by the calling test.
 * destination: where the producer sends; cast to ITopic when isDurable is true.
 * msgPoolSize: number of messages sent in each phase.
 * isDurable:   when true a durable consumer is used and the subscription is
 *              deleted in the finally block.
 *
 * Whichever consumer is still open is closed in the finally block, so a failed
 * assertion does not leave it open ahead of the unsubscribe.
 */
private void TestDestinationMessageDelivery(
    IConnection connection,
    ISession session,
    IMessageProducer producer,
    IDestination destination,
    int msgPoolSize,
    bool isDurable = false)
{
    bool cleaned = !isDurable;
    const string PROP_KEY = "send_msg_id";
    string subName = DURABLE_SUBSRIPTION_NAME;
    int TotalMsgSent = 0;
    int TotalMsgRecv = 0;
    // Declared outside the try so the finally block can close a consumer left open by a failure.
    IMessageConsumer consumer = null;
    try
    {
        consumer = isDurable
            ?
            session.CreateDurableConsumer(destination as ITopic, subName, null, false)
            :
            session.CreateConsumer(destination);
        ITextMessage sendMessage = session.CreateTextMessage();
        consumer.Listener += CreateListener(msgPoolSize);
        connection.ExceptionListener += DefaultExceptionListener;
        connection.Start();

        // Phase 1: send while the consumer is active.
        for (int i = 0; i < msgPoolSize; i++)
        {
            sendMessage.Text = "Msg:" + i;
            sendMessage.Properties.SetInt(PROP_KEY, TotalMsgSent);
            producer.Send(sendMessage);
            TotalMsgSent++;
        }
        bool signal = waiter.WaitOne(TIMEOUT);
        TotalMsgRecv = msgCount;
        Assert.IsTrue(signal,
            "Timed out waiting to receive messages. Received {0} of {1} in {2}ms.",
            msgCount, TotalMsgSent, TIMEOUT);
        Assert.AreEqual(TotalMsgSent, msgCount,
            "Failed to receive all messages. Received {0} of {1} in {2}ms.",
            msgCount, TotalMsgSent, TIMEOUT);

        // close consumer
        consumer.Close();
        consumer = null;
        // reset waiter
        waiter.Reset();

        // Phase 2: send while no consumer is active.
        for (int i = 0; i < msgPoolSize; i++)
        {
            sendMessage.Text = "Msg:" + i;
            sendMessage.Properties.SetInt(PROP_KEY, TotalMsgSent);
            producer.Send(sendMessage);
            TotalMsgSent++;
            if (isDurable || destination.IsQueue)
            {
                TotalMsgRecv++;
            }
        }

        // Must stop connection before we can add a consumer
        connection.Stop();

        // First id the new consumer should see: the first phase-2 message for queues and
        // durable topics, otherwise the first phase-3 message.
        int expectedId = (isDurable || destination.IsQueue) ? msgPoolSize : TotalMsgSent;
        // expectedMsgCount is 2 msgPoolSize groups for non-durable topics, one for initial send of pool size and one for final send of pool size.
        // expectedMsgCount is 3 msgPoolSize groups for queues and durable topics, same two groups for non-durable topic plus the group sent while there is no active consumer.
        int expectedMsgCount = (isDurable || destination.IsQueue) ? 3 * msgPoolSize : 2 * msgPoolSize;
        MessageListener callback = CreateListener(expectedMsgCount);
        string errString = null;
        consumer = isDurable
            ?
            session.CreateDurableConsumer(destination as ITopic, subName, null, false)
            :
            session.CreateConsumer(destination);
        consumer.Listener += (m) =>
        {
            int id = m.Properties.GetInt(PROP_KEY);
            if (id != expectedId)
            {
                errString = string.Format("Received Message with unexpected msgId. Received msg : {0} Expected : {1}", id, expectedId);
                // Release the waiting test thread so the failure is reported without waiting for the timeout.
                waiter.Set();
                return;
            }
            else
            {
                expectedId++;
            }
            callback(m);
        };

        // Start Connection
        connection.Start();

        // Phase 3: send with the new consumer active.
        for (int i = 0; i < msgPoolSize; i++)
        {
            sendMessage.Text = "Msg:" + i;
            sendMessage.Properties.SetInt(PROP_KEY, TotalMsgSent);
            producer.Send(sendMessage);
            TotalMsgSent++;
            TotalMsgRecv++;
        }
        signal = waiter.WaitOne(TIMEOUT);
        Assert.IsNull(asyncEx, "Received asynchrounous exception. Message: {0}", asyncEx?.Message);
        Assert.IsNull(errString, "Failure occured on Message Callback. Message : {0}", errString ?? "");
        Assert.IsTrue(signal, "Timed out waiting for message receive. Received {0} of {1} in {2}ms.", msgCount, TotalMsgRecv, TIMEOUT);
        Assert.AreEqual(TotalMsgRecv, msgCount,
            "Failed to receive all messages. Received {0} of {1} in {2}ms.",
            msgCount, TotalMsgRecv, TIMEOUT);
        connection.Stop();
        consumer.Close();
        consumer = null;
    }
    catch (Exception ex)
    {
        this.PrintTestFailureAndAssert(this.GetTestMethodName(), "Unexpected Exception", ex);
    }
    finally
    {
        if (consumer != null)
        {
            // Only reached when the test failed part-way: close the consumer before unsubscribing.
            try
            {
                consumer.Close();
            }
            catch (Exception ex)
            {
                Logger.Warn(string.Format("Caught unexpected failure while closing consumer. Failure : {0}", ex));
            }
        }
        if (!cleaned)
        {
            try
            {
                session.DeleteDurableConsumer(subName);
            }
            catch (InvalidDestinationException ide)
            {
                Logger.Info(string.Format("Unable to unsubscribe from {0}, Cause : {1}", subName, ide));
            }
            catch (Exception ex)
            {
                Logger.Warn(string.Format("Caught unexpected failure while unsubscribing from {0}. Failure : {1}", subName, ex));
            }
        }
    }
}
#region Queue Destination Tests
/*
 * Runs the shared delivery check (TestDestinationMessageDelivery) against the
 * queue "nms.queue" with a non-durable consumer. Repeated 30 times.
 */
[Test]
[Repeat(30)]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[QueueSetup("s1", "q1", Name = "nms.queue")]
[ProducerSetup("s1", "q1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestQueueMessageDelivery()
{
    const int messagesPerPhase = 100;
    IDestination queue = GetDestination("q1");

    using (IConnection conn = GetConnection("c1"))
    using (ISession sess = GetSession("s1"))
    using (IMessageProducer sender = GetProducer("sender"))
    {
        TestDestinationMessageDelivery(conn, sess, sender, queue, messagesPerPhase, isDurable: false);
    }
}
#endregion // end queue tests
#region Topic Tests
/*
 * Checks delivery reliability for a plain (non-durable) topic consumer by
 * running the shared delivery check against topic "nms.test". Messages
 * published while no consumer is active are expected to be dropped.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[TopicSetup("s1", "t1", Name = "nms.test")]
[ProducerSetup("s1", "t1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestTopicMessageDelivery()
{
    const int messagesPerPhase = 100;
    IDestination topic = GetDestination("t1");

    using (IConnection conn = GetConnection("c1"))
    using (ISession sess = GetSession("s1"))
    using (IMessageProducer sender = GetProducer("sender"))
    {
        TestDestinationMessageDelivery(conn, sess, sender, topic, messagesPerPhase, isDurable: false);
    }
}
/*
 * Checks delivery reliability for a durable topic subscription by running the
 * shared delivery check with isDurable set. Messages published while the
 * consumer is inactive are expected to be retained and handed to the consumer
 * once the subscription becomes active again. Repeated 20 times.
 */
[Test]
[Repeat(20)]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[TopicSetup("s1", "t1", Name = DURABLE_TOPIC_NAME)]
[ProducerSetup("s1", "t1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestDurableTopicMessageDelivery()
{
    const int messagesPerPhase = 100;
    IDestination durableTopic = GetDestination("t1");

    using (IConnection conn = GetConnection("c1"))
    using (ISession sess = GetSession("s1"))
    using (IMessageProducer sender = GetProducer("sender"))
    {
        TestDestinationMessageDelivery(conn, sess, sender, durableTopic, messagesPerPhase, isDurable: true);
    }
}
#endregion // end topic tests
#region Temporary Destination Tests
/*
 * Runs the shared delivery check against a temporary queue. Skipped when the
 * remote broker platform is NMS_SOLACE_PLATFORM.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[TemporaryQueueSetup("s1", "temp1")]
[ProducerSetup("s1", "temp1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
[SkipTestOnRemoteBrokerProperties("c1", RemotePlatform = NMSTestConstants.NMS_SOLACE_PLATFORM)]
public void TestTemporaryQueueMessageDelivery()
{
    const int messagesPerPhase = 100;
    IDestination tempQueue = GetDestination("temp1");

    using (IConnection conn = GetConnection("c1"))
    using (ISession sess = GetSession("s1"))
    using (IMessageProducer sender = GetProducer("sender"))
    {
        TestDestinationMessageDelivery(conn, sess, sender, tempQueue, messagesPerPhase, isDurable: false);
    }
}
/*
 * Runs the shared delivery check against a temporary topic with a non-durable
 * consumer.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1","s1")]
[TemporaryTopicSetup("s1", "temp1")]
[ProducerSetup("s1", "temp1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestTemporaryTopicMessageDelivery()
{
    const int messagesPerPhase = 100;
    IDestination tempTopic = GetDestination("temp1");

    using (IConnection conn = GetConnection("c1"))
    using (ISession sess = GetSession("s1"))
    using (IMessageProducer sender = GetProducer("sender"))
    {
        TestDestinationMessageDelivery(conn, sess, sender, tempTopic, messagesPerPhase, isDurable: false);
    }
}
/*
 * Deletes a temporary topic and then verifies that sending to it through an
 * existing producer fails with an InvalidDestinationException.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1")]
[TemporaryTopicSetup("s1", "temp1")]
[ProducerSetup("s1", "temp1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
public void TestCannotSendOnDeletedTemporaryTopic()
{
    try
    {
        using (IConnection conn = GetConnection("c1"))
        using (IDestination target = GetDestination("temp1"))
        using (IMessageProducer sender = GetProducer("sender"))
        {
            ITemporaryTopic doomedTopic = target as ITemporaryTopic;
            Assert.NotNull(doomedTopic, "Failed to Create Temporary Topic.");

            // Build the message before the topic goes away; only the send itself should fail.
            IMessage payload = sender.CreateMessage();
            doomedTopic.Delete();

            try
            {
                sender.Send(payload);
                Assert.Fail("Expected Exception for sending message on deleted temporary topic.");
            }
            catch (NMSException sendFailure)
            {
                bool isInvalidDestination = sendFailure is InvalidDestinationException;
                Assert.IsTrue(isInvalidDestination, "Received Unexpected exception {0}", sendFailure);
            }
        }
    }
    catch (Exception failure)
    {
        this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected exception.", failure);
    }
}
/*
 * Request/reply round trip over a temporary topic.
 *
 * "sender" publishes NUM_MSGS requests on topic t1 with NMSReplyTo set to the
 * temporary topic. "receiver" checks each request and answers through
 * "replyer" on the temporary topic; "listener" consumes the replies, checks
 * that their message ids are consecutive and acknowledges them. The waiter is
 * signaled either by the final reply or by the first failure recorded in
 * errString.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1", AckMode = AcknowledgementMode.IndividualAcknowledge)]
[SessionSetup("c1", "s2")]
[TopicSetup("s1", "t1", Name = "nms.t.temp.reply.to.topic")]
[TemporaryTopicSetup("s1", "temp1")]
[ProducerSetup("s1", "t1", "sender", DeliveryMode = MsgDeliveryMode.NonPersistent)]
[ConsumerSetup("s2", "t1", "receiver")]
[ProducerSetup("s2", "temp1", "replyer", DeliveryMode = MsgDeliveryMode.Persistent, TimeToLive = 2500)]
[ConsumerSetup("s1", "temp1", "listener")]
public void TestTemporaryTopicReplyTo()
{
    const int NUM_MSGS = 100;
    const string MSG_BODY = "num : ";
    IDestination replyTo = GetDestination("temp1");
    long repliedCount = 0;
    long lastRepliedId = -1;
    string errString = null;
    CountDownLatch replierFinished = new CountDownLatch(NUM_MSGS);
    using (IConnection connection = GetConnection("c1"))
    using (IMessageConsumer receiver = GetConsumer("receiver"))
    using (IMessageConsumer listener = GetConsumer("listener"))
    using (IMessageProducer sender = GetProducer("sender"))
    using (IMessageProducer replyer = GetProducer("replyer"))
    {
        try
        {
            connection.ExceptionListener += DefaultExceptionListener;
            ITextMessage sendMsg = sender.CreateTextMessage();
            sendMsg.NMSReplyTo = replyTo;

            // Reply side: replies must arrive with consecutive message ids.
            listener.Listener += (message) =>
            {
                if (errString != null)
                {
                    return;
                }
                repliedCount++;
                long msgId = ExtractMsgId(message.NMSMessageId);
                if (msgId != lastRepliedId + 1)
                {
                    // Test failed release blocked thread for shutdown.
                    errString = String.Format("Received msg {0} out of order expected {1}", msgId, lastRepliedId + 1);
                    waiter.Set();
                    return;
                }
                lastRepliedId = msgId;
                message.Acknowledge();
                if (msgId == NUM_MSGS - 1)
                {
                    // test done signal complete.
                    waiter.Set();
                }
            };

            // Request side: validate each request and answer it on the temporary topic.
            receiver.Listener += (message) =>
            {
                if (errString != null)
                {
                    return;
                }
                msgCount++;
                ITextMessage rmsg = message as ITextMessage;
                if (rmsg == null)
                {
                    // test failure
                    errString = string.Format(
                        "Received message, id = {2}, body of type {0}, expected {1}.",
                        message.GetType().Name,
                        typeof(ITextMessage).Name,
                        ExtractMsgId(message.NMSMessageId)
                        );
                    waiter.Set();
                    return;
                }
                IDestination replyDestination = message.NMSReplyTo;
                // A missing reply-to is reported as a test failure instead of a NullReferenceException.
                if (replyDestination == null || !replyDestination.Equals(replyTo))
                {
                    // test failure
                    errString = string.Format(
                        "Received message, id = {0}, with incorrect reply Destination. Expected : {1}, Actual : {2}.",
                        ExtractMsgId(message.NMSMessageId),
                        replyTo,
                        replyDestination
                        );
                    waiter.Set();
                    return;
                }
                ITextMessage reply = replyer.CreateTextMessage();
                reply.Text = "Received:" + rmsg.Text;
                try
                {
                    replyer.Send(reply);
                    replierFinished.countDown();
                }
                catch (NMSException nEx)
                {
                    Logger.Error("Failed to send message from replyer Cause : " + nEx);
                    // Rethrow without resetting the stack trace.
                    throw;
                }
            };

            connection.Start();
            for (int i = 0; i < NUM_MSGS; i++)
            {
                sendMsg.Text = MSG_BODY + i;
                sender.Send(sendMsg);
            }

            // allow for two seconds for each message to be sent and replied to.
            int timeout = 2000 * NUM_MSGS;
            if (!waiter.WaitOne(timeout))
            {
                Assert.Fail("Timed out waiting on message delivery to complete. Received {1} of {0}, Replied {2} of {0}, Last Replied Msg Id {3}.", NUM_MSGS, msgCount, repliedCount, lastRepliedId);
            }
            else if (errString != null)
            {
                Assert.Fail("Asynchronous failure occurred. Cause : {0}", errString);
            }
            else
            {
                Assert.IsTrue(replierFinished.await(TimeSpan.FromMilliseconds(timeout)), "Replier thread has not finished sending messages. Remaining {0}", replierFinished.Remaining);
                Assert.IsNull(asyncEx, "Received Exception Asynchronously. Cause : {0}", asyncEx);
                Assert.AreEqual(NUM_MSGS, msgCount, "Failed to receive all messages.");
                Assert.AreEqual(NUM_MSGS, repliedCount, "Failed to reply to all messages");
                Assert.AreEqual(NUM_MSGS - 1, lastRepliedId, "Failed to receive the final message");
            }
        }
        catch (Exception ex)
        {
            this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected exception.", ex);
        }
    }
}
/*
 * Convenience overload: creates a consumer for the destination on the given
 * session, drains through it and disposes the consumer afterwards.
 *
 * Returns the number of messages drained.
 */
private int DrainDestination(
    ISession consumerFactory,
    IDestination destination,
    int msgsToDrain = -1,
    int timeout = TIMEOUT
    )
{
    IMessageConsumer drain = consumerFactory.CreateConsumer(destination);
    try
    {
        int drainedCount = DrainDestination(drain, destination, msgsToDrain, timeout);
        return drainedCount;
    }
    finally
    {
        // Release the consumer whether or not the drain succeeded.
        if (drain != null)
        {
            drain.Dispose();
        }
    }
}
/*
 * Synchronously receives messages from the given consumer and counts them.
 *
 * drain:       consumer to receive from.
 * destination: only used to describe the destination in the failure message.
 * msgsToDrain: maximum number of messages to receive; any value <= 0 means
 *              "keep receiving until a receive times out".
 * timeout:     per-receive timeout in milliseconds.
 *
 * Returns the number of messages received. Stops early when a receive returns
 * null. Any exception raised while receiving is wrapped in an Exception whose
 * message reports the progress made so far.
 */
private int DrainDestination(IMessageConsumer drain, IDestination destination, int msgsToDrain = -1, int timeout = TIMEOUT )
{
    // One definition of "bounded" shared by the loop and the failure message so the two cannot disagree.
    bool bounded = msgsToDrain > 0;
    TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(timeout);
    int msgsDrained = 0;
    try
    {
        while (!bounded || msgsDrained < msgsToDrain)
        {
            IMessage msg = drain.Receive(receiveTimeout);
            if (msg == null)
            {
                break;
            }
            msgsDrained++;
        }
    }
    catch (Exception ex)
    {
        StringBuilder sb = new StringBuilder();
        // Passing the destination itself lets AppendFormat cope with null instead of throwing here.
        sb.AppendFormat("Failed to drain Destination {0}", destination);
        if (bounded)
        {
            sb.AppendFormat(", Drained {0} of {1} messages", msgsDrained, msgsToDrain);
        }
        else
        {
            sb.AppendFormat(", Drained {0} messages", msgsDrained);
        }
        if (timeout > 0)
        {
            sb.AppendFormat(" in {0}ms", timeout);
        }
        throw new Exception(sb.ToString(), ex);
    }
    return msgsDrained;
}
/*
 * Creates a temporary queue and then a temporary topic on the session, sends
 * NUM_MSGS stream messages to each and verifies that all of them can be
 * received. Skipped when the remote broker platform is NMS_SOLACE_PLATFORM.
 *
 * The producers and consumers are created by the test itself, so each one is
 * wrapped in a using block and released when its part of the test ends.
 */
[Test]
[ConnectionSetup(null, "default")]
[SessionSetup("default", "s1")]
[SkipTestOnRemoteBrokerProperties("default", RemotePlatform = NMSTestConstants.NMS_SOLACE_PLATFORM)]
public void TestCreateTemporaryDestination()
{
    const int NUM_MSGS = 10;
    try
    {
        using (IConnection connection = GetConnection("default"))
        using (ISession session = GetSession("s1"))
        {
            IStreamMessage msg = session.CreateStreamMessage();

            IDestination temp = session.CreateTemporaryQueue();
            using (IMessageProducer producer = session.CreateProducer(temp))
            {
                for (int i = 0; i < NUM_MSGS; i++)
                {
                    msg.WriteObject("barfoo");
                    msg.WriteObject(i);
                    msg.Properties.SetInt("count", i);
                    producer.Send(msg);
                    // The same message instance is reused, so its body is cleared after every send.
                    msg.ClearBody();
                }

                // Queues do not require an active consumer to receive messages.
                // Create consumer on queue after messages sent and receive messages.
                using (IMessageConsumer drain = session.CreateConsumer(temp))
                {
                    connection.Start();
                    int msgsReceived = DrainDestination(drain, temp, NUM_MSGS);
                    Assert.AreEqual(NUM_MSGS, msgsReceived, "Received {0} of {1} on temporary destination {2}.", msgsReceived, NUM_MSGS, temp.ToString());
                }
            }

            temp = session.CreateTemporaryTopic();
            // Topics require an active consumer to receive messages.
            using (IMessageConsumer drain = session.CreateConsumer(temp))
            using (IMessageProducer producer = session.CreateProducer(temp))
            {
                for (int i = 0; i < NUM_MSGS; i++)
                {
                    msg.WriteObject("foobar");
                    msg.WriteObject(i);
                    msg.Properties.SetInt("count", i);
                    producer.Send(msg);
                    msg.ClearBody();
                }
                int msgsReceived = DrainDestination(drain, temp, NUM_MSGS);
                Assert.AreEqual(NUM_MSGS, msgsReceived, "Received {0} of {1} on temporary destination {2}.", msgsReceived, NUM_MSGS, temp.ToString());
            }
        }
    }
    catch (Exception ex)
    {
        this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected exception.", ex);
    }
}
/*
 * Verifies that a temporary topic stays usable after the session that created
 * it ("tFactory") has been closed: a producer and a consumer on other sessions
 * of the same connection exchange NUM_MSGS messages over it, and every message
 * must carry the temporary topic as its NMSReplyTo.
 *
 * A failure recorded by the listener is reported whether the wait was signaled
 * or timed out, because the listener signals the waiter when it records one.
 */
[Test]
[ConnectionSetup(null, "c1")]
[SessionSetup("c1", "s1", "s2", "tFactory")]
[TemporaryTopicSetup("tFactory", "temp")]
[ProducerSetup("s2", "temp", "sender", DeliveryMode = MsgDeliveryMode.Persistent)]
[ConsumerSetup("s1", "temp", "receiver")]
public void TestSendToTemporaryOnClosedSession()
{
    const int NUM_MSGS = 100;
    string errString = null;
    // Local wait budget for this test (100ms per message); deliberately not the fixture-wide TIMEOUT.
    const int RECEIVE_TIMEOUT = NUM_MSGS * 100;
    try
    {
        using (IConnection connection = GetConnection("c1"))
        using (ISession tFactory = GetSession("tFactory"))
        using (IMessageProducer producer = GetProducer("sender"))
        using (IMessageConsumer consumer = GetConsumer("receiver"))
        {
            IDestination destination = GetDestination("temp");
            ITextMessage sendMessage = producer.CreateTextMessage();
            MessageListener ackCallback = CreateListener(NUM_MSGS);
            MessageListener callback = (message) =>
            {
                if (errString == null)
                {
                    if (!destination.Equals(message.NMSReplyTo))
                    {
                        errString = string.Format("Received message, id = {0}, has incorrect ReplyTo property.", ExtractMsgId(message.NMSMessageId));
                        // Wake the test thread immediately; it checks errString after the wait.
                        waiter.Set();
                    }
                    ackCallback(message);
                }
            };
            consumer.Listener += callback;
            connection.ExceptionListener += DefaultExceptionListener;
            sendMessage.NMSReplyTo = destination;
            connection.Start();

            // close session
            tFactory.Close();

            for (int i = 0; i < NUM_MSGS; i++)
            {
                sendMessage.Text = string.Format("Link:{0},count:{1}", "temp", i);
                producer.Send(sendMessage);
                sendMessage.ClearBody();
            }

            bool signaled = waiter.WaitOne(RECEIVE_TIMEOUT);

            // Check the listener failure first: it is the reason the waiter was signaled early.
            if (errString != null)
            {
                Assert.Fail(errString);
            }
            if (!signaled)
            {
                Assert.Fail("Timed out waiting messages. Received, {0} of {1} messages in {2}ms.", msgCount, NUM_MSGS, RECEIVE_TIMEOUT);
            }
            Assert.AreEqual(NUM_MSGS, msgCount, "Did not receive expected number of messages.");
        }
    }
    catch (Exception ex)
    {
        this.PrintTestFailureAndAssert(GetTestMethodName(), "Unexpected exception.", ex);
    }
}
#endregion // end Temporary Destination tests
#endregion // end Destination tests
}
}