blob: b0255644210e7e919331d18378562631aca099e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using NUnit.Framework;
namespace Apache.NMS.ZMQ
{
[TestFixture]
public class ZMQTest : BaseTest
{
// Number of messages counted by OnMessage; TestSendReceive resets it to zero
// before it starts sending. An int field is zero-initialized by default.
private int receivedMsgCount;
/// <summary>
/// Verifies that the NMS connection factory resolves the ZMQ provider and
/// that the connection it creates is the provider's <see cref="Connection"/> type.
/// </summary>
[Test]
public void TestConnection()
{
    Uri brokerUri = new Uri("zmq:tcp://localhost:5556");
    IConnectionFactory connectionFactory = NMSConnectionFactory.CreateConnectionFactory(brokerUri);

    Assert.IsNotNull(connectionFactory, "Error creating connection factory.");

    // The using block disposes the connection even if an assertion fails.
    using(IConnection conn = connectionFactory.CreateConnection())
    {
        Assert.IsNotNull(conn, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
        Assert.IsInstanceOf<Connection>(conn, "Wrong connection type.");
    }
}
/// <summary>
/// Verifies that a session can be created on a ZMQ connection and that it is
/// the provider's <see cref="Session"/> type.
/// </summary>
[Test]
public void TestSession()
{
    Uri brokerUri = new Uri("zmq:tcp://localhost:5556");
    IConnectionFactory connectionFactory = NMSConnectionFactory.CreateConnectionFactory(brokerUri);

    Assert.IsNotNull(connectionFactory, "Error creating connection factory.");

    using(IConnection conn = connectionFactory.CreateConnection())
    {
        Assert.IsNotNull(conn, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

        // The session is disposed before the connection that owns it.
        using(ISession nmsSession = conn.CreateSession())
        {
            Assert.IsNotNull(nmsSession, "Error creating session.");
            Assert.IsInstanceOf<Session>(nmsSession, "Wrong session type.");
        }
    }
}
/// <summary>
/// Verifies that each destination URI resolves to a destination object of the
/// matching concrete type.
/// </summary>
/// <param name="destination">Destination URI passed to <c>ISession.GetDestination</c>.</param>
/// <param name="destinationType">
/// Type the resolved destination must be an instance of. Because the test is
/// marked <c>Sequential</c>, the n-th value here is paired with the n-th
/// <paramref name="destination"/> value rather than with every combination.
/// </param>
[Test, Sequential]
public void TestDestinations(
    [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
    string destination,
    [Values(typeof(Queue), typeof(Topic), typeof(TemporaryQueue), typeof(TemporaryTopic))]
    Type destinationType)
{
    IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
    Assert.IsNotNull(factory, "Error creating connection factory.");

    using(IConnection connection = factory.CreateConnection())
    {
        Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

        using(ISession session = connection.CreateSession())
        {
            Assert.IsNotNull(session, "Error creating session.");

            using(IDestination testDestination = session.GetDestination(destination))
            {
                Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
                // Message typo fixed ("destintation") so failures are searchable.
                Assert.IsInstanceOf(destinationType, testDestination, "Wrong destination type.");
            }
        }
    }
}
/// <summary>
/// Verifies that a producer can be created for each kind of destination and
/// that it is the provider's <see cref="MessageProducer"/> type.
/// </summary>
/// <param name="destination">Destination URI the producer is created on.</param>
[Test]
public void TestProducers(
    [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
    string destination)
{
    Uri brokerUri = new Uri("zmq:tcp://localhost:5556");
    IConnectionFactory connectionFactory = NMSConnectionFactory.CreateConnectionFactory(brokerUri);

    Assert.IsNotNull(connectionFactory, "Error creating connection factory.");

    using(IConnection conn = connectionFactory.CreateConnection())
    {
        Assert.IsNotNull(conn, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

        using(ISession nmsSession = conn.CreateSession())
        {
            Assert.IsNotNull(nmsSession, "Error creating session.");

            using(IDestination target = nmsSession.GetDestination(destination))
            {
                Assert.IsNotNull(target, "Error creating test destination: {0}", destination);

                using(IMessageProducer sender = nmsSession.CreateProducer(target))
                {
                    Assert.IsNotNull(sender, "Error creating producer on {0}", destination);
                    Assert.IsInstanceOf<MessageProducer>(sender, "Wrong producer type.");
                }
            }
        }
    }
}
/// <summary>
/// Verifies that a consumer can be created for each kind of destination and
/// that it is the provider's <see cref="MessageConsumer"/> type.
/// </summary>
/// <param name="destination">Destination URI the consumer is created on.</param>
[Test]
public void TestConsumers(
    // The queue URI previously carried a stray trailing ':' ("queue://ZMQTestQueue:"),
    // unlike every other test in this fixture; it now names the same queue they do.
    [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
    string destination)
{
    IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
    Assert.IsNotNull(factory, "Error creating connection factory.");

    using(IConnection connection = factory.CreateConnection())
    {
        Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

        using(ISession session = connection.CreateSession())
        {
            Assert.IsNotNull(session, "Error creating session.");

            using(IDestination testDestination = session.GetDestination(destination))
            {
                Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

                using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
                {
                    Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
                    Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
                }
            }
        }
    }
}
/// <summary>
/// Sends a text message to a destination and verifies that a consumer
/// listening on the same destination receives at least one copy of it
/// within five seconds.
/// </summary>
/// <param name="connectionName">Connection URI used to create the connection factory.</param>
/// <param name="destinationName">Destination URI shared by the consumer and the producer.</param>
[Test]
public void TestSendReceive(
    // inproc, ipc, tcp, pgm, or epgm
    [Values("zmq:tcp://localhost:5556", "zmq:inproc://localhost:5557")]
    string connectionName,
    [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
    string destinationName)
{
    IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName));
    Assert.IsNotNull(factory, "Error creating connection factory.");

    // The counter is updated by the OnMessage listener, which may run on one of
    // the provider's dispatch threads, so every access from this thread goes
    // through Interlocked to get an atomic, fenced read or write.
    Interlocked.Exchange(ref this.receivedMsgCount, 0);

    using(IConnection connection = factory.CreateConnection())
    {
        Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

        using(ISession session = connection.CreateSession())
        {
            Assert.IsNotNull(session, "Error creating Session.");

            using(IDestination testDestination = session.GetDestination(destinationName))
            {
                Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destinationName);

                using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
                {
                    Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName);

                    int sendMsgCount = 0;

                    try
                    {
                        consumer.Listener += OnMessage;

                        using(IMessageProducer producer = session.CreateProducer(testDestination))
                        {
                            // Check the producer itself; the original re-checked the consumer here.
                            Assert.IsNotNull(producer, "Error creating producer on {0}", destinationName);

                            ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
                            Assert.IsNotNull(testMsg, "Error creating test message.");

                            // Wait for the message. UTC is used so a local-time shift
                            // (e.g. DST) cannot distort the timeout.
                            DateTime startWaitTime = DateTime.UtcNow;
                            TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);

                            // Continually send the message to compensate for the
                            // slow joiner problem inherent to spinning up the
                            // internal dispatching threads in ZeroMQ.
                            while(Interlocked.CompareExchange(ref this.receivedMsgCount, 0, 0) < 1)
                            {
                                ++sendMsgCount;
                                producer.Send(testMsg);

                                if((DateTime.UtcNow - startWaitTime) > maxWaitTime)
                                {
                                    Assert.Fail("Timeout waiting for message receive.");
                                }

                                Thread.Sleep(1);
                            }
                        }
                    }
                    finally
                    {
                        // Always detach the listener so a failed run cannot leave the
                        // callback attached to this fixture instance.
                        consumer.Listener -= OnMessage;
                        Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount,
                            Interlocked.CompareExchange(ref this.receivedMsgCount, 0, 0));
                    }
                }
            }
        }
    }
}
/// <summary>
/// Listener callback for messages delivered to the test consumer. Checks that
/// the message is the expected text message and counts it.
/// </summary>
/// <param name="message">The message delivered to the consumer.</param>
private void OnMessage(IMessage message)
{
    Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
    ITextMessage textMsg = (ITextMessage) message;

    // NUnit's AreEqual takes (expected, actual); the original had them swapped,
    // which produces a misleading "expected ... but was ..." failure message.
    Assert.AreEqual("Zero Message.", textMsg.Text);

    // NOTE(review): presumably invoked on a provider dispatch thread while the
    // test thread polls the counter, so the increment is done atomically.
    Interlocked.Increment(ref this.receivedMsgCount);
}
}
}