blob: c94dd865d5f428e1af897e7e414cb78cf7984620 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using common.org.apache.qpid.transport.util;
using org.apache.qpid.client;
using org.apache.qpid.transport;
using org.apache.qpid.transport.util;
using Plossum.CommandLine;
namespace PerfTest
{
[CommandLineManager(ApplicationName = "Qpid Perf Tests", Copyright = "Apache Software Foundation")]
public class Options
{
[CommandLineOption(Description = "Displays this help text")]
public bool Help;
[CommandLineOption(Description = "Create shared queues.", MinOccurs = 0)]
public Boolean Setup;
[CommandLineOption(Description = "Run test, print report.", MinOccurs = 0)]
public Boolean Control;
[CommandLineOption(Description = "Publish messages.", MinOccurs = 0)]
public Boolean Publish;
[CommandLineOption(Description = "Subscribe for messages.", MinOccurs = 0)]
public Boolean Subscribe;
[CommandLineOption(Description = "Test mode: [shared|fanout|topic]", MinOccurs = 0)]
public string Mode
{
get { return _mMode; }
set
{
if (! value.Equals("shared") && ! value.Equals("fanout") && ! value.Equals("topic"))
throw new InvalidOptionValueException(
"The mode must not be shared|fanout|topic", false);
_mMode = value;
}
}
private string _mMode = "shared";
[CommandLineOption(Description = "Specifies the broler name", MinOccurs = 0)]
public string Broker
{
get { return _broker; }
set
{
if (String.IsNullOrEmpty(value))
throw new InvalidOptionValueException(
"The broker name must not be empty", false);
_broker = value;
}
}
private string _broker = "localhost";
[CommandLineOption(Description = "Specifies the port name", MinOccurs = 0)]
public int Port
{
get { return _port; }
set { _port = value; }
}
private int _port = 5672;
#region Publisher
[CommandLineOption(Description = "Create N publishers.", MinOccurs = 0)]
public int Pubs
{
get { return _pubs; }
set { _pubs = value; }
}
private int _pubs = 1;
[CommandLineOption(Description = "Each publisher sends N messages.", MinOccurs = 0)]
public double Count
{
get { return _count; }
set { _count = value; }
}
private double _count = 5000;
[CommandLineOption(Description = "Size of messages in bytes.", MinOccurs = 0)]
public long Size
{
get { return _size; }
set { _size = value; }
}
private long _size = 1024;
[CommandLineOption(Description = "Publisher use confirm-mode.", MinOccurs = 0)]
public Boolean Confirm = true;
[CommandLineOption(Description = "Publish messages as durable.", MinOccurs = 0)]
public Boolean Durable;
[CommandLineOption(Description = "Make data for each message unique.", MinOccurs = 0)]
public Boolean UniqueData;
[CommandLineOption(Description = "Wait for confirmation of each message before sending the next one.",
MinOccurs = 0)]
public Boolean SyncPub;
[CommandLineOption(Description = ">=0 delay between msg publish.", MinOccurs = 0)]
public double IntervalPub
{
get { return _interval_pub; }
set { _interval_pub = value; }
}
private double _interval_pub;
#endregion
#region Subscriber
[CommandLineOption(Description = "Create N subscribers.", MinOccurs = 0)]
public int Subs
{
get { return _subs; }
set { _subs = value; }
}
private int _subs = 1;
[CommandLineOption(Description = "N>0: Subscriber acks batches of N.\n N==0: Subscriber uses unconfirmed mode",
MinOccurs = 0)]
public int SubAck
{
get { return _suback; }
set { _suback = value; }
}
private int _suback;
[CommandLineOption(Description = ">=0 delay between msg consume", MinOccurs = 0)]
public double IntervalSub
{
get { return _interval_sub; }
set { _interval_sub = value; }
}
private double _interval_sub;
#endregion
[CommandLineOption(Description = "Create N queues.", MinOccurs = 0)]
public int Queues
{
get { return _qt; }
set { _qt = value; }
}
private int _qt = 1;
[CommandLineOption(Description = "Desired number of iterations of the test.", MinOccurs = 0)]
public int Iterations
{
get { return _iterations; }
set { _iterations = value; }
}
private int _iterations = 1;
[CommandLineOption(Description = "If non-zero, the transaction batch size.", MinOccurs = 0)]
public int Tx
{
get { return _tx; }
set { _tx = value; }
}
private int _tx;
[CommandLineOption(Description = "Make queue durable (implied if durable set.", MinOccurs = 0)]
public Boolean QueueDurable;
[CommandLineOption(Description = "Queue policy: count to trigger 'flow to disk'", MinOccurs = 0)]
public double QueueMaxCount
{
get { return _queueMaxCount; }
set { _queueMaxCount = value; }
}
private double _queueMaxCount;
[CommandLineOption(Description = "Queue policy: accumulated size to trigger 'flow to disk'", MinOccurs = 0)]
public double QueueMaxSize
{
get { return _queueMaxSize; }
set { _queueMaxSize = value; }
}
private double _queueMaxSize;
public double SubQuota
{
get { return _subQuota; }
set { _subQuota = value; }
}
private double _subQuota;
}
internal interface Startable
{
void Start();
}
public abstract class PerfTestClient : Startable
{
private readonly IClient _connection;
private readonly IClientSession _session;
private readonly Options _options;
public IClientSession Session
{
get { return _session; }
}
public Options Options
{
get { return _options; }
}
protected PerfTestClient(Options options)
{
_options = options;
_connection = new Client();
_connection.Connect(options.Broker, options.Port, "test", "guest", "guest");
_session = _connection.CreateSession(50000);
}
public abstract void Start();
}
public class SetupTest : PerfTestClient
{
public SetupTest(Options options)
: base(options)
{
}
private void queueInit(String name, Boolean durable, Dictionary<String, Object> arguments)
{
Session.QueueDeclare(name, null, arguments, durable ? Option.DURABLE : Option.NONE);
Session.QueuePurge(name);
Session.ExchangeBind(name, "amq.direct", name);
Session.Sync();
}
public override void Start()
{
queueInit("pub_start", false, null);
queueInit("pub_done", false, null);
queueInit("sub_ready", false, null);
queueInit("sub_done", false, null);
if (Options.Mode.Equals("shared"))
{
Dictionary<String, Object> settings = new Dictionary<string, object>();
if (Options.QueueMaxCount > 0)
settings.Add("qpid.max_count", Options.QueueMaxCount);
if (Options.QueueMaxSize > 0)
settings.Add("qpid.max_size", Options.QueueMaxSize);
for (int i = 0; i < Options.Queues; ++i)
{
string qname = "perftest" + i;
queueInit(qname, Options.Durable || Options.QueueDurable, settings);
}
}
}
}
public class SubscribeThread : PerfTestClient
{
private readonly string _queue;
public SubscribeThread(Options options, string key, string exchange)
: base(options)
{
_queue = "perftest" + (new UUID(10, 10));
Session.QueueDeclare(_queue, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE,
Options.Durable ? Option.DURABLE : Option.NONE);
Session.ExchangeBind(_queue, exchange, key);
}
public SubscribeThread(Options options, string key)
: base(options)
{
_queue = key;
}
public override void Start()
{
if (Options.Tx > 0)
{
Session.TxSelect();
Session.Sync();
}
CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
// Create a listener and subscribe it to the queue named "message_queue"
IMessageListener listener = new SyncListener(buffer);
string dest = "dest" + UUID.RandomUuid();
Session.AttachMessageListener(listener, dest);
Session.MessageSubscribe(_queue, dest,
Options.Tx > 0 || Options.SubAck > 0
? MessageAcceptMode.EXPLICIT
: MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
// issue credits
Session.MessageSetFlowMode(dest, MessageFlowMode.WINDOW);
Session.MessageFlow(dest, MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
// Notify controller we are ready.
IMessage message = new Message();
message.DeliveryProperties.SetRoutingKey("sub_ready");
message.AppendData(Encoding.UTF8.GetBytes("ready"));
Session.MessageTransfer("amq.direct", message);
if (Options.Tx > 0)
{
Session.TxCommit();
Session.Sync();
}
for (int j = 0; j < Options.Iterations; ++j)
{
//need to allocate some more credit
Session.MessageFlow(dest, MessageCreditUnit.MESSAGE, (long)Options.SubQuota);
RangeSet range = new RangeSet();
IMessage msg;
DateTime start = DateTime.Now;
for (long i = 0; i < Options.SubQuota; ++i)
{
msg = buffer.Dequeue();
if (Options.Tx > 0 && ((i + 1)%Options.Tx == 0))
{
Session.TxCommit();
Session.Sync();
}
if (Options.IntervalSub > 0)
{
Thread.Sleep((int) Options.IntervalSub*1000);
}
range.Add(msg.Id);
}
if (Options.Tx > 0 || Options.SubAck > 0)
Session.MessageAccept(range);
range.Clear();
if (Options.Tx > 0)
{
Session.TxSelect();
Session.Sync();
}
DateTime end = DateTime.Now;
// Report to publisher.
message.DeliveryProperties.SetRoutingKey("sub_done");
message.ClearData();
message.AppendData(BitConverter.GetBytes(Options.SubQuota / end.Subtract(start).TotalMilliseconds ));
Session.MessageTransfer("amq.direct", message);
if (Options.Tx > 0)
{
Session.TxSelect();
Session.Sync();
}
}
Session.Close();
}
}
public class SyncListener : IMessageListener
{
private readonly CircularBuffer<IMessage> _buffer;
public SyncListener(CircularBuffer<IMessage> buffer)
{
_buffer = buffer;
}
public void MessageTransfer(IMessage m)
{
_buffer.Enqueue(m);
}
}
public class PublishThread : PerfTestClient
{
private readonly string _exchange;
private readonly string _key;
public PublishThread(Options options, string key, string exchange)
: base(options)
{
_key = key;
_exchange = exchange;
}
public override void Start()
{
byte[] data = new byte[Options.Size];
// randomly populate data
Random r = new Random(34);
r.NextBytes(data);
IMessage message = new Message();
message.AppendData(data);
message.DeliveryProperties.SetRoutingKey(_key);
if (Options.Durable)
message.DeliveryProperties.SetDeliveryMode(MessageDeliveryMode.PERSISTENT);
if (Options.Tx > 0)
{
Session.TxSelect();
Session.Sync();
}
CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
// Create a listener and subscribe it to the queue named "pub_start"
IMessageListener listener = new SyncListener(buffer);
string localQueue = "localQueue-" + UUID.RandomUuid().ToString();
Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
Session.ExchangeBind(localQueue, "amq.direct", "pub_start");
Session.AttachMessageListener(listener, localQueue);
Session.MessageSubscribe(localQueue);
if (Options.Tx > 0)
{
Session.TxCommit();
Session.Sync();
}
buffer.Dequeue();
for (int j = 0; j < Options.Iterations; ++j)
{
DateTime start = DateTime.Now;
for (long i = 0; i < Options.Count; ++i)
{
Session.MessageTransfer(_exchange, message);
if (Options.SyncPub)
{
Session.Sync();
}
if (Options.Tx > 0 && (i + 1)%Options.Tx == 0)
{
Session.TxSelect();
Session.Sync();
}
if (Options.IntervalPub > 0)
{
Thread.Sleep((int) Options.IntervalSub*1000);
}
}
Session.Sync();
DateTime end = DateTime.Now;
// Report to publisher.
message.DeliveryProperties.SetRoutingKey("pub_done");
message.ClearData();
double time = end.Subtract(start).TotalMilliseconds;
byte[] rate = BitConverter.GetBytes( Options.Count / time );
message.AppendData(rate);
Session.MessageTransfer("amq.direct", message);
if (Options.Tx > 0)
{
Session.TxSelect();
Session.Sync();
}
}
Session.Close();
}
}
public class Controller : PerfTestClient
{
public Controller(Options options)
: base(options)
{
}
private void process(int size, string queue)
{
CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
IMessageListener listener = new SyncListener(buffer);
string localQueue = "queue-" + UUID.RandomUuid();
Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
Session.ExchangeBind(localQueue, "amq.direct", queue);
Session.AttachMessageListener(listener, localQueue);
Session.MessageSubscribe(localQueue);
for (int i = 0; i < size; ++i)
{
buffer.Dequeue();
}
}
private double processRate(int size, string queue)
{
CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
IMessageListener listener = new SyncListener(buffer);
string localQueue = "queue-" + UUID.RandomUuid();
Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
Session.ExchangeBind(localQueue, "amq.direct", queue);
Session.AttachMessageListener(listener, localQueue);
Session.MessageSubscribe(localQueue);
double rate = 0;
RangeSet range = new RangeSet();
for (int i = 0; i < size; ++i)
{
IMessage m = buffer.Dequeue();
range.Add(m.Id);
BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
byte[] body = new byte[m.Body.Length - m.Body.Position];
reader.Read(body, 0, body.Length);
rate += BitConverter.ToDouble(body,0);
}
Session.MessageAccept(range);
return rate;
}
private void send(int size, string queue, string data)
{
IMessage message = new Message();
message.DeliveryProperties.SetRoutingKey(queue);
message.AppendData(Encoding.UTF8.GetBytes(data));
for (int i = 0; i < size; ++i)
{
Session.MessageTransfer("amq.direct", message);
}
}
public override void Start()
{
process(Options.Subs, "sub_ready");
for (int j = 0; j < Options.Iterations; ++j)
{
DateTime start = DateTime.Now;
send(Options.Pubs, "pub_start", "start"); // Start publishers
double pubrate = processRate(Options.Pubs, "pub_done");
double subrate = processRate(Options.Subs, "sub_done");
DateTime end = DateTime.Now;
double transfers = (Options.Pubs*Options.Count) + (Options.Subs*Options.SubQuota);
double time = end.Subtract(start).TotalSeconds;
double txrate = transfers/time;
double mbytes = (txrate*Options.Size) / (1024 * 1024) ;
Console.WriteLine("Total: " + transfers + " transfers of " + Options.Size + " bytes in "
+ time + " seconds.\n" +
"Publish transfers/sec: " + pubrate * 1000 + "\n" +
"Subscribe transfers/sec: " + subrate * 1000 + "\n" +
"Total transfers/sec: " + txrate + "\n" +
"Total Mbytes/sec: " + mbytes);
Console.WriteLine("done");
}
}
}
public class PerfTest
{
private static int Main(string[] args)
{
Options options = new Options();
CommandLineParser parser = new CommandLineParser(options);
parser.Parse();
if (parser.HasErrors)
{
Console.WriteLine(parser.UsageInfo.GetErrorsAsString(78));
return -1;
}
if (options.Help)
{
Console.WriteLine(parser.UsageInfo.GetOptionsAsString(78));
return 0;
}
bool singleProcess =
(!options.Setup && !options.Control && !options.Publish && !options.Subscribe);
if (singleProcess)
{
options.Setup = options.Control = options.Publish = true;
options.Subscribe = true;
}
string exchange = "amq.direct";
switch (options.Mode)
{
case "shared":
options.SubQuota = (options.Pubs*options.Count)/options.Subs;
break;
case "fanout":
options.SubQuota = (options.Pubs*options.Count);
exchange = "amq.fanout";
break;
case "topic":
options.SubQuota = (options.Pubs*options.Count);
exchange = "amq.topic";
break;
}
if (options.Setup)
{
SetupTest setup = new SetupTest(options);
setup.Start(); // Set up queues
}
Thread contT = null;
if ( options.Control)
{
Controller c = new Controller(options);
contT = new Thread(c.Start);
contT.Start();
}
Thread[] publishers = null;
Thread[] subscribers = null;
// Start pubs/subs for each queue/topic.
for (int i = 0; i < options.Queues; ++i)
{
string key = "perftest" + i; // Queue or topic name.
if (options.Publish)
{
int n = singleProcess ? options.Pubs : 1;
publishers = new Thread[n];
for (int j = 0; j < n; ++j)
{
PublishThread pt = new PublishThread(options, key, exchange);
publishers[i] = new Thread(pt.Start);
publishers[i].Start();
}
}
if ( options.Subscribe)
{
int n = singleProcess ? options.Subs : 1;
subscribers = new Thread[n];
for (int j = 0; j < n; ++j)
{
SubscribeThread st;
if (options.Mode.Equals("shared"))
st = new SubscribeThread(options, key);
else
st = new SubscribeThread(options, key, exchange);
subscribers[i] = new Thread(st.Start);
subscribers[i].Start();
}
}
}
if (options.Control)
{
contT.Join();
}
// Wait for started threads.
if (options.Publish)
{
foreach (Thread t in publishers)
{
t.Join();
}
}
if (options.Subscribe)
{
foreach (Thread t in subscribers)
{
t.Join();
}
}
return 0;
}
}
}