blob: f02a4847501f2fcff30ea2a139e87ed8cf33da11 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/sys/Runnable.h>
#include <qpid/sys/Thread.h>
#include <qpid/sys/Time.h>
#include <qpid/Options.h>
#include <iostream>
#include <string>
using namespace qpid::messaging;
using namespace qpid::types;
namespace qpid {
namespace tests {
struct Args : public qpid::Options
{
std::string url;
std::string address;
uint size;
uint rate;
bool durable;
uint receiverCapacity;
uint senderCapacity;
uint ackFrequency;
Args() :
url("amqp:tcp:127.0.0.1:5672"),
address("test-queue"),
size(512),
rate(1000),
durable(false),
receiverCapacity(0),
senderCapacity(0),
ackFrequency(1)
{
addOptions()
("url", qpid::optValue(url, "URL"), "Url to connect to.")
("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).")
("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)")
("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)")
("ack-frequency", qpid::optValue(ackFrequency, "N"),
"Ack frequency (0 implies none of the messages will get accepted)");
}
};
Args opts;
const std::string TS = "ts";
uint64_t timestamp(const qpid::sys::AbsTime& time)
{
qpid::sys::Duration t(qpid::sys::EPOCH, time);
return t;
}
struct Client : qpid::sys::Runnable
{
virtual ~Client() {}
virtual void doWork(Session&) = 0;
void run()
{
Connection connection(opts.url);
try {
connection.open();
Session session = connection.createSession();
doWork(session);
session.close();
connection.close();
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
connection.close();
}
}
qpid::sys::Thread thread;
void start() { thread = qpid::sys::Thread(this); }
void join() { thread.join(); }
};
struct Publish : Client
{
void doWork(Session& session)
{
Sender sender = session.createSender(opts.address);
if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity);
Message msg(std::string(opts.size, 'X'));
uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
uint64_t sent = 0, missedRate = 0;
qpid::sys::AbsTime start = qpid::sys::now();
while (true) {
qpid::sys::AbsTime sentAt = qpid::sys::now();
msg.getProperties()[TS] = timestamp(sentAt);
sender.send(msg);
++sent;
qpid::sys::AbsTime waitTill(start, sent*interval);
qpid::sys::Duration delay(sentAt, waitTill);
if (delay < 0) {
++missedRate;
} else {
qpid::sys::usleep(delay / qpid::sys::TIME_USEC);
}
}
}
};
struct Consume : Client
{
void doWork(Session& session)
{
Message msg;
uint64_t received = 0;
double minLatency = std::numeric_limits<double>::max();
double maxLatency = 0;
double totalLatency = 0;
Receiver receiver = session.createReceiver(opts.address);
if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity);
while (receiver.fetch(msg)) {
++received;
if (opts.ackFrequency && (received % opts.ackFrequency == 0)) {
session.acknowledge();
}
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
uint64_t sentAt = msg.getProperties()[TS].asUint64();
double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC;
//update avg, min & max
minLatency = std::min(minLatency, latency);
maxLatency = std::max(maxLatency, latency);
totalLatency += latency;
if (received % opts.rate == 0) {
std::cout << "count=" << received
<< ", avg=" << (totalLatency/received)
<< ", min=" << minLatency
<< ", max=" << maxLatency << std::endl;
}
}
}
};
}} // namespace qpid::tests
using namespace qpid::tests;
int main(int argc, char** argv)
{
try {
opts.parse(argc, argv);
Publish publish;
Consume consume;
publish.start();
consume.start();
consume.join();
publish.join();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
return 1;
}