/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System; | |
using System.Threading; | |
using log4net; | |
using NUnit.Framework; | |
using Apache.Qpid.Messaging; | |
namespace Apache.Qpid.Integration.Tests.testcases | |
{ | |
/// <summary>
/// Integration test that publishes a batch of text messages to the command queue,
/// asks for replies on a freshly declared reply queue, and logs latency statistics
/// computed from the "timeSent" header carried by each reply.
/// </summary>
/// <remarks>
/// NOTE(review): replies only arrive if some other party consumes from the command
/// queue and answers on the reply routing key while preserving the "timeSent"
/// header. That party is not visible in this file - confirm which fixture provides it.
/// </remarks>
public class ServiceRequestingClient : BaseMessagingTestFixture
{
    private const int MESSAGE_SIZE = 1024;
    private static readonly string MESSAGE_DATA = new string('x', MESSAGE_SIZE);

    /// <summary>Number of received messages between two progress log entries.</summary>
    private const int PACK = 100;
    private const int NUM_MESSAGES = PACK*10; // increase when running standalone

    private static readonly ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));

    /// <summary>Signalled by <see cref="OnMessage"/> once the last expected reply has arrived.</summary>
    private readonly ManualResetEvent _finishedEvent = new ManualResetEvent(false);

    private int _expectedMessageCount = NUM_MESSAGES;

    /// <summary>UTC tick count taken just before the connection is started.</summary>
    private long _startTime = 0;

    private string _commandQueueName = "ServiceQ1";
    private IMessagePublisher _publisher;

    /// <summary>Accumulates the per-message latency (in milliseconds) of received replies.</summary>
    private readonly Avergager averager = new Avergager();

    /// <summary>
    /// Creates the non-persistent publisher that sends to the command queue.
    /// </summary>
    /// <exception cref="QpidException">
    /// Logged and rethrown. Swallowing it would leave <c>_publisher</c> null and the
    /// test would later fail with an unrelated <see cref="NullReferenceException"/>.
    /// </exception>
    private void InitialiseProducer()
    {
        try
        {
            _publisher = _channel.CreatePublisherBuilder()
                .WithRoutingKey(_commandQueueName)
                .WithDeliveryMode(DeliveryMode.NonPersistent)
                .Create();
            _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
        }
        catch (QpidException e)
        {
            _log.Error("Error: " + e, e);
            throw;
        }
    }

    /// <summary>
    /// Sends the expected number of messages, each stamped with a "timeSent" header
    /// and a reply routing key, then waits up to 40 seconds for
    /// <see cref="OnMessage"/> to signal that every reply has been received.
    /// </summary>
    [Test]
    public void SendMessages()
    {
        InitialiseProducer();

        string replyQueueName = _channel.GenerateUniqueName();
        _channel.DeclareQueue(replyQueueName, false, true, true);

        IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
            .WithPrefetchLow(100)
            .WithPrefetchHigh(200)
            .WithNoLocal(true)
            .WithExclusive(true).Create();

        // UTC is used for every timestamp in this class so that a local-time shift
        // (e.g. daylight saving) during the run cannot distort the measurements.
        _startTime = DateTime.UtcNow.Ticks;

        messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
        _connection.Start();

        for (int i = 0; i < _expectedMessageCount; i++)
        {
            ITextMessage msg;
            try
            {
                msg = _channel.CreateTextMessage(MESSAGE_DATA + i);
            }
            catch (Exception e)
            {
                // Fail immediately with the real cause; continuing would only end in
                // a misleading timeout because fewer messages than expected were sent.
                _log.Error("Error creating message: " + e, e);
                throw;
            }

            msg.ReplyToRoutingKey = replyQueueName;

            // Stamp the send time so the receiver can compute the latency.
            msg.Headers.SetLong("timeSent", DateTime.UtcNow.Ticks);

            _publisher.Send(msg);
        }

        // Assert that the test finishes within a reasonable amount of time.
        const int waitSeconds = 40;
        const int waitMilliseconds = waitSeconds * 1000;
        _log.Info("Finished sending " + _expectedMessageCount + " messages");
        _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
        Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
                      String.Format("Expected to finish in {0} seconds", waitSeconds));
    }

    /// <summary>
    /// Consumer callback: records the latency of the received message, logs progress
    /// every <c>PACK</c> messages, and signals completion once the expected number
    /// of messages has been received.
    /// </summary>
    /// <param name="m">The received message; it must carry a "timeSent" header.</param>
    /// <exception cref="Exception">Thrown when the "timeSent" header is missing.</exception>
    public void OnMessage(IMessage m)
    {
        if (_log.IsDebugEnabled)
        {
            _log.Debug("Message received: " + m);
        }

        if (!m.Headers.Contains("timeSent"))
        {
            throw new Exception("Set timeSent!");
        }

        long sentAt = m.Headers.GetLong("timeSent");
        long now = DateTime.UtcNow.Ticks;
        long latencyMilliseconds = (now - sentAt) / TimeSpan.TicksPerMillisecond;
        averager.Add(latencyMilliseconds);

        if (averager.Num % PACK == 0)
        {
            _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
            _log.Info(String.Format("Average latency (ms) = {0}", averager));
            _log.Info("Received message count: " + averager.Num);
        }

        if (averager.Num == _expectedMessageCount)
        {
            _log.Info(String.Format("Final average latency (ms) = {0}", averager));

            double timeTakenSeconds = (DateTime.UtcNow.Ticks - _startTime) * 1.0 / TimeSpan.TicksPerSecond;
            _log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " +
                      timeTakenSeconds + "s, equivalent to " +
                      (_expectedMessageCount/timeTakenSeconds) + " messages per second");

            _finishedEvent.Set(); // Notify main thread to quit.
        }
    }
}
/// <summary>
/// Running accumulator of <c>long</c> samples that tracks their count, sum,
/// minimum and maximum.
/// </summary>
/// <remarks>
/// The type name is misspelled ("Avergager") but is kept because
/// ServiceRequestingClient refers to it by this name.
/// </remarks>
class Avergager
{
    long num = 0;
    long sum = 0;
    long min = long.MaxValue;
    long max = long.MinValue;

    /// <summary>Adds one sample and updates the count, sum, minimum and maximum.</summary>
    /// <param name="item">The sample value.</param>
    public void Add(long item)
    {
        ++num;
        sum += item;
        if (item < min) min = item;
        if (item > max) max = item;
    }

    /// <summary>
    /// Integer average (truncated toward zero) of the samples added so far, or 0
    /// when no sample has been added yet.
    /// </summary>
    public long Average
    {
        get
        {
            // Guard the empty case: sum/num would throw DivideByZeroException.
            if (num == 0)
            {
                return 0;
            }
            return sum/num;
        }
    }

    /// <summary>Number of samples added so far.</summary>
    public long Num { get { return num; } }

    /// <summary>
    /// Formats the statistics as "average=A min=B max=C". Before any sample is added
    /// every value is reported as "n/a" rather than exposing the sentinel min/max.
    /// </summary>
    public override string ToString()
    {
        if (num == 0)
        {
            return "average=n/a min=n/a max=n/a";
        }
        return String.Format("average={0} min={1} max={2}", Average, min, max);
    }
}
} |