blob: ccb95b0c7db6f9bdce3b0e429152131db4f01266 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Runtime.InteropServices;
using System.Threading;
using log4net;
using NUnit.Framework;
using Qpid.Client.Qms;
using Qpid.Messaging;
namespace Qpid.Client.Tests.failover
{
[TestFixture]
public class FailoverTxTest : IConnectionListener
{
// Logger shared by this test fixture (and by the nested NoWaitConsumer).
private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest));
// Number of publish rounds performed by PublishInTx; each round sends a
// rolled-back batch followed by a committed batch.
const int NUM_ITERATIONS = 10;
// Messages sent and then committed in each round. NoWaitConsumer.Run also
// uses this value as the number of messages to receive before returning.
const int NUM_COMMITED_MESSAGES = 10;
// Messages sent and then rolled back at the start of each round.
const int NUM_ROLLEDBACK_MESSAGES = 3;
// Milliseconds to sleep after each message is sent.
const int SLEEP_MILLIS = 50;
// Acknowledge mode used for the receiving channel. Values to choose from:
// AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge
AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge;
// Passed to NoWaitConsumer: true polls with ReceiveNoWait, false uses Receive.
const bool _noWait = true; // use Receive or ReceiveNoWait
// Connection under test; assigned in DoFailoverTxTest and closed there and in Stop.
AMQConnection _connection;
/// <summary>
/// Logs the text of a received message and, when the fixture is configured
/// for client acknowledgement, acknowledges it explicitly.
/// </summary>
/// <param name="message">The delivered message; it is cast to <c>ITextMessage</c>.</param>
public void OnMessage(IMessage message)
{
    try
    {
        ITextMessage textMessage = (ITextMessage) message;
        _log.Info("Received: " + textMessage.Text);

        // Only ClientAcknowledge mode needs an explicit acknowledgement here.
        if (_acknowledgeMode != AcknowledgeMode.ClientAcknowledge)
        {
            return;
        }

        _log.Info("client acknowledging");
        message.Acknowledge();
    }
    catch (QpidException e)
    {
        // Any Qpid failure is logged and shuts the connection down.
        Error(e);
    }
}
/// <summary>
/// Synchronously pulls messages from a consumer and hands each one to the
/// owning fixture's <c>OnMessage</c> handler. <c>Run</c> has a
/// <c>ThreadStart</c>-compatible signature so it can be used as a thread body.
/// </summary>
class NoWaitConsumer
{
    private readonly FailoverTxTest _owner;
    private readonly IMessageConsumer _source;
    private readonly bool _pollWithoutWaiting;

    /// <summary>Creates a consumer loop bound to the given fixture and message consumer.</summary>
    /// <param name="failoverTxTest">Fixture whose <c>OnMessage</c> receives every message.</param>
    /// <param name="channel">The message consumer to read from.</param>
    /// <param name="noWait">True to read with <c>ReceiveNoWait</c>; false to read with <c>Receive</c>.</param>
    internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait)
    {
        _owner = failoverTxTest;
        _source = channel;
        _pollWithoutWaiting = noWait;
    }

    /// <summary>
    /// Receives messages until NUM_COMMITED_MESSAGES have been handled, sleeping
    /// for 1 ms whenever a read returns null.
    /// </summary>
    /// <remarks>
    /// NOTE(review): PublishInTx commits NUM_COMMITED_MESSAGES messages in each of
    /// NUM_ITERATIONS rounds, so this loop ends well before all committed messages
    /// have been read - confirm that is intended.
    /// </remarks>
    internal void Run()
    {
        int received = 0;
        while (received < NUM_COMMITED_MESSAGES)
        {
            IMessage next = _pollWithoutWaiting ? _source.ReceiveNoWait() : _source.Receive();
            if (next == null)
            {
                // Nothing was returned: pause briefly, then try again.
                Thread.Sleep(1);
                continue;
            }

            _log.Info("NoWait received message");
            ++received;
            _owner.OnMessage(next);
        }
    }
}
/// <summary>
/// Runs one failover/transaction scenario: opens a connection from
/// <paramref name="connectionInfo"/>, declares and binds a uniquely named queue,
/// attaches a consumer, publishes the transactional batches via
/// <c>PublishInTx</c>, waits briefly for the last deliveries and closes the
/// connection.
/// </summary>
/// <param name="connectionInfo">Connection details used to construct the <c>AMQConnection</c>.</param>
void DoFailoverTxTest(IConnectionInfo connectionInfo)
{
    _connection = new AMQConnection(connectionInfo);
    try
    {
        // Register this fixture so the failover callbacks below are invoked.
        _connection.ConnectionListener = this;
        _log.Info("connection = " + _connection);
        _log.Info("connectionInfo = " + connectionInfo);
        _log.Info("connection.AsUrl = " + _connection.toURL());
        _log.Info("AcknowledgeMode is " + _acknowledgeMode);

        IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
        string queueName = receivingChannel.GenerateUniqueName();
        // Queue.Declare
        receivingChannel.DeclareQueue(queueName, false, true, true);
        // Queue.Bind: bind the queue to amq.direct, using the queue name as the routing key.
        receivingChannel.Bind(queueName, "amq.direct", queueName);

        IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
            .WithPrefetchLow(30)
            .WithPrefetchHigh(60).Create();

        // Set to true to consume on a dedicated polling thread instead of
        // through the OnMessage delegate.
        bool useThread = false;
        if (useThread)
        {
            NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait);
            new Thread(new ThreadStart(noWaitConsumer.Run)).Start();
        }
        else
        {
            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
        }

        _connection.Start();
        PublishInTx(queueName);
        Thread.Sleep(2000); // Wait a while for last messages.
    }
    finally
    {
        // Close the connection even when setup or publishing throws, so a
        // failed run does not leave the connection open.
        _connection.Close();
    }
    _log.Info("FailoverTxTest complete");
}
/// <summary>
/// Publishes NUM_ITERATIONS rounds of messages on a transacted channel. Each
/// round sends NUM_ROLLEDBACK_MESSAGES messages and rolls them back, then sends
/// NUM_COMMITED_MESSAGES messages and commits them.
/// </summary>
/// <param name="routingKey">Routing key configured on the publisher.</param>
private void PublishInTx(string routingKey)
{
    _log.Info("sendInTx");

    bool transacted = true;
    IChannel txChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
    IMessagePublisher txPublisher = txChannel.CreatePublisherBuilder().WithRoutingKey(routingKey).Create();

    for (int round = 1; round <= NUM_ITERATIONS; round++)
    {
        // First batch of the round: sent, then discarded by the rollback.
        SendBatch(txChannel, txPublisher, "Tx=" + round + " rolledBackMsg=", NUM_ROLLEDBACK_MESSAGES);
        if (transacted)
        {
            txChannel.Rollback();
        }

        // Second batch of the round: sent, then committed.
        SendBatch(txChannel, txPublisher, "Tx=" + round + " commitedMsg=", NUM_COMMITED_MESSAGES);
        if (transacted)
        {
            txChannel.Commit();
        }
    }
}

