// blob: 24b16e37ecd5bda60eef60159a5ce7c609531786 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Qpid.Client.Qms;
using Qpid.Messaging;
namespace Qpid.Client.Tests.failover
{
[TestFixture]
public class FailoverTest : IConnectionListener
{
private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverTest));
private IConnection _connection;
private IChannel _channel;
private IMessagePublisher _publisher;
private int _count;
private IMessageConsumer _consumerOfResponse;
/// <summary>
/// Builds an AMQConnection from the given connection details and runs the
/// failover scenario on it by delegating to the IConnection overload.
/// </summary>
/// <param name="info">Connection details passed to the AMQConnection constructor.</param>
void DoFailoverTest(IConnectionInfo info)
{
    AMQConnection freshConnection = new AMQConnection(info);
    DoFailoverTest(freshConnection);
}
/// <summary>
/// Sets up the message ping-pong used by the failover scenario on the given connection.
/// Registers this fixture as connection listener, opens the main channel, declares a
/// queue bound to the topic exchange and attaches a MsgListener to it on a second
/// channel, declares a separate reply queue consumed by OnMessage, starts the
/// connection and finally publishes the initial "Init" message.
/// </summary>
/// <param name="connection">
/// Connection to run on; it is cast to AMQConnection, so any other implementation
/// fails with an invalid-cast exception.
/// </param>
void DoFailoverTest(IConnection connection)
{
    // The ConnectionListener property is only reachable through the concrete type.
    ((AMQConnection)connection).ConnectionListener = this;
    //Console.WriteLine("connection.url = " + amqConnection.ToURL());

    _connection = connection;
    _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException);
    _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);

    // Request side: a queue bound to the topic exchange, served by a MsgListener
    // that runs on its own channel.
    string topicExchange = ExchangeNameDefaults.TOPIC;
    string topicKey = "topic1";
    string requestQueue = DeclareAndBindTemporaryQueue(topicExchange, topicKey);
    IChannel listenerChannel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
    new MsgListener(listenerChannel, requestQueue);

    // Reply side: a uniquely named queue on the main channel, consumed by OnMessage.
    string replyQueue = _channel.GenerateUniqueName();
    _channel.DeclareQueue(replyQueue, false, true, true);
    _consumerOfResponse = _channel.CreateConsumerBuilder(replyQueue).Create();
    _consumerOfResponse.OnMessage = new MessageReceivedDelegate(OnMessage);

    _connection.Start();

    IMessage initial = _channel.CreateTextMessage("Init");
    // FIXME: Leaving ReplyToExchangeName as default (i.e. the default exchange)
    // FIXME: but the implementation might not like this as it defaults to null rather than "".
    initial.ReplyToRoutingKey = replyQueue;
    // msg.ReplyTo = new ReplyToDestination("" /* i.e. the default exchange */, tempQueueName);

    string initialText = ((ITextMessage)initial).Text;
    _logger.Info(String.Format("sending msg.Text={0}", initialText));

    // _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
    _publisher = _channel.CreatePublisherBuilder()
        .WithRoutingKey(topicKey)
        .WithExchangeName(topicExchange)
        .Create();
    _publisher.Send(initial);
}
/// <summary>
/// Declares a queue under a freshly generated unique name on the main channel and
/// binds it to the given exchange with the given routing key.
/// </summary>
/// <param name="exchangeName">Exchange the new queue is bound to.</param>
/// <param name="routingKey">Routing key used for the binding.</param>
/// <returns>The generated name of the declared queue.</returns>
public string DeclareAndBindTemporaryQueue(string exchangeName, string routingKey)
{
    string generatedName = _channel.GenerateUniqueName();

    _channel.DeclareQueue(generatedName, false, true, true);   // Queue.Declare
    _channel.Bind(generatedName, exchangeName, routingKey);    // Queue.Bind

    return generatedName;
}
/// <summary>
/// Exception callback installed on the connection; it only logs the exception.
/// </summary>
/// <param name="e">Exception reported by the connection.</param>
private void OnConnectionException(Exception e)
{
    const string description = "Connection exception occurred";
    _logger.Error(description, e);
}
/// <summary>
/// Handler for messages arriving on the reply queue. Logs the received text, waits
/// one second, then publishes the next numbered "MessageN" text message through the
/// fixture's publisher. A QpidException is routed to error().
/// </summary>
/// <param name="message">Received message; it is cast to ITextMessage.</param>
public void OnMessage(IMessage message)
{
    try
    {
        string receivedText = ((ITextMessage)message).Text;
        _logger.Info("received message on temp queue msg.Text=" + receivedText);

        // Pace the exchange at roughly one message per second.
        Thread.Sleep(1000);

        string nextPayload = "Message" + (++_count);
        _publisher.Send(_channel.CreateTextMessage(nextPayload));
    }
    catch (QpidException failure)
    {
        error(failure);
    }
}
/// <summary>
/// Logs the given exception and then shuts the fixture down via stop().
/// </summary>
/// <param name="e">Exception to log before stopping.</param>
private void error(Exception e)
{
    const string description = "exception received";
    _logger.Error(description, e);

    stop();
}
/// <summary>
/// Disposes the connection. A QpidException thrown by the disposal is logged and
/// not propagated.
/// </summary>
private void stop()
{
    _logger.Info("Stopping...");

    try
    {
        _connection.Dispose();
    }
    catch (QpidException shutdownFailure)
    {
        // Best effort: a failed shutdown is reported but does not escape.
        _logger.Error("Failed to shutdown", shutdownFailure);
    }
}
/// <summary>
/// Connection-listener callback; intentionally does nothing in this fixture.
/// </summary>
/// <param name="count">Byte count supplied by the caller; ignored.</param>
public void BytesSent(long count)
{
    // Nothing to do.
}
/// <summary>
/// Connection-listener callback; intentionally does nothing in this fixture.
/// </summary>
/// <param name="count">Byte count supplied by the caller; ignored.</param>
public void BytesReceived(long count)
{
    // Nothing to do.
}
/// <summary>
/// Connection-listener callback; logs the call with its argument and always returns true.
/// </summary>
/// <param name="redirect">Flag supplied by the caller; only used in the log entry.</param>
/// <returns>Always true.</returns>
public bool PreFailover(bool redirect)
{
    string note = "preFailover(" + redirect + ") called";
    _logger.Info(note);

    return true;
}
/// <summary>
/// Connection-listener callback; logs the call and always returns true.
/// </summary>
/// <returns>Always true.</returns>
public bool PreResubscribe()
{
    const string note = "preResubscribe() called";
    _logger.Info(note);

    return true;
}
/// <summary>
/// Connection-listener callback; it only logs that it was called.
/// </summary>
public void FailoverComplete()
{
    const string note = "failoverComplete() called";
    _logger.Info(note);
}
/// <summary>
/// Echo responder: consumes messages from one queue and sends each message's text
/// back to the reply-to destination carried by the first message it receives.
/// </summary>
private class MsgListener
{
    private readonly IChannel _session;

