| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| using System; |
| using System.Text; |
| using log4net; |
| using NUnit.Framework; |
| using Apache.Qpid.Messaging; |
| using Apache.Qpid.Client.Qms; |
| using Apache.Qpid.Client; |
| |
| namespace Apache.Qpid.Integration.Tests.testcases |
| { |
| /// <summary> |
| /// Provides a basis for writing Unit tests that communicate with an AMQ protocol broker. By default it creates a connection |
| /// to a message broker running on localhost on the standard AMQ port, 5672, using guest:guest login credentials. It also |
| /// creates a standard auto-ack channel on this connection. |
| /// </summary> |
| public class BaseMessagingTestFixture |
| { |
| private static ILog log = LogManager.GetLogger(typeof(BaseMessagingTestFixture)); |
| |
| /// <summary> Used to build dummy data to fill test messages with. </summary> |
| private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message "; |
| |
| /// <summary> The default timeout in milliseconds to use on receives. </summary> |
| private const long RECEIVE_WAIT = 2000; |
| |
| /// <summary> The default AMQ connection URL to use for tests. </summary> |
| public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'"; |
| |
| /// <summary> The default AMQ connection URL parsed as a connection info. </summary> |
| protected IConnectionInfo connectionInfo; |
| |
| /// <summary> Holds an array of connections for building mutiple test end-points. </summary> |
| protected IConnection[] testConnection = new IConnection[10]; |
| |
| /// <summary> Holds an array of channels for building mutiple test end-points. </summary> |
| protected IChannel[] testChannel = new IChannel[10]; |
| |
| /// <summary> Holds an array of queues for building mutiple test end-points. </summary> |
| protected String[] testQueue = new String[10]; |
| |
| /// <summary> Holds an array of producers for building mutiple test end-points. </summary> |
| protected IMessagePublisher[] testProducer = new IMessagePublisher[10]; |
| |
| /// <summary> Holds an array of consumers for building mutiple test end-points. </summary> |
| protected IMessageConsumer[] testConsumer = new IMessageConsumer[10]; |
| |
| /// <summary> A counter used to supply unique ids. </summary> |
| private static int uniqueId = 0; |
| |
| /// <summary> Used to hold unique ids per test. </summary> |
| protected Guid testId; |
| |
| /// <summary> Creates the test connection and channel. </summary> |
| [SetUp] |
| public virtual void Init() |
| { |
| log.Debug("public virtual void Init(): called"); |
| |
| // Set up a unique id for this test. |
| testId = System.Guid.NewGuid(); |
| } |
| |
| /// <summary> |
| /// Disposes of the test connection. This is called manually because the connection is a field so dispose will not be automatically |
| /// called on it. |
| /// </summary> |
| [TearDown] |
| public virtual void Shutdown() |
| { |
| log.Debug("public virtual void Shutdown(): called"); |
| } |
| |
| /// <summary> Sets up the nth test end-point. </summary> |
| /// |
| /// <param name="n">The index of the test end-point to set up.</param> |
| /// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param> |
| /// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param> |
| /// <param name="routingKey">The routing key for the producer to send on.</param> |
| /// <param name="ackMode">The ack mode for the end-points channel.</param> |
| /// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param> |
| /// <param name="exchangeName">The exchange to produce or consume on.</param> |
| /// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param> |
| /// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param> |
| /// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param> |
| public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted, |
| string exchangeName, bool declareBind, bool durable, string subscriptionName) |
| { |
| SetUpEndPoint(n, producer, consumer, routingKey, ackMode, transacted, exchangeName, declareBind, durable, subscriptionName, true, false); |
| } |
| /// <summary> Sets up the nth test end-point. </summary> |
| /// |
| /// <param name="n">The index of the test end-point to set up.</param> |
| /// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param> |
| /// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param> |
| /// <param name="routingKey">The routing key for the producer to send on.</param> |
| /// <param name="ackMode">The ack mode for the end-points channel.</param> |
| /// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param> |
| /// <param name="exchangeName">The exchange to produce or consume on.</param> |
| /// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param> |
| /// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param> |
| /// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param> |
| /// <param name="exclusive"><tt>true</tt> declare queue as exclusive.</param> |
| /// <param name="browse"><tt>true</tt> only browse, don''t consume.</param> |
| public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted, |
| string exchangeName, bool declareBind, bool durable, string subscriptionName, bool exclusive, bool browse) |
| { |
| // Allow client id to be fixed, or undefined. |
| { |
| // Use unique id for end point. |
| connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); |
| |
| connectionInfo.ClientName = "test" + n; |
| } |
| |
| testConnection[n] = new AMQConnection(connectionInfo); |
| testConnection[n].Start(); |
| testChannel[n] = testConnection[n].CreateChannel(transacted, ackMode); |
| |
| if (producer) |
| { |
| testProducer[n] = testChannel[n].CreatePublisherBuilder() |
| .WithExchangeName(exchangeName) |
| .WithRoutingKey(routingKey) |
| .Create(); |
| } |
| |
| if (consumer) |
| { |
| string queueName; |
| |
| // Use the subscription name as the queue name if the subscription is durable, otherwise use a generated name. |
| if (durable) |
| { |
| // The durable queue is declared without auto-delete, and passively, in case it has already been declared. |
| queueName = subscriptionName; |
| |
| if (declareBind) |
| { |
| testChannel[n].DeclareQueue(queueName, durable, exclusive, false); |
| testChannel[n].Bind(queueName, exchangeName, routingKey); |
| } |
| } |
| else |
| { |
| queueName = testChannel[n].GenerateUniqueName(); |
| |
| if (declareBind) |
| { |
| if (durable) |
| { |
| testQueue[n] = queueName; |
| } |
| testChannel[n].DeclareQueue(queueName, durable, true, true); |
| testChannel[n].Bind(queueName, exchangeName, routingKey); |
| } |
| } |
| |
| testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).WithBrowse(browse).Create(); |
| } |
| } |
| |
| /// <summary> Closes down the nth test end-point. </summary> |
| public void CloseEndPoint(int n) |
| { |
| log.Debug("public void CloseEndPoint(int n): called"); |
| |
| if (testProducer[n] != null) |
| { |
| testProducer[n].Close(); |
| testProducer[n].Dispose(); |
| testProducer[n] = null; |
| } |
| |
| if (testConsumer[n] != null) |
| { |
| if (testQueue[n] != null) |
| { |
| testChannel[n].DeleteQueue(testQueue[n], false, false, true); |
| } |
| testConsumer[n].Close(); |
| testConsumer[n].Dispose(); |
| testConsumer[n] = null; |
| } |
| |
| if (testConnection[n] != null) |
| { |
| testConnection[n].Stop(); |
| testConnection[n].Close(); |
| testConnection[n].Dispose(); |
| testConnection[n] = null; |
| } |
| } |
| |
| /// <summary> |
| /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages |
| /// are text messages with contents equal to the specified message body. |
| /// </summary> |
| /// |
| /// <param name="n">The number of messages to consume.</param> |
| /// <param name="body">The body text to match against all messages.</param> |
| /// <param name="consumer">The message consumer to recieve the messages on.</param> |
| public static void ConsumeNMessagesOnly(int n, string body, IMessageConsumer consumer) |
| { |
| ConsumeNMessages(n, body, consumer); |
| |
| // Check that one more than n cannot be received. |
| IMessage msg = consumer.Receive(RECEIVE_WAIT); |
| Assert.IsNull(msg, "Consumer got more messages than the number requested (" + n + ")."); |
| } |
| |
| /// <summary> |
| /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages |
| /// are text messages with contents equal to the specified message body. |
| /// </summary> |
| /// |
| /// <param name="n">The number of messages to consume.</param> |
| /// <param name="body">The body text to match against all messages.</param> |
| /// <param name="consumer">The message consumer to recieve the messages on.</param> |
| public static void ConsumeNMessages(int n, string body, IMessageConsumer consumer) |
| { |
| IMessage msg; |
| |
| // Try to receive n messages. |
| for (int i = 0; i < n; i++) |
| { |
| msg = consumer.Receive(RECEIVE_WAIT); |
| Assert.IsNotNull(msg, "Consumer did not receive message number: " + i); |
| Assert.AreEqual(body, ((ITextMessage)msg).Text, "Incorrect Message recevied on consumer1."); |
| } |
| } |
| |
| /// <summary>Creates the requested number of bytes of dummy text. Usually used for filling test messages. </summary> |
| /// |
| /// <param name="size">The number of bytes of dummy text to generate.</param> |
| /// |
| /// <return>The requested number of bytes of dummy text.</return> |
| public static String GetData(int size) |
| { |
| StringBuilder buf = new StringBuilder(size); |
| |
| if (size > 0) |
| { |
| int div = MESSAGE_DATA_BYTES.Length / size; |
| int mod = MESSAGE_DATA_BYTES.Length % size; |
| |
| for (int i = 0; i < div; i++) |
| { |
| buf.Append(MESSAGE_DATA_BYTES); |
| } |
| |
| if (mod != 0) |
| { |
| buf.Append(MESSAGE_DATA_BYTES, 0, mod); |
| } |
| } |
| |
| return buf.ToString(); |
| } |
| } |
| } |