blob: 68eee90b6420404541b86ca93c0aa4ad2a6c39e6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Qpid.Messaging;
namespace Qpid.Client.Tests
{
/// <summary>
/// Request/reply latency test client. It builds a publisher that routes to the
/// "ServiceQ1" command queue and measures, per reply received in
/// <see cref="OnMessage"/>, the time elapsed since the tick count carried in the
/// reply's "timeSent" header.
/// </summary>
/// <remarks>
/// The sending side (<c>SendMessages</c>) is currently commented out; the only
/// active test is <see cref="TestFail"/>, which fails unconditionally.
/// <c>_channel</c> and <c>_connection</c> are not declared in this class -
/// presumably they are inherited from BaseMessagingTestFixture (confirm there).
/// </remarks>
[TestFixture]
public class ServiceRequestingClient : BaseMessagingTestFixture
{
    // Size, in characters, of the filler payload prepended to each message body.
    private const int MESSAGE_SIZE = 1024;

    // Filler payload: MESSAGE_SIZE copies of 'x'.
    private static readonly string MESSAGE_DATA = new string('x', MESSAGE_SIZE);

    // Default number of messages a run is expected to send and receive.
    private const int NUM_MESSAGES = 10000;

    private static readonly ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));

    // Signalled by OnMessage once the expected number of replies has arrived.
    private readonly AutoResetEvent _finishedEvent = new AutoResetEvent(false);

    private int _expectedMessageCount = NUM_MESSAGES;

    // Tick count taken when sending starts; only assigned by the commented-out SendMessages.
    private long _startTime;

    private string _commandQueueName = "ServiceQ1";

    private IMessagePublisher _publisher;

    // Running latency statistics (milliseconds) over all replies received so far.
    private readonly Avergager averager = new Avergager();

    /// <summary>
    /// Creates the publisher used to send requests, routed to the command queue,
    /// with message timestamps disabled and non-persistent delivery.
    /// </summary>
    /// <remarks>
    /// A <c>QpidException</c> raised while building the publisher is logged and not
    /// rethrown, so <c>_publisher</c> may still be null when this method returns.
    /// </remarks>
    private void InitialiseProducer()
    {
        try
        {
            _publisher = _channel.CreatePublisherBuilder()
                .WithRoutingKey(_commandQueueName)
                .Create();
            _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
            _publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX: need a "with" for this in builder?
        }
        catch (QpidException e)
        {
            _log.Error("Error: " + e, e);
        }
    }

    // The real test is disabled: per the message in TestFail below, the tests in
    // this class hang forever when run unattended. The code is kept for reference
    // until that is fixed.
    /*[Test]
    public void SendMessages()
    {
        InitialiseProducer();
        string replyQueueName = _channel.GenerateUniqueName();
        _channel.DeclareQueue(replyQueueName, false, true, true);
        IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
            .WithPrefetchLow(100)
            .WithPrefetchHigh(200)
            .WithNoLocal(true)
            .WithExclusive(true).Create();
        _startTime = DateTime.Now.Ticks;
        messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
        _connection.Start();
        for (int i = 0; i < _expectedMessageCount; i++)
        {
            ITextMessage msg;
            try
            {
                msg = _channel.CreateTextMessage(MESSAGE_DATA + i);
            }
            catch (Exception e)
            {
                _log.Error("Error creating message: " + e, e);
                break;
            }
            msg.ReplyToRoutingKey = replyQueueName;
            // Added timestamp.
            long timeNow = DateTime.Now.Ticks;
            string timeSentString = String.Format("{0:G}", timeNow);
            // _log.Info(String.Format("timeSent={0} timeSentString={1}", timeNow, timeSentString));
            msg.Headers.SetString("timeSent", timeSentString);
            //msg.Headers.SetLong("sentAt", timeNow);
            try
            {
                _publisher.Send(msg);
            }
            catch (Exception e)
            {
                _log.Error("Error sending message: " + e, e);
                //base._port = 5673;
                _log.Info("Reconnecting but on port 5673");
                try
                {
                    base.Init();
                    InitialiseProducer();
                    // cheesy but a quick test
                    _log.Info("Calling SendMessages again");
                    SendMessages();
                }
                catch (Exception ex)
                {
                    _log.Error("Totally busted: failed to reconnect: " + ex, ex);
                }
            }
        }
        // Assert that the test finishes within a reasonable amount of time.
        const int waitSeconds = 10;
        const int waitMilliseconds = waitSeconds * 1000;
        _log.Info("Finished sending " + _expectedMessageCount + " messages");
        _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
        Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
            String.Format("Expected to finish in {0} seconds", waitSeconds));
    }*/

    /// <summary>
    /// Placeholder test that always fails, flagging that the real test in this
    /// fixture is disabled.
    /// </summary>
    [Test]
    public void TestFail()
    {
        Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
    }

    /// <summary>
    /// Message callback: records the latency of one received message and, once the
    /// expected number of messages has been seen, logs the totals and signals
    /// <c>_finishedEvent</c>.
    /// </summary>
    /// <param name="m">
    /// The received message. It must carry a "timeSent" string header holding the
    /// sender's tick count as a decimal integer.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// The message has no "timeSent" header.
    /// </exception>
    /// <exception cref="FormatException">
    /// The "timeSent" header is not a valid integer.
    /// </exception>
    public void OnMessage(IMessage m)
    {
        if (_log.IsDebugEnabled)
        {
            _log.Debug("Message received: " + m);
        }

        //if (m.Headers.Contains("sentAt"))
        if (!m.Headers.Contains("timeSent"))
        {
            // A specific exception type instead of the bare System.Exception;
            // existing catch (Exception) handlers still see it.
            throw new InvalidOperationException("Set timeSent!");
        }

        //long sentAt = m.Headers.GetLong("sentAt");
        // The header is machine-readable data, so parse it independently of the
        // thread's current culture.
        long sentAt = Int64.Parse(m.Headers.GetString("timeSent"),
                                  System.Globalization.CultureInfo.InvariantCulture);
        long now = DateTime.Now.Ticks;
        long latencyTicks = now - sentAt;
        // _log.Info(String.Format("latency = {0} ticks ", latencyTicks));
        long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
        // _log.Info(String.Format("latency = {0} ms", latencyMilliseconds));
        averager.Add(latencyMilliseconds);

        // Output average every 1000 messages.
        if (averager.Num % 1000 == 0)
        {
            _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
            _log.Info(String.Format("Average latency (ms) = {0}", averager));
            _log.Info("Received message count: " + averager.Num);
        }

        if (averager.Num == _expectedMessageCount)
        {
            _log.Info(String.Format("Final average latency (ms) = {0}", averager));
            // Elapsed ticks converted to fractional seconds.
            double timeTakenSeconds = (DateTime.Now.Ticks - _startTime) * 1.0 / TimeSpan.TicksPerSecond;
            _log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " +
                      timeTakenSeconds + "s, equivalent to " +
                      (_expectedMessageCount / timeTakenSeconds) + " messages per second");
            _finishedEvent.Set(); // Notify main thread to quit.
        }
    }

    /*public static void Main(String[] args)
    {
        ServiceRequestingClient c = new ServiceRequestingClient();
        c.Init();
        c.SendMessages();
    }*/
}
/// <summary>
/// Accumulates a running count, sum, minimum and maximum of the <c>long</c>
/// values passed to <see cref="Add"/> and reports their integer average.
/// </summary>
/// <remarks>
/// No synchronisation is performed; callers that add values from several
/// threads must provide their own locking.
/// </remarks>
class Avergager
{
    long num = 0;
    long sum = 0;
    // Sentinels so that the first added item always becomes both min and max.
    long min = long.MaxValue;
    long max = long.MinValue;

    /// <summary>Adds one sample to the running statistics.</summary>
    /// <param name="item">The value to record.</param>
    public void Add(long item)
    {
        ++num;
        sum += item;
        if (item < min)
        {
            min = item;
        }
        if (item > max)
        {
            max = item;
        }
    }

    /// <summary>
    /// Integer (truncating) average of all samples, or 0 when no sample has been
    /// added yet (previously this threw <c>DivideByZeroException</c>).
    /// </summary>
    public long Average { get { return num == 0 ? 0 : sum / num; } }

    /// <summary>Number of samples added so far.</summary>
    public long Num { get { return num; } }

    /// <summary>
    /// Formats the statistics as <c>average=A min=B max=C</c>. With no samples,
    /// all three values are reported as 0 rather than exposing the internal
    /// min/max sentinels.
    /// </summary>
    public override string ToString()
    {
        long shownMin = num == 0 ? 0 : min;
        long shownMax = num == 0 ? 0 : max;
        return String.Format("average={0} min={1} max={2}", Average, shownMin, shownMax);
    }
}
}