    // Created on the first received message and reused for every later reply.
    private IMessagePublisher _publisher;

    /// <summary>
    /// Creates a consumer for the given queue on the given channel and registers
    /// this listener's OnMessage as its message handler.
    /// </summary>
    /// <param name="session">Channel used for consuming and for sending replies.</param>
    /// <param name="queueName">Name of the queue to consume from.</param>
    internal MsgListener(IChannel session, string queueName)
    {
        _session = session;

        IMessageConsumer consumer = _session.CreateConsumerBuilder(queueName).Create();
        consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
    }

    /// <summary>
    /// Logs the received text, creates the reply publisher if there is none yet, and
    /// sends the reply. A QpidException is logged and not propagated.
    /// </summary>
    /// <param name="message">Received message; it is cast to ITextMessage.</param>
    public void OnMessage(IMessage message)
    {
        try
        {
            _logger.Info("Received: msg.Text = " + ((ITextMessage)message).Text);

            if (_publisher == null)
            {
                _publisher = CreateReplyPublisher(message);
            }

            SendReply(message);
        }
        catch (QpidException failure)
        {
            // Error(e);
            _logger.Error("yikes", failure); // XXX
        }
    }

    /// <summary>
    /// Sends a new text message carrying the same text as the received message.
    /// </summary>
    /// <param name="message">Received message whose text is echoed.</param>
    private void SendReply(IMessage message)
    {
        string echoedText = ((ITextMessage)message).Text;
        _logger.Info("sending reply - " + echoedText);

        _publisher.Send(_session.CreateTextMessage(echoedText));
    }

    /// <summary>
    /// Builds a publisher aimed at the reply-to exchange and routing key of the
    /// given message.
    /// </summary>
    /// <param name="message">Message providing the reply-to exchange name and routing key.</param>
    /// <returns>The newly created publisher.</returns>
    private IMessagePublisher CreateReplyPublisher(IMessage message)
    {
        _logger.Info(string.Format("creating reply producer with dest = '{0}:{1}'",
            message.ReplyToExchangeName, message.ReplyToRoutingKey));

        //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
        return _session.CreatePublisherBuilder()
            .WithExchangeName(message.ReplyToExchangeName)
            .WithRoutingKey(message.ReplyToRoutingKey)
            .Create();
    }
}
/// <summary>
/// Placeholder test that always fails, flagging that the real failover tests in this
/// fixture are disabled because they hang.
/// </summary>
[Test]
public void TestFail()
{
    const string reason =
        "Tests in this class do not pass, but hang forever, so commented out until can be fixed.";

    Assert.Fail(reason);
}
/*[Test]
public void TestWithBasicInfo()
{
Console.WriteLine("TestWithBasicInfo");
try
{
QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
DoFailoverTest(connectionInfo);
while (true)
{
Thread.Sleep(5000);
}
}
catch (Exception e)
{
_logger.Error("Exception caught", e);
}
}*/
// [Test]
// public void TestWithUrl()
// {
// String clientId = "failover" + DateTime.Now.Ticks;
// String defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
// "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
//
// _logger.Info("url = [" + defaultUrl + "]");
//
// // _logger.Info("connection url = [" + new AMQConnectionURL(defaultUrl) + "]");
//
// String broker = defaultUrl;
// //new FailoverTest(broker);
// }
}
}