/* | |
* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
* | |
*/ | |
using System; | |
using System.Text; | |
using log4net; | |
using NUnit.Framework; | |
using Apache.Qpid.Messaging; | |
using Apache.Qpid.Client.Qms; | |
using Apache.Qpid.Client; | |
namespace Apache.Qpid.Integration.Tests.testcases | |
{ | |
/// <summary> | |
/// Provides a basis for writing Unit tests that communicate with an AMQ protocol broker. By default it creates a connection | |
/// to a message broker running on localhost on the standard AMQ port, 5672, using guest:guest login credentials. It also | |
/// creates a standard auto-ack channel on this connection. | |
/// </summary> | |
public class BaseMessagingTestFixture | |
{ | |
private static ILog log = LogManager.GetLogger(typeof(BaseMessagingTestFixture)); | |
/// <summary> Used to build dummy data to fill test messages with. </summary> | |
private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message "; | |
/// <summary> The default timeout in milliseconds to use on receives. </summary> | |
private const long RECEIVE_WAIT = 2000; | |
/// <summary> The default AMQ connection URL to use for tests. </summary> | |
public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'"; | |
/// <summary> The default AMQ connection URL parsed as a connection info. </summary> | |
protected IConnectionInfo connectionInfo; | |
/// <summary> Holds an array of connections for building mutiple test end-points. </summary> | |
protected IConnection[] testConnection = new IConnection[10]; | |
/// <summary> Holds an array of channels for building mutiple test end-points. </summary> | |
protected IChannel[] testChannel = new IChannel[10]; | |
/// <summary> Holds an array of queues for building mutiple test end-points. </summary> | |
protected String[] testQueue = new String[10]; | |
/// <summary> Holds an array of producers for building mutiple test end-points. </summary> | |
protected IMessagePublisher[] testProducer = new IMessagePublisher[10]; | |
/// <summary> Holds an array of consumers for building mutiple test end-points. </summary> | |
protected IMessageConsumer[] testConsumer = new IMessageConsumer[10]; | |
/// <summary> A counter used to supply unique ids. </summary> | |
private static int uniqueId = 0; | |
/// <summary> Used to hold unique ids per test. </summary> | |
protected Guid testId; | |
/// <summary> Creates the test connection and channel. </summary> | |
[SetUp] | |
public virtual void Init() | |
{ | |
log.Debug("public virtual void Init(): called"); | |
// Set up a unique id for this test. | |
testId = System.Guid.NewGuid(); | |
} | |
/// <summary> | |
/// Disposes of the test connection. This is called manually because the connection is a field so dispose will not be automatically | |
/// called on it. | |
/// </summary> | |
[TearDown] | |
public virtual void Shutdown() | |
{ | |
log.Debug("public virtual void Shutdown(): called"); | |
} | |
/// <summary> Sets up the nth test end-point. </summary> | |
/// | |
/// <param name="n">The index of the test end-point to set up.</param> | |
/// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param> | |
/// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param> | |
/// <param name="routingKey">The routing key for the producer to send on.</param> | |
/// <param name="ackMode">The ack mode for the end-points channel.</param> | |
/// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param> | |
/// <param name="exchangeName">The exchange to produce or consume on.</param> | |
/// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param> | |
/// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param> | |
/// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param> | |
public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted, | |
string exchangeName, bool declareBind, bool durable, string subscriptionName) | |
{ | |
// Allow client id to be fixed, or undefined. | |
{ | |
// Use unique id for end point. | |
connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); | |
connectionInfo.ClientName = "test" + n; | |
} | |
testConnection[n] = new AMQConnection(connectionInfo); | |
testConnection[n].Start(); | |
testChannel[n] = testConnection[n].CreateChannel(transacted, ackMode); | |
if (producer) | |
{ | |
testProducer[n] = testChannel[n].CreatePublisherBuilder() | |
.WithExchangeName(exchangeName) | |
.WithRoutingKey(routingKey) | |
.Create(); | |
} | |
if (consumer) | |
{ | |
string queueName; | |
// Use the subscription name as the queue name if the subscription is durable, otherwise use a generated name. | |
if (durable) | |
{ | |
// The durable queue is declared without auto-delete, and passively, in case it has already been declared. | |
queueName = subscriptionName; | |
if (declareBind) | |
{ | |
testChannel[n].DeclareQueue(queueName, durable, true, false); | |
testChannel[n].Bind(queueName, exchangeName, routingKey); | |
} | |
} | |
else | |
{ | |
queueName = testChannel[n].GenerateUniqueName(); | |
if (declareBind) | |
{ | |
if (durable) | |
{ | |
testQueue[n] = queueName; | |
} | |
testChannel[n].DeclareQueue(queueName, durable, true, true); | |
testChannel[n].Bind(queueName, exchangeName, routingKey); | |
} | |
} | |
testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).Create(); | |
} | |
} | |
/// <summary> Closes down the nth test end-point. </summary> | |
public void CloseEndPoint(int n) | |
{ | |
log.Debug("public void CloseEndPoint(int n): called"); | |
if (testProducer[n] != null) | |
{ | |
testProducer[n].Close(); | |
testProducer[n].Dispose(); | |
testProducer[n] = null; | |
} | |
if (testConsumer[n] != null) | |
{ | |
if (testQueue[n] != null) | |
{ | |
testChannel[n].DeleteQueue(testQueue[n], false, false, true); | |
} | |
testConsumer[n].Close(); | |
testConsumer[n].Dispose(); | |
testConsumer[n] = null; | |
} | |
if (testConnection[n] != null) | |
{ | |
testConnection[n].Stop(); | |
testConnection[n].Close(); | |
testConnection[n].Dispose(); | |
testConnection[n] = null; | |
} | |
} | |
/// <summary> | |
/// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages | |
/// are text messages with contents equal to the specified message body. | |
/// </summary> | |
/// | |
/// <param name="n">The number of messages to consume.</param> | |
/// <param name="body">The body text to match against all messages.</param> | |
/// <param name="consumer">The message consumer to recieve the messages on.</param> | |
public static void ConsumeNMessagesOnly(int n, string body, IMessageConsumer consumer) | |
{ | |
ConsumeNMessages(n, body, consumer); | |
// Check that one more than n cannot be received. | |
IMessage msg = consumer.Receive(RECEIVE_WAIT); | |
Assert.IsNull(msg, "Consumer got more messages than the number requested (" + n + ")."); | |
} | |
/// <summary> | |
/// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages | |
/// are text messages with contents equal to the specified message body. | |
/// </summary> | |
/// | |
/// <param name="n">The number of messages to consume.</param> | |
/// <param name="body">The body text to match against all messages.</param> | |
/// <param name="consumer">The message consumer to recieve the messages on.</param> | |
public static void ConsumeNMessages(int n, string body, IMessageConsumer consumer) | |
{ | |
IMessage msg; | |
// Try to receive n messages. | |
for (int i = 0; i < n; i++) | |
{ | |
msg = consumer.Receive(RECEIVE_WAIT); | |
Assert.IsNotNull(msg, "Consumer did not receive message number: " + i); | |
Assert.AreEqual(body, ((ITextMessage)msg).Text, "Incorrect Message recevied on consumer1."); | |
} | |
} | |
/// <summary>Creates the requested number of bytes of dummy text. Usually used for filling test messages. </summary> | |
/// | |
/// <param name="size">The number of bytes of dummy text to generate.</param> | |
/// | |
/// <return>The requested number of bytes of dummy text.</return> | |
public static String GetData(int size) | |
{ | |
StringBuilder buf = new StringBuilder(size); | |
if (size > 0) | |
{ | |
int div = MESSAGE_DATA_BYTES.Length / size; | |
int mod = MESSAGE_DATA_BYTES.Length % size; | |
for (int i = 0; i < div; i++) | |
{ | |
buf.Append(MESSAGE_DATA_BYTES); | |
} | |
if (mod != 0) | |
{ | |
buf.Append(MESSAGE_DATA_BYTES, 0, mod); | |
} | |
} | |
return buf.ToString(); | |
} | |
} | |
} |