blob: dac3af7cb4bb0248e333ec43191f9ee83a5ad346 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using NUnit.Framework;
namespace Apache.NMS.ZMQ
{
[TestFixture]
public class ProducerConsumers : BaseTest
{
private int totalMsgCountToReceive = 0;
private class ConsumerTracker
{
public IMessageConsumer consumer;
public int msgCount = 0;
}
[Test]
public void TestMultipleProducersConsumer(
[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
string destination,
[Values(1, 3)]
int numProducers,
[Values(1, 3)]
int numConsumers)
{
IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
Assert.IsNotNull(factory, "Error creating connection factory.");
using(IConnection connection = factory.CreateConnection())
{
Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
using(ISession session = connection.CreateSession())
{
Assert.IsNotNull(session, "Error creating Session.");
IDestination testDestination = session.GetDestination(destination);
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
// Track the number of messages we should receive
this.totalMsgCountToReceive = numProducers * numConsumers;
ConsumerTracker[] consumerTrackers = null;
IMessageProducer[] producers = null;
try
{
// Create the consumers
consumerTrackers = new ConsumerTracker[numConsumers];
for(int index = 0; index < numConsumers; index++)
{
ConsumerTracker tracker = new ConsumerTracker();
tracker.consumer = session.CreateConsumer(testDestination);
Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}", index, destination);
tracker.consumer.Listener += (message) =>
{
Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
ITextMessage textMsg = (ITextMessage) message;
Assert.AreEqual(textMsg.Text, "Zero Message.");
tracker.msgCount++;
};
consumerTrackers[index] = tracker;
}
// Create the producers
producers = new IMessageProducer[numProducers];
for(int index = 0; index < numProducers; index++)
{
producers[index] = session.CreateProducer(testDestination);
Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", index, destination);
}
// Send the messages
for(int index = 0; index < numProducers; index++)
{
ITextMessage testMsg = producers[index].CreateTextMessage("Zero Message.");
Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", index);
producers[index].Send(testMsg);
}
// Wait for the message
DateTime startWaitTime = DateTime.Now;
TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
{
if((DateTime.Now - startWaitTime) > maxWaitTime)
{
Assert.Fail("Timeout waiting for message receive.");
}
Thread.Sleep(5);
}
// Sleep for an extra 2 seconds to see if any extra messages get delivered
Thread.Sleep(2 * 1000);
Assert.AreEqual(this.totalMsgCountToReceive, GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
}
finally
{
// Clean up the producers
if(null != producers)
{
foreach(IMessageProducer producer in producers)
{
producer.Dispose();
}
}
// Clean up the consumers
if(null != consumerTrackers)
{
foreach(ConsumerTracker tracker in consumerTrackers)
{
tracker.consumer.Dispose();
}
}
}
}
}
}
private int GetNumMsgsReceived(ConsumerTracker[] consumerTrackers)
{
int numMsgs = 0;
foreach(ConsumerTracker tracker in consumerTrackers)
{
numMsgs += tracker.msgCount;
}
return numMsgs;
}
}
}