blob: ca87197ff335491a5cb2239c090fa08e10635083 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/SystemInfo.h"
#include <iostream>
#include <memory>
#include <stdlib.h>
using namespace qpid::messaging;
using namespace qpid::types;
using namespace std;
namespace qpid {
namespace tests {
struct Options : public qpid::Options
{
bool help;
std::string url;
std::string address;
std::string connectionOptions;
uint messages;
uint capacity;
uint ackFrequency;
bool failoverUpdates;
qpid::log::Options log;
uint senders;
uint receivers;
uint groupSize;
bool printReport;
std::string groupKey;
bool durable;
bool allowDuplicates;
bool randomizeSize;
bool stickyConsumer;
uint timeout;
uint interleave;
std::string prefix;
uint sendRate;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
messages(10000),
capacity(1000),
ackFrequency(100),
failoverUpdates(false),
log(argv0),
senders(2),
receivers(2),
groupSize(10),
printReport(false),
groupKey("qpid.no_group"),
durable(false),
allowDuplicates(false),
randomizeSize(false),
stickyConsumer(false),
timeout(10),
interleave(1),
sendRate(0)
{
addOptions()
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from")
("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages")
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.")
("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.")
("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.")
("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.")
("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.")
("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
//("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
//("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
//("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
}
bool parse(int argc, char** argv)
{
try {
qpid::Options::parse(argc, argv);
if (address.empty()) throw qpid::Exception("Address must be specified!");
if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?");
if (messages == 0) throw qpid::Exception("The message count cannot be zero.");
qpid::log::Logger::instance().configure(log);
if (help) {
std::cout << *this << std::endl << std::endl
<< "Verifies the behavior of grouped messages." << std::endl;
return false;
} else {
return true;
}
} catch (const std::exception& e) {
std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
return false;
}
}
};
const string EOS("eos");
const string SN("sn");
// class that monitors group state across all publishers and consumers. tracks the next
// expected sequence for each group, and total messages consumed.
class GroupChecker
{
qpid::sys::Mutex lock;
uint consumerCt;
uint producerCt;
uint totalMsgs;
uint totalMsgsConsumed;
uint totalMsgsPublished;
bool allowDuplicates;
uint duplicateMsgs;
typedef std::map<std::string, uint> SequenceMap;
SequenceMap sequenceMap;
// Statistics - for each group, store the names of all clients that consumed messages
// from that group, and the number of messages consumed per client.
typedef std::map<std::string, uint> ClientCounter;
typedef std::map<std::string, ClientCounter> GroupStatistics;
GroupStatistics statistics;
public:
GroupChecker( uint messages, uint consumers, uint producers, bool d) :
consumerCt(consumers), producerCt(producers),
totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
duplicateMsgs(0)
{
// if consumering only - we a draining a queue of 'messages' queued messages.
if (producerCt != 0) {
totalMsgs = producers * messages;
} else {
totalMsgs = messages;
}
}
bool checkSequence( const std::string& groupId,
uint sequence, const std::string& client )
{
qpid::sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence);
GroupStatistics::iterator gs = statistics.find(groupId);
if (gs == statistics.end()) {
statistics[groupId][client] = 1;
} else {
gs->second[client]++;
}
// now verify
SequenceMap::iterator s = sequenceMap.find(groupId);
if (s == sequenceMap.end()) {
QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence);
// if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect.
// in this case, set the sequence from the first msg.
sequenceMap[groupId] = (allowDuplicates) ? sequence : 0;
s = sequenceMap.find(groupId);
} else if (sequence < s->second) {
duplicateMsgs++;
QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence);
return allowDuplicates;
}
totalMsgsConsumed++;
return sequence == s->second++;
}
void sendingSequence( const std::string& groupId,
uint sequence, bool eos,
const std::string& client )
{
qpid::sys::Mutex::ScopedLock l(lock);
++totalMsgsPublished;
QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence <<
((eos) ? " (last)" : ""));
}
bool eraseGroup( const std::string& groupId, const std::string& name )
{
qpid::sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")");
return sequenceMap.erase( groupId ) == 1;
}
uint getNextExpectedSequence( const std::string& groupId )
{
qpid::sys::Mutex::ScopedLock l(lock);
return sequenceMap[groupId];
}
bool allMsgsPublished() // true when done publishing msgs
{
qpid::sys::Mutex::ScopedLock l(lock);
return (producerCt == 0 || totalMsgsPublished >= totalMsgs);
}
bool allMsgsConsumed() // true when done consuming msgs
{
qpid::sys::Mutex::ScopedLock l(lock);
return (consumerCt == 0 ||
(totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0));
}
uint getTotalMessages()
{
return totalMsgs;
}
uint getConsumedTotal()
{
qpid::sys::Mutex::ScopedLock l(lock);
return totalMsgsConsumed;
}
uint getPublishedTotal()
{
qpid::sys::Mutex::ScopedLock l(lock);
return totalMsgsPublished;
}
ostream& print(ostream& out)
{
qpid::sys::Mutex::ScopedLock l(lock);
out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed <<
", Duplicates detected: " << duplicateMsgs << std::endl;
out << "Total Groups: " << statistics.size() << std::endl;
unsigned long consumers = 0;
for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) {
out << " GroupId: " << gs->first;
consumers += gs->second.size(); // # of consumers that processed this group
if (gs->second.size() == 1)
out << " completely consumed by a single client." << std::endl;
else
out << " consumed by " << gs->second.size() << " different clients." << std::endl;
for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) {
out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl;
}
}
out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl;
return out;
}
};
namespace {
// rand() is not thread safe. Create a singleton obj to hold a lock while calling
// rand() so it can be called safely by multiple concurrent clients.
class Randomizer {
qpid::sys::Mutex lock;
public:
uint operator()(uint max) {
qpid::sys::Mutex::ScopedLock l(lock);
return (rand() % max) + 1;
}
};
static Randomizer randomizer;
}
// tag each generated message with a group identifer
//
class GroupGenerator {
const std::string groupPrefix;
const uint groupSize;
const bool randomizeSize;
const uint interleave;
uint groupSuffix;
uint total;
struct GroupState {
std::string id;
const uint size;
uint count;
GroupState( const std::string& i, const uint s )
: id(i), size(s), count(0) {}
};
typedef std::list<GroupState> GroupList;
GroupList groups;
GroupList::iterator current;
// add a new group identifier to the list
void newGroup() {
std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
groupId << std::string(":") << groupSuffix++;
uint size = (randomizeSize) ? randomizer(groupSize) : groupSize;
QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this);
GroupState group( groupId.str(), size );
groups.push_back( group );
}
public:
GroupGenerator( const std::string& prefix,
const uint t,
const uint size,
const bool randomize,
const uint i)
: groupPrefix(prefix), groupSize(size),
randomizeSize(randomize), interleave(i), groupSuffix(0), total(t)
{
QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this);
for (uint i = 0; i < 1 || i < interleave; ++i) {
newGroup();
}
current = groups.begin();
}
bool genGroup(std::string& groupId, uint& seq, bool& eos)
{
if (!total) return false;
--total;
if (current == groups.end())
current = groups.begin();
groupId = current->id;
seq = current->count++;
if (current->count == current->size) {
QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this);
eos = true;
if (total >= interleave) { // need a new group to replace this one
newGroup();
groups.erase(current++);
} else ++current;
} else {
++current;
eos = total < interleave; // mark eos on the last message of each group
}
QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this);
return true;
}
};
class Client : public qpid::sys::Runnable
{
public:
typedef boost::shared_ptr<Client> shared_ptr;
enum State {ACTIVE, DONE, FAILURE};
Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {}
virtual ~Client() {}
State getState() { return state; }
void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; }
void clientDone() { if (state == ACTIVE) state = DONE; }
qpid::sys::Thread& getThread() { return thread; }
const std::string getErrorMsg() { return error.str(); }
void stop() {stopped = true;}
const std::string& getName() { return name; }
protected:
const std::string name;
const Options& opts;
qpid::sys::Thread thread;
ostringstream error;
State state;
bool stopped;
};
class Consumer : public Client
{
GroupChecker& checker;
public:
Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {};
virtual ~Consumer() {};
void run()
{
Connection connection;
try {
connection = Connection(opts.url, opts.connectionOptions);
connection.open();
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.createSession();
Receiver receiver = session.createReceiver(opts.address);
receiver.setCapacity(opts.capacity);
Message msg;
uint count = 0;
while (!stopped) {
if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
qpid::types::Variant::Map& properties = msg.getProperties();
std::string groupId = properties[opts.groupKey];
uint groupSeq = properties[SN];
bool eof = properties[EOS];
QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name);
qpid::sys::usleep(10);
if (!checker.checkSequence( groupId, groupSeq, name )) {
ostringstream msg;
msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId );
testFailed( msg.str() );
break;
} else if (eof) {
if (!checker.eraseGroup( groupId, name )) {
ostringstream msg;
msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq;
testFailed( msg.str() );
break;
}
}
++count;
if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
// Clear out message properties & content for next iteration.
msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
} else if (checker.allMsgsConsumed()) // timed out, nothing else to do?
break;
}
session.acknowledge();
session.close();
connection.close();
} catch(const std::exception& error) {
ostringstream msg;
msg << "consumer error: " << error.what();
testFailed( msg.str() );
connection.close();
}
clientDone();
QPID_LOG(trace, "Consuming client " << name << " completed.");
}
};
class Producer : public Client
{
GroupChecker& checker;
GroupGenerator generator;
public:
Producer(const std::string& n, const Options& o, GroupChecker& c)
: Client(n, o), checker(c),
generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave )
{};
virtual ~Producer() {};
void run()
{
Connection connection;
try {
connection = Connection(opts.url, opts.connectionOptions);
connection.open();
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.createSession();
Sender sender = session.createSender(opts.address);
if (opts.capacity) sender.setCapacity(opts.capacity);
Message msg;
msg.setDurable(opts.durable);
std::string groupId;
uint seq;
bool eos;
uint sent = 0;
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
while (!stopped && generator.genGroup(groupId, seq, eos)) {
msg.getProperties()[opts.groupKey] = groupId;
msg.getProperties()[SN] = seq;
msg.getProperties()[EOS] = eos;
checker.sendingSequence( groupId, seq, eos, name );
sender.send(msg);
++sent;
if (opts.sendRate) {
qpid::sys::AbsTime waitTill(start, sent*interval);
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
}
session.sync();
session.close();
connection.close();
} catch(const std::exception& error) {
ostringstream msg;
msg << "producer '" << name << "' error: " << error.what();
testFailed(msg.str());
connection.close();
}
clientDone();
QPID_LOG(trace, "Producing client " << name << " completed.");
}
};
}} // namespace qpid::tests
using namespace qpid::tests;
int main(int argc, char ** argv)
{
int status = 0;
try {
Options opts;
if (opts.parse(argc, argv)) {
GroupChecker state( opts.messages,
opts.receivers,
opts.senders,
opts.allowDuplicates);
std::vector<Client::shared_ptr> clients;
if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
// fire off the producers && consumers
for (size_t j = 0; j < opts.senders; ++j) {
ostringstream name;
name << opts.prefix << "P_" << j;
clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state )));
clients.back()->getThread() = qpid::sys::Thread(*clients.back());
}
for (size_t j = 0; j < opts.receivers; ++j) {
ostringstream name;
name << opts.prefix << "C_" << j;
clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state )));
clients.back()->getThread() = qpid::sys::Thread(*clients.back());
}
// wait for all pubs/subs to finish.... or for consumers to fail or stall.
uint stalledTime = 0;
bool clientFailed = false;
while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) {
uint lastCount;
lastCount = state.getConsumedTotal();
qpid::sys::usleep( 1000000 );
// check each client for failures
for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
i != clients.end(); ++i) {
QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
if ((*i)->getState() == Client::FAILURE) {
QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
clientFailed = true;
break; // exit test.
}
}
// check for stalled consumers
if (!clientFailed && !state.allMsgsConsumed()) {
if (lastCount == state.getConsumedTotal()) {
if (++stalledTime >= opts.timeout) {
clientFailed = true;
break; // exit test
}
} else {
stalledTime = 0;
}
}
QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
" Published to date = " << state.getPublishedTotal() <<
" total=" << state.getTotalMessages());
}
if (clientFailed) {
if (stalledTime >= opts.timeout) {
QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
status = 2;
} else {
status = 1;
}
}
// Wait for started threads.
for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
i != clients.end(); ++i) {
(*i)->stop();
(*i)->getThread().join();
}
if (opts.printReport && !status) state.print(std::cout);
} else status = 4;
} catch(const std::exception& error) {
QPID_LOG(error, argv[0] << ": " << error.what());
status = 3;
}
QPID_LOG(trace, "TEST DONE [" << status << "]");
return status;
}