blob: da0f764bcd1d65bc04b143f47efb369589c7dd22 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Messaging;
namespace Apache.Qpid.Integration.Tests.testcases
/// <summary>
/// Messaging test fixture that prepares timestamped text messages carrying a reply-to
/// routing key and measures the latency of the messages delivered to <c>OnMessage</c>.
/// </summary>
public class ServiceRequestingClient : BaseMessagingTestFixture
// Length of the filler string that prefixes every message body.
private const int MESSAGE_SIZE = 1024;
// Filler payload: MESSAGE_SIZE 'x' characters; the message index is appended when sending.
private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
// Latency statistics are logged each time the received count is a multiple of PACK.
private const int PACK = 100;
// Default number of messages for one run (ten packs).
private const int NUM_MESSAGES = PACK*10; // increase when in standalone
// Logger for this class.
private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
// Set by OnMessage to release the WaitOne call in SendMessages; starts unsignalled.
ManualResetEvent _finishedEvent = new ManualResetEvent(false);
// Loop bound for SendMessages and the received count at which OnMessage reports completion.
private int _expectedMessageCount = NUM_MESSAGES;
// DateTime.Now.Ticks captured in SendMessages; used by OnMessage to compute total elapsed time.
private long _startTime = 0;
// NOTE(review): not referenced in the visible lines; presumably the routing key/queue the
// publisher targets - confirm against the publisher builder call.
private string _commandQueueName = "ServiceQ1";
// Publisher assigned in InitialiseProducer.
private IMessagePublisher _publisher;
// Accumulates latency statistics that OnMessage reads and logs.
Avergager averager = new Avergager();
/// <summary>
/// Obtains a publisher builder from the channel, stores the resulting publisher in
/// <c>_publisher</c> and disables message timestamps on it.
/// </summary>
// NOTE(review): the rest of the builder chain, the try that pairs with the catch below and
// all braces are not visible in this view - confirm against the complete file.
private void InitialiseProducer()
_publisher = _channel.CreatePublisherBuilder()
_publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
// A QpidException is logged; no rethrow is visible here, so the caller is not notified.
catch (QpidException e)
_log.Error("Error: " + e, e);
/// <summary>
/// Declares a uniquely named reply queue, registers <c>OnMessage</c> as the callback of a
/// consumer on that queue, prepares <c>_expectedMessageCount</c> text messages stamped with a
/// "timeSent" header and the reply routing key, then asserts that <c>_finishedEvent</c> is
/// signalled within 40 seconds.
/// </summary>
// NOTE(review): braces, the try paired with the catch below, the end of the consumer builder
// chain and any publish call are not visible in this view - confirm against the complete file.
public void SendMessages()
// Replies are routed to a queue with a generated name.
// NOTE(review): the meaning of the three boolean DeclareQueue flags is not shown here - check the channel API.
string replyQueueName = _channel.GenerateUniqueName();
_channel.DeclareQueue(replyQueueName, false, true, true);
IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
// Start of the measured run, in ticks; OnMessage subtracts this to report total time taken.
_startTime = DateTime.Now.Ticks;
messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
for (int i = 0; i < _expectedMessageCount; i++)
ITextMessage msg;
// Body is the fixed filler payload followed by the message index.
msg = _channel.CreateTextMessage(MESSAGE_DATA + i);
// NOTE(review): confirm how the loop continues after a creation failure; msg is used right below.
catch (Exception e)
_log.Error("Error creating message: " + e, e);
msg.ReplyToRoutingKey = replyQueueName;
// Added timestamp.
long timeNow = DateTime.Now.Ticks;
// NOTE(review): timeSentString is not used in the visible lines; only the numeric header is set.
string timeSentString = String.Format("{0:G}", timeNow);
// OnMessage reads this header back to compute per-message latency.
msg.Headers.SetLong("timeSent", timeNow);
// Assert that the test finishes within a reasonable amount of time.
const int waitSeconds = 40;
const int waitMilliseconds = waitSeconds * 1000;
_log.Info("Finished sending " + _expectedMessageCount + " messages");
_log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
// Blocks until OnMessage sets the event or the timeout elapses; a timeout fails the assertion.
Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
String.Format("Expected to finish in {0} seconds", waitSeconds));
/// <summary>
/// Message callback registered on the reply-queue consumer in <c>SendMessages</c>. Computes the
/// latency of the message from its "timeSent" header, logs running statistics and signals
/// <c>_finishedEvent</c> once the expected number of messages has been counted.
/// </summary>
/// <param name="m">Received message; must carry a long "timeSent" header.</param>
/// <exception cref="Exception">Thrown when the "timeSent" header is absent.</exception>
// NOTE(review): braces are not visible in this view, so the extent of each if-body below is
// assumed from the log texts - confirm against the complete file.
public void OnMessage(IMessage m)
if (_log.IsDebugEnabled)
_log.Debug("Message received: " + m);
// Every measured message must carry the tick count stamped by the sender.
if (!m.Headers.Contains("timeSent"))
throw new Exception("Set timeSent!");
// Latency = local receive time minus the sender's DateTime.Now.Ticks, converted to milliseconds.
long sentAt = m.Headers.GetLong("timeSent");
long now = DateTime.Now.Ticks;
long latencyTicks = now - sentAt;
long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
// NOTE(review): latencyMilliseconds is not passed to averager in the visible lines - confirm
// the sample is recorded (averager.Add) before the counts below are read.
// Periodic progress report, once per PACK counted messages.
if (averager.Num % PACK == 0)
_log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
_log.Info(String.Format("Average latency (ms) = {0}", averager));
_log.Info("Received message count: " + averager.Num);
// Final report and completion signal when the last expected message has been counted.
if (averager.Num == _expectedMessageCount)
_log.Info(String.Format("Final average latency (ms) = {0}", averager));
// Elapsed ticks since SendMessages recorded _startTime, converted to seconds.
double timeTakenSeconds = (DateTime.Now.Ticks - _startTime) * 1.0 / (TimeSpan.TicksPerMillisecond * 1000);
_log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " +
timeTakenSeconds + "s, equivalent to " +
(_expectedMessageCount/timeTakenSeconds) + " messages per second");
_finishedEvent.Set(); // Notify main thread to quit.
/// <summary>
/// Accumulates the count, sum, minimum and maximum of a series of long samples and
/// reports their integer average.
/// </summary>
class Avergager
{
    // Number of samples added so far.
    long num = 0;
    // Running total of all samples.
    long sum = 0;
    // Extremes start at the opposite limits so the first sample replaces both.
    long min = long.MaxValue;
    long max = long.MinValue;

    /// <summary>Records one sample, updating count, sum, minimum and maximum.</summary>
    /// <param name="item">The sample value to record.</param>
    public void Add(long item)
    {
        // Count the sample; without this Num stays 0 and Average cannot be computed.
        num++;
        sum += item;
        if (item < min) min = item;
        if (item > max) max = item;
    }

    /// <summary>
    /// Integer (truncating) average of the recorded samples, or 0 when no sample has been
    /// recorded yet, so that reading it (or calling ToString) never divides by zero.
    /// </summary>
    public long Average { get { return num == 0 ? 0 : sum / num; } }

    /// <summary>Number of samples recorded so far.</summary>
    public long Num { get { return num; } }

    /// <summary>Formats the statistics as "average=A min=B max=C".</summary>
    public override string ToString()
    {
        return String.Format("average={0} min={1} max={2}", Average, min, max);
    }
}