/// <summary>
/// Sends <paramref name="count"/> text messages whose bodies are
/// <paramref name="bodyPrefix"/> followed by a 1-based sequence number,
/// logging each one and sleeping SLEEP_MILLIS after each send.
/// </summary>
private void SendBatch(IChannel channel, IMessagePublisher publisher, string bodyPrefix, int count)
{
    for (int sequence = 1; sequence <= count; sequence++)
    {
        ITextMessage outgoing = channel.CreateTextMessage(bodyPrefix + sequence);
        _log.Info("sending message = " + outgoing.Text);
        publisher.Send(outgoing);
        Thread.Sleep(SLEEP_MILLIS);
    }
}
/// <summary>
/// Logs the given exception at fatal level and then closes the connection via <c>Stop</c>.
/// </summary>
/// <param name="e">The exception that triggered the shutdown.</param>
private void Error(Exception e)
{
    const string fatalNotice = "Exception received. About to stop.";
    _log.Fatal(fatalNotice, e);
    Stop();
}
/// <summary>
/// Closes the connection. A <c>QpidException</c> raised while closing is
/// logged and not rethrown.
/// </summary>
private void Stop()
{
    _log.Info("Stopping...");

    try
    {
        _connection.Close();
    }
    catch (QpidException closeFailure)
    {
        // Best effort: record the failure and carry on.
        _log.Info("Failed to shutdown: ", closeFailure);
    }
}
/// <summary>Connection-listener callback for sent bytes; this fixture ignores it.</summary>
/// <param name="count">Byte count reported by the caller (unused).</param>
public void BytesSent(long count)
{
    // Intentionally empty.
}
/// <summary>Connection-listener callback for received bytes; this fixture ignores it.</summary>
/// <param name="count">Byte count reported by the caller (unused).</param>
public void BytesReceived(long count)
{
    // Intentionally empty.
}
/// <summary>
/// Connection-listener callback that logs the call and always returns true.
/// </summary>
/// <param name="redirect">Flag supplied by the caller; only logged here.</param>
/// <returns>Always <c>true</c>.</returns>
public bool PreFailover(bool redirect)
{
    string notice = "preFailover(" + redirect + ") called";
    _log.Info(notice);
    return true;
}
/// <summary>
/// Connection-listener callback that logs the call and always returns true.
/// </summary>
/// <returns>Always <c>true</c>.</returns>
public bool PreResubscribe()
{
    const string notice = "preResubscribe() called";
    _log.Info(notice);
    return true;
}
/// <summary>Connection-listener callback that only logs that it was called.</summary>
public void FailoverComplete()
{
    const string notice = "failoverComplete() called";
    _log.Info(notice);
}
/// <summary>
/// Placeholder test that always fails, signalling that the real tests in this
/// fixture are commented out (see the failure message).
/// </summary>
[Test]
public void TestFail()
{
    const string reason = "Tests in this class do not pass, but hang forever, so commented out until can be fixed.";
    Assert.Fail(reason);
}
/*[Test]
public void TestWithBasicInfo()
{
Console.WriteLine("TestWithBasicInfo");
Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion());
QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
DoFailoverTxTest(connectionInfo);
}*/
/*[Test]
public void runTestWithUrl()
{
String clientId = "failover" + DateTime.Now.Ticks;
string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
"?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
_log.Info("url = [" + defaultUrl + "]");
IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
_log.Info("connection url = [" + connectionInfo + "]");
DoFailoverTxTest(connectionInfo);
}*/
}
}