| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "TestOptions.h" |
| #include "qpid/OptionsTemplates.h" |
| #include "qpid/client/AsyncSession.h" |
| #include "qpid/client/SubscriptionManager.h" |
| #include "qpid/client/Connection.h" |
| #include "qpid/client/Completion.h" |
| #include "qpid/client/Message.h" |
| #include "qpid/framing/FieldTable.h" |
| #include "qpid/sys/Time.h" |
| #include "qpid/sys/Thread.h" |
| |
| #include <boost/lexical_cast.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/function.hpp> |
| #include <boost/ptr_container/ptr_vector.hpp> |
| |
| #include <iostream> |
| #include <sstream> |
| #include <numeric> |
| #include <algorithm> |
| #include <math.h> |
| |
| |
| using namespace std; |
| using namespace qpid; |
| using namespace client; |
| using namespace sys; |
| using boost::lexical_cast; |
| using boost::bind; |
| |
| namespace qpid { |
| namespace tests { |
| |
| enum Mode { SHARED, FANOUT, TOPIC }; |
| const char* modeNames[] = { "shared", "fanout", "topic" }; |
| |
| // istream/ostream ops so Options can read/display Mode. |
| istream& operator>>(istream& in, Mode& mode) { |
| string s; |
| in >> s; |
| int i = find(modeNames, modeNames+3, s) - modeNames; |
| if (i >= 3) throw Exception("Invalid mode: "+s); |
| mode = Mode(i); |
| return in; |
| } |
| |
| ostream& operator<<(ostream& out, Mode mode) { |
| return out << modeNames[mode]; |
| } |
| |
| struct Opts : public TestOptions { |
| |
| // Actions |
| bool setup, control, publish, subscribe; |
| |
| // Queue policy |
| uint32_t queueMaxCount; |
| uint64_t queueMaxSize; |
| std::string baseName; |
| bool queueDurable; |
| |
| // Publisher |
| size_t pubs; |
| size_t count ; |
| size_t size; |
| size_t headers; |
| bool confirm; |
| bool durable; |
| bool uniqueData; |
| bool syncPub; |
| |
| // Subscriber |
| size_t subs; |
| size_t ack; |
| |
| // General |
| size_t qt; |
| bool singleConnect; |
| size_t iterations; |
| Mode mode; |
| bool summary; |
| uint32_t intervalSub; |
| uint32_t intervalPub; |
| size_t tx; |
| size_t txPub; |
| size_t txSub; |
| bool commitAsync; |
| |
| static const std::string helpText; |
| |
| Opts() : |
| TestOptions(helpText), |
| setup(false), control(false), publish(false), subscribe(false), baseName("qpid-perftest"), |
| pubs(1), count(500000), size(1024), headers(0), confirm(true), durable(false), uniqueData(false), syncPub(false), |
| subs(1), ack(0), |
| qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false), |
| intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false) |
| { |
| addOptions() |
| ("setup", optValue(setup), "Create shared queues.") |
| ("control", optValue(control), "Run test, print report.") |
| ("publish", optValue(publish), "Publish messages.") |
| ("subscribe", optValue(subscribe), "Subscribe for messages.") |
| |
| ("mode", optValue(mode, "shared|fanout|topic"), "Test mode." |
| "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n" |
| "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange." |
| "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n") |
| |
| ("npubs", optValue(pubs, "N"), "Create N publishers.") |
| ("count", optValue(count, "N"), "Each publisher sends N messages.") |
| ("size", optValue(size, "BYTES"), "Size of messages in bytes.") |
| ("headers", optValue(headers, "N"), "Number of headers to add to each message.") |
| ("pub-confirm", optValue(confirm, "yes|no"), "Publisher use confirm-mode.") |
| ("durable", optValue(durable, "yes|no"), "Publish messages as durable.") |
| ("unique-data", optValue(uniqueData, "yes|no"), "Make data for each message unique.") |
| ("sync-publish", optValue(syncPub, "yes|no"), "Wait for confirmation of each message before sending the next one.") |
| |
| ("nsubs", optValue(subs, "N"), "Create N subscribers.") |
| ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n" |
| "N==0: Subscriber uses unconfirmed mode") |
| |
| ("qt", optValue(qt, "N"), "Create N queues or topics.") |
| ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") |
| |
| ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.") |
| ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec") |
| |
| ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") |
| ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'") |
| ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") |
| ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") |
| |
| ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") |
| ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") |
| |
| ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming") |
| ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing") |
| ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit") |
| ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming"); |
| } |
| |
| // Computed values |
| size_t totalPubs; |
| size_t totalSubs; |
| size_t transfers; |
| size_t subQuota; |
| |
| void parse(int argc, char** argv) { |
| TestOptions::parse(argc, argv); |
| switch (mode) { |
| case SHARED: |
| if (subs != 0 && count % subs) { |
| count += subs - (count % subs); |
| cout << "WARNING: Adjusted --count to " << count |
| << " the next multiple of --nsubs" << endl; |
| } |
| totalPubs = pubs*qt; |
| totalSubs = subs*qt; |
| subQuota = subs == 0 ? subs:(pubs*count)/subs; |
| break; |
| case FANOUT: |
| if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt=" |
| << qt << endl; |
| qt=1; |
| totalPubs = pubs; |
| totalSubs = subs; |
| subQuota = totalPubs*count; |
| break; |
| case TOPIC: |
| totalPubs = pubs*qt; |
| totalSubs = subs*qt; |
| subQuota = pubs*count; |
| break; |
| } |
| transfers=(totalPubs*count) + (totalSubs*subQuota); |
| if (tx) { |
| if (txPub) { |
| cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl; |
| } else { |
| txPub = tx; |
| } |
| if (txSub) { |
| cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl; |
| } else { |
| txSub = tx; |
| } |
| } |
| } |
| }; |
| |
| const std::string Opts::helpText= |
| "There are two ways to use qpid-perftest: single process or multi-process.\n\n" |
| "If none of the --setup, --publish, --subscribe or --control options\n" |
| "are given qpid-perftest will run a single-process test.\n" |
| "For a multi-process test first run:\n" |
| " qpid-perftest --setup <other options>\n" |
| "and wait for it to complete. The remaining process should run concurrently::\n" |
| "Run --npubs times: qpid-perftest --publish <other options>\n" |
| "Run --nsubs times: qpid-perftest --subscribe <other options>\n" |
| "Run once: qpid-perftest --control <other options>\n" |
| "Note the <other options> must be identical for all processes.\n"; |
| |
| Opts opts; |
| Connection globalConnection; |
| |
| std::string fqn(const std::string& name) |
| { |
| ostringstream fqn; |
| fqn << opts.baseName << "_" << name; |
| return fqn.str(); |
| } |
| |
| struct Client : public Runnable { |
| Connection* connection; |
| Connection localConnection; |
| AsyncSession session; |
| Thread thread; |
| |
| Client() { |
| if (opts.singleConnect){ |
| connection = &globalConnection; |
| if (!globalConnection.isOpen()) opts.open(globalConnection); |
| }else{ |
| connection = &localConnection; |
| opts.open(localConnection); |
| } |
| session = connection->newSession(); |
| } |
| |
| ~Client() { |
| try { |
| if (connection->isOpen()) { |
| session.close(); |
| connection->close(); |
| } |
| } catch (const std::exception& e) { |
| std::cerr << "Error in shutdown: " << e.what() << std::endl; |
| } |
| } |
| }; |
| |
| struct Setup : public Client { |
| |
| void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { |
| session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); |
| session.queuePurge(arg::queue=name); |
| session.sync(); |
| } |
| |
| void run() { |
| queueInit(fqn("pub_start")); |
| queueInit(fqn("pub_done")); |
| queueInit(fqn("sub_ready")); |
| queueInit(fqn("sub_done")); |
| if (opts.iterations > 1) queueInit(fqn("sub_iteration")); |
| if (opts.mode==SHARED) { |
| framing::FieldTable settings;//queue policy settings |
| settings.setInt("qpid.max_count", opts.queueMaxCount); |
| settings.setInt("qpid.max_size", opts.queueMaxSize); |
| for (size_t i = 0; i < opts.qt; ++i) { |
| ostringstream qname; |
| qname << opts.baseName << i; |
| queueInit(qname.str(), opts.durable || opts.queueDurable, settings); |
| } |
| } |
| } |
| }; |
| |
| void expect(string actual, string expect) { |
| if (expect != actual) |
| throw Exception("Expecting "+expect+" but received "+actual); |
| |
| } |
| |
| double secs(Duration d) { return double(d)/TIME_SEC; } |
| double secs(AbsTime start, AbsTime finish) { |
| return secs(Duration(start,finish)); |
| } |
| |
| |
| // Collect rates & print stats. |
| class Stats { |
| vector<double> values; |
| double sum; |
| |
| public: |
| Stats() : sum(0) {} |
| |
| // Functor to collect rates. |
| void operator()(const string& data) { |
| try { |
| double d=lexical_cast<double>(data); |
| values.push_back(d); |
| sum += d; |
| } catch (const std::exception&) { |
| throw Exception("Bad report: "+data); |
| } |
| } |
| |
| double mean() const { |
| return sum/values.size(); |
| } |
| |
| double stdev() const { |
| if (values.size() <= 1) return 0; |
| double avg = mean(); |
| double ssq = 0; |
| for (vector<double>::const_iterator i = values.begin(); |
| i != values.end(); ++i) { |
| double x=*i; |
| x -= avg; |
| ssq += x*x; |
| } |
| return sqrt(ssq/(values.size()-1)); |
| } |
| |
| ostream& print(ostream& out) { |
| ostream_iterator<double> o(out, "\n"); |
| copy(values.begin(), values.end(), o); |
| out << "Average: " << mean(); |
| if (values.size() > 1) |
| out << " (std.dev. " << stdev() << ")"; |
| return out << endl; |
| } |
| }; |
| |
| |
| // Manage control queues, collect and print reports. |
| struct Controller : public Client { |
| |
| SubscriptionManager subs; |
| |
| Controller() : subs(session) {} |
| |
| /** Process messages from queue by applying a functor. */ |
| void process(size_t n, string queue, |
| boost::function<void (const string&)> msgFn) |
| { |
| if (!opts.summary) |
| cout << "Processing " << n << " messages from " |
| << queue << " " << flush; |
| LocalQueue lq; |
| subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false); |
| subs.subscribe(lq, queue); |
| for (size_t i = 0; i < n; ++i) { |
| if (!opts.summary) cout << "." << flush; |
| msgFn(lq.pop().getData()); |
| } |
| if (!opts.summary) cout << " done." << endl; |
| } |
| |
| void process(size_t n, LocalQueue lq, string queue, |
| boost::function<void (const string&)> msgFn) |
| { |
| session.messageFlow(queue, 0, n); |
| if (!opts.summary) |
| cout << "Processing " << n << " messages from " |
| << queue << " " << flush; |
| for (size_t i = 0; i < n; ++i) { |
| if (!opts.summary) cout << "." << flush; |
| msgFn(lq.pop().getData()); |
| } |
| if (!opts.summary) cout << " done." << endl; |
| } |
| |
| void send(size_t n, string queue, string data) { |
| if (!opts.summary) |
| cout << "Sending " << data << " " << n << " times to " << queue |
| << endl; |
| Message msg(data, queue); |
| for (size_t i = 0; i < n; ++i) |
| session.messageTransfer(arg::content=msg, arg::acceptMode=1); |
| } |
| |
| void run() { // Controller |
| try { |
| // Wait for subscribers to be ready. |
| process(opts.totalSubs, fqn("sub_ready"), boost::bind(expect, _1, "ready")); |
| |
| LocalQueue pubDone; |
| LocalQueue subDone; |
| subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); |
| subs.subscribe(pubDone, fqn("pub_done")); |
| subs.subscribe(subDone, fqn("sub_done")); |
| |
| double txrateTotal(0); |
| double mbytesTotal(0); |
| double pubRateTotal(0); |
| double subRateTotal(0); |
| |
| for (size_t j = 0; j < opts.iterations; ++j) { |
| AbsTime start=now(); |
| send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers |
| if (j) { |
| send(opts.totalSubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration |
| } |
| |
| Stats pubRates; |
| Stats subRates; |
| |
| process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates)); |
| process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates)); |
| |
| AbsTime end=now(); |
| double time=secs(start, end); |
| if (time <= 0.0) { |
| throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count."); |
| } |
| double txrate=opts.transfers/time; |
| double mbytes=(txrate*opts.size)/(1024*1024); |
| |
| if (!opts.summary) { |
| cout << endl << "Total " << opts.transfers << " transfers of " |
| << opts.size << " bytes in " |
| << time << " seconds." << endl; |
| cout << endl << "Publish transfers/sec: " << endl; |
| pubRates.print(cout); |
| cout << endl << "Subscribe transfers/sec: " << endl; |
| subRates.print(cout); |
| cout << endl |
| << "Total transfers/sec: " << txrate << endl |
| << "Total Mbytes/sec: " << mbytes << endl; |
| } |
| else { |
| cout << pubRates.mean() << "\t" |
| << subRates.mean() << "\t" |
| << txrate << "\t" |
| << mbytes << endl; |
| } |
| |
| txrateTotal += txrate; |
| mbytesTotal += mbytes; |
| pubRateTotal += pubRates.mean(); |
| subRateTotal += subRates.mean(); |
| } |
| if (opts.iterations > 1) { |
| cout << "Averages: "<< endl |
| << (pubRateTotal / opts.iterations) << "\t" |
| << (subRateTotal / opts.iterations) << "\t" |
| << (txrateTotal / opts.iterations) << "\t" |
| << (mbytesTotal / opts.iterations) << endl; |
| } |
| } |
| catch (const std::exception& e) { |
| cout << "Controller exception: " << e.what() << endl; |
| } |
| } |
| }; |
| |
| |
| struct PublishThread : public Client { |
| string destination; |
| string routingKey; |
| |
| PublishThread() {}; |
| |
| PublishThread(string key, string dest=string()) { |
| destination=dest; |
| routingKey=key; |
| } |
| |
| void run() { // Publisher |
| try { |
| string data; |
| size_t offset(0); |
| if (opts.uniqueData) { |
| offset = 5; |
| data += "data:";//marker (requested for latency testing tool scripts) |
| data += string(sizeof(size_t), 'X');//space for seq no |
| data += session.getId().str(); |
| if (opts.size > data.size()) { |
| data += string(opts.size - data.size(), 'X'); |
| } else if(opts.size < data.size()) { |
| cout << "WARNING: Increased --size to " << data.size() |
| << " to honour --unique-data" << endl; |
| } |
| } else { |
| size_t msgSize=max(opts.size, sizeof(size_t)); |
| data = string(msgSize, 'X'); |
| } |
| |
| Message msg(data, routingKey); |
| if (opts.durable) |
| msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); |
| if (opts.headers) { |
| for (size_t i = 0; i < opts.headers; ++i) { |
| std::stringstream h; |
| h << "hdr" << i; |
| msg.getMessageProperties().getApplicationHeaders().setString(h.str(), h.str()); |
| } |
| } |
| |
| if (opts.txPub){ |
| session.txSelect(); |
| } |
| SubscriptionManager subs(session); |
| LocalQueue lq; |
| subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); |
| Subscription cs = subs.subscribe(lq, fqn("pub_start")); |
| |
| for (size_t j = 0; j < opts.iterations; ++j) { |
| cs.grantMessageCredit(1); |
| expect(lq.pop().getData(), "start"); |
| AbsTime start=now(); |
| for (size_t i=0; i<opts.count; i++) { |
| // Stamp the iteration into the message data, avoid |
| // any heap allocation. |
| const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t), |
| reinterpret_cast<const char*>(&i), sizeof(size_t)); |
| if (opts.syncPub) { |
| sync(session).messageTransfer( |
| arg::destination=destination, |
| arg::content=msg, |
| arg::acceptMode=1); |
| } else { |
| session.messageTransfer( |
| arg::destination=destination, |
| arg::content=msg, |
| arg::acceptMode=1); |
| } |
| if (opts.txPub && ((i+1) % opts.txPub == 0)){ |
| if (opts.commitAsync){ |
| session.txCommit(); |
| } else { |
| sync(session).txCommit(); |
| } |
| } |
| if (opts.intervalPub) |
| qpid::sys::usleep(opts.intervalPub*1000); |
| } |
| if (opts.confirm) session.sync(); |
| AbsTime end=now(); |
| double time=secs(start,end); |
| if (time <= 0.0) { |
| throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count."); |
| } |
| |
| // Send result to controller. |
| Message report(lexical_cast<string>(opts.count/time), fqn("pub_done")); |
| session.messageTransfer(arg::content=report, arg::acceptMode=1); |
| if (opts.txPub){ |
| sync(session).txCommit(); |
| } |
| } |
| session.close(); |
| } |
| catch (const std::exception& e) { |
| cout << "PublishThread exception: " << e.what() << endl; |
| } |
| } |
| }; |
| |
| struct SubscribeThread : public Client { |
| |
| string queue; |
| |
| SubscribeThread() {} |
| |
| SubscribeThread(string q) { queue = q; } |
| |
| SubscribeThread(string key, string ex) { |
| queue=session.getId().str(); // Unique name. |
| session.queueDeclare(arg::queue=queue, |
| arg::exclusive=true, |
| arg::autoDelete=true, |
| arg::durable=opts.durable); |
| session.exchangeBind(arg::queue=queue, |
| arg::exchange=ex, |
| arg::bindingKey=key); |
| } |
| |
| void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { |
| if (!cond) { |
| Message error( |
| QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), |
| "sub_done"); |
| session.messageTransfer(arg::content=error, arg::acceptMode=1); |
| throw Exception(error.getData()); |
| } |
| } |
| |
| void run() { // Subscribe |
| try { |
| if (opts.txSub) sync(session).txSelect(); |
| SubscriptionManager subs(session); |
| SubscriptionSettings settings; |
| settings.autoAck = opts.txSub ? opts.txSub : opts.ack; |
| settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE); |
| settings.flowControl = FlowControl::messageCredit(opts.subQuota); |
| LocalQueue lq; |
| Subscription subscription = subs.subscribe(lq, queue, settings); |
| // Notify controller we are ready. |
| session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1); |
| if (opts.txSub) { |
| if (opts.commitAsync) session.txCommit(); |
| else sync(session).txCommit(); |
| } |
| |
| LocalQueue iterationControl; |
| if (opts.iterations > 1) { |
| subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0))); |
| } |
| |
| for (size_t j = 0; j < opts.iterations; ++j) { |
| if (j > 0) { |
| //need to wait here until all subs are done |
| session.messageFlow(fqn("sub_iteration"), 0, 1); |
| iterationControl.pop(); |
| |
| //need to allocate some more credit for subscription |
| session.messageFlow(queue, 0, opts.subQuota); |
| } |
| Message msg; |
| AbsTime start=now(); |
| size_t expect=0; |
| for (size_t i = 0; i < opts.subQuota; ++i) { |
| msg=lq.pop(); |
| if (opts.txSub && ((i+1) % opts.txSub == 0)) { |
| if (opts.commitAsync) session.txCommit(); |
| else sync(session).txCommit(); |
| } |
| if (opts.intervalSub) |
| qpid::sys::usleep(opts.intervalSub*1000); |
| // TODO aconway 2007-11-23: check message order for. |
| // multiple publishers. Need an array of counters, |
| // one per publisher and a publisher ID in the |
| // message. Careful not to introduce a lot of overhead |
| // here, e.g. no std::map, std::string etc. |
| // |
| // For now verify order only for a single publisher. |
| size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; |
| size_t n; |
| memcpy (&n, reinterpret_cast<const char*>(msg.getData().data() + offset), |
| sizeof(n)); |
| if (opts.pubs == 1) { |
| if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); |
| else verify(n>=expect, ">=", expect, n); |
| expect = n+1; |
| } |
| } |
| if (opts.txSub || opts.ack) |
| subscription.accept(subscription.getUnaccepted()); |
| if (opts.txSub) { |
| if (opts.commitAsync) session.txCommit(); |
| else sync(session).txCommit(); |
| } |
| AbsTime end=now(); |
| |
| // Report to publisher. |
| Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), |
| fqn("sub_done")); |
| session.messageTransfer(arg::content=result, arg::acceptMode=1); |
| if (opts.txSub) sync(session).txCommit(); |
| } |
| session.close(); |
| } |
| catch (const std::exception& e) { |
| cout << "SubscribeThread exception: " << e.what() << endl; |
| } |
| } |
| }; |
| |
| } |
| |
| template po::value_semantic* create_value(tests::Mode& val, const std::string& arg); |
| |
| } // namespace qpid::tests |
| |
| using namespace qpid::tests; |
| |
| int main(int argc, char** argv) { |
| int exitCode = 0; |
| boost::ptr_vector<Client> subs(opts.subs); |
| boost::ptr_vector<Client> pubs(opts.pubs); |
| |
| try { |
| opts.parse(argc, argv); |
| |
| string exchange; |
| switch (opts.mode) { |
| case FANOUT: exchange="amq.fanout"; break; |
| case TOPIC: exchange="amq.topic"; break; |
| case SHARED: break; |
| } |
| |
| bool singleProcess= |
| (!opts.setup && !opts.control && !opts.publish && !opts.subscribe); |
| if (singleProcess) |
| opts.setup = opts.control = opts.publish = opts.subscribe = true; |
| |
| if (opts.setup) Setup().run(); // Set up queues |
| |
| // Start pubs/subs for each queue/topic. |
| for (size_t i = 0; i < opts.qt; ++i) { |
| ostringstream key; |
| key << opts.baseName << i; // Queue or topic name. |
| if (opts.publish) { |
| size_t n = singleProcess ? opts.pubs : 1; |
| for (size_t j = 0; j < n; ++j) { |
| pubs.push_back(new PublishThread(key.str(), exchange)); |
| pubs.back().thread=Thread(pubs.back()); |
| } |
| } |
| if (opts.subscribe) { |
| size_t n = singleProcess ? opts.subs : 1; |
| for (size_t j = 0; j < n; ++j) { |
| if (opts.mode==SHARED) |
| subs.push_back(new SubscribeThread(key.str())); |
| else |
| subs.push_back(new SubscribeThread(key.str(),exchange)); |
| subs.back().thread=Thread(subs.back()); |
| } |
| } |
| } |
| |
| if (opts.control) Controller().run(); |
| } |
| catch (const std::exception& e) { |
| cout << endl << e.what() << endl; |
| exitCode = 1; |
| } |
| |
| // Wait for started threads. |
| if (opts.publish) { |
| for (boost::ptr_vector<Client>::iterator i=pubs.begin(); |
| i != pubs.end(); |
| ++i) |
| i->thread.join(); |
| } |
| |
| if (opts.subscribe) { |
| for (boost::ptr_vector<Client>::iterator i=subs.begin(); |
| i != subs.end(); |
| ++i) |
| i->thread.join(); |
| } |
| return exitCode; |
| } |