blob: da0f764bcd1d65bc04b143f47efb369589c7dd22 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Messaging;
namespace Apache.Qpid.Integration.Tests.testcases
{
public class ServiceRequestingClient : BaseMessagingTestFixture
{
private const int MESSAGE_SIZE = 1024;
private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
private const int PACK = 100;
private const int NUM_MESSAGES = PACK*10; // increase when in standalone
private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
ManualResetEvent _finishedEvent = new ManualResetEvent(false);
private int _expectedMessageCount = NUM_MESSAGES;
private long _startTime = 0;
private string _commandQueueName = "ServiceQ1";
private IMessagePublisher _publisher;
Avergager averager = new Avergager();
private void InitialiseProducer()
{
try
{
_publisher = _channel.CreatePublisherBuilder()
.WithRoutingKey(_commandQueueName)
.WithDeliveryMode(DeliveryMode.NonPersistent)
.Create();
_publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
}
catch (QpidException e)
{
_log.Error("Error: " + e, e);
}
}
[Test]
public void SendMessages()
{
InitialiseProducer();
string replyQueueName = _channel.GenerateUniqueName();
_channel.DeclareQueue(replyQueueName, false, true, true);
IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
.WithPrefetchLow(100)
.WithPrefetchHigh(200)
.WithNoLocal(true)
.WithExclusive(true).Create();
_startTime = DateTime.Now.Ticks;
messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
_connection.Start();
for (int i = 0; i < _expectedMessageCount; i++)
{
ITextMessage msg;
try
{
msg = _channel.CreateTextMessage(MESSAGE_DATA + i);
}
catch (Exception e)
{
_log.Error("Error creating message: " + e, e);
break;
}
msg.ReplyToRoutingKey = replyQueueName;
// Added timestamp.
long timeNow = DateTime.Now.Ticks;
string timeSentString = String.Format("{0:G}", timeNow);
msg.Headers.SetLong("timeSent", timeNow);
_publisher.Send(msg);
}
// Assert that the test finishes within a reasonable amount of time.
const int waitSeconds = 40;
const int waitMilliseconds = waitSeconds * 1000;
_log.Info("Finished sending " + _expectedMessageCount + " messages");
_log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
String.Format("Expected to finish in {0} seconds", waitSeconds));
}
public void OnMessage(IMessage m)
{
if (_log.IsDebugEnabled)
{
_log.Debug("Message received: " + m);
}
if (!m.Headers.Contains("timeSent"))
{
throw new Exception("Set timeSent!");
}
long sentAt = m.Headers.GetLong("timeSent");
long now = DateTime.Now.Ticks;
long latencyTicks = now - sentAt;
long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
averager.Add(latencyMilliseconds);
if (averager.Num % PACK == 0)
{
_log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
_log.Info(String.Format("Average latency (ms) = {0}", averager));
_log.Info("Received message count: " + averager.Num);
}
if (averager.Num == _expectedMessageCount)
{
_log.Info(String.Format("Final average latency (ms) = {0}", averager));
double timeTakenSeconds = (DateTime.Now.Ticks - _startTime) * 1.0 / (TimeSpan.TicksPerMillisecond * 1000);
_log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " +
timeTakenSeconds + "s, equivalent to " +
(_expectedMessageCount/timeTakenSeconds) + " messages per second");
_finishedEvent.Set(); // Notify main thread to quit.
}
}
}
class Avergager
{
long num = 0;
long sum = 0;
long min = long.MaxValue;
long max = long.MinValue;
public void Add(long item)
{
++num;
sum += item;
if (item < min) min = item;
if (item > max) max = item;
}
public long Average { get { return sum/num; }}
public long Num { get { return num; } }
public override string ToString()
{
return String.Format("average={0} min={1} max={2}", Average, min, max);
}
}
}