| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include <algorithm> |
| #include <iomanip> |
| #include <iostream> |
| #include <memory> |
| #include <sstream> |
| #include <vector> |
| |
| #include "TestOptions.h" |
| #include "qpid/client/Connection.h" |
| #include "qpid/client/Message.h" |
| #include "qpid/client/AsyncSession.h" |
| #include "qpid/client/SubscriptionManager.h" |
| #include "qpid/framing/Array.h" |
| #include "qpid/framing/Buffer.h" |
| #include "qpid/framing/FieldValue.h" |
| #include "qpid/framing/Uuid.h" |
| #include "qpid/sys/Thread.h" |
| |
| using namespace qpid; |
| using namespace qpid::client; |
| using namespace qpid::sys; |
| using std::string; |
| |
| namespace qpid { |
| namespace tests { |
| |
| typedef std::vector<std::string> StringSet; |
| |
| struct Args : public qpid::TestOptions { |
| bool init, transfer, check;//actions |
| uint size; |
| bool durable; |
| uint queues; |
| string base; |
| uint msgsPerTx; |
| uint txCount; |
| uint totalMsgCount; |
| bool dtx; |
| bool quiet; |
| |
| Args() : init(true), transfer(true), check(true), |
| size(256), durable(true), queues(2), |
| base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10), |
| dtx(false), quiet(false) |
| { |
| addOptions() |
| |
| ("init", optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.") |
| ("transfer", optValue(transfer, "yes|no"), "'Move' messages from one queue to another using transactions to ensure no message loss.") |
| ("check", optValue(check, "yes|no"), "Check that the initial messages are all still available.") |
| ("size", optValue(size, "N"), "message size") |
| ("durable", optValue(durable, "yes|no"), "use durable messages") |
| ("queues", optValue(queues, "N"), "number of queues") |
| ("queue-base-name", optValue(base, "<name>"), "base name for queues") |
| ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction") |
| ("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'") |
| ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'") |
| ("dtx", optValue(dtx, "yes|no"), "use distributed transactions") |
| ("quiet", optValue(quiet), "reduce output from test"); |
| } |
| }; |
| |
| const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); |
| |
| std::string generateData(uint size) |
| { |
| if (size < chars.length()) { |
| return chars.substr(0, size); |
| } |
| std::string data; |
| for (uint i = 0; i < (size / chars.length()); i++) { |
| data += chars; |
| } |
| data += chars.substr(0, size % chars.length()); |
| return data; |
| } |
| |
| void generateSet(const std::string& base, uint count, StringSet& collection) |
| { |
| for (uint i = 0; i < count; i++) { |
| std::ostringstream out; |
| out << base << "-" << (i+1); |
| collection.push_back(out.str()); |
| } |
| } |
| |
| Args opts; |
| |
| struct Client |
| { |
| Connection connection; |
| AsyncSession session; |
| |
| Client() |
| { |
| opts.open(connection); |
| session = connection.newSession(); |
| } |
| |
| ~Client() |
| { |
| try{ |
| session.close(); |
| connection.close(); |
| } catch(const std::exception& e) { |
| std::cout << e.what() << std::endl; |
| } |
| } |
| }; |
| |
| struct Transfer : public Client, public Runnable |
| { |
| std::string src; |
| std::string dest; |
| Thread thread; |
| framing::Xid xid; |
| |
| Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid(0x4c414e47, "", from) {} |
| |
| void run() |
| { |
| try { |
| |
| if (opts.dtx) session.dtxSelect(); |
| else session.txSelect(); |
| SubscriptionManager subs(session); |
| |
| LocalQueue lq; |
| SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx)); |
| settings.autoAck = 0; // Disabled |
| Subscription sub = subs.subscribe(lq, src, settings); |
| |
| for (uint t = 0; t < opts.txCount; t++) { |
| Message in; |
| Message out("", dest); |
| if (opts.dtx) { |
| setNewXid(xid); |
| session.dtxStart(arg::xid=xid); |
| } |
| for (uint m = 0; m < opts.msgsPerTx; m++) { |
| in = lq.pop(); |
| std::string& data = in.getData(); |
| if (data.size() != opts.size) { |
| std::ostringstream oss; |
| oss << "Message size incorrect: size=" << in.getData().size() << "; expected " << opts.size; |
| throw std::runtime_error(oss.str()); |
| } |
| out.setData(data); |
| out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId()); |
| out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); |
| session.messageTransfer(arg::content=out, arg::acceptMode=1); |
| } |
| sub.accept(sub.getUnaccepted()); |
| if (opts.dtx) { |
| session.dtxEnd(arg::xid=xid); |
| session.dtxPrepare(arg::xid=xid); |
| session.dtxCommit(arg::xid=xid); |
| } else { |
| session.txCommit(); |
| } |
| session.sync(); |
| } |
| } catch(const std::exception& e) { |
| std::cout << "Transfer interrupted: " << e.what() << std::endl; |
| } |
| } |
| |
| void setNewXid(framing::Xid& xid) { |
| framing::Uuid uuid(true); |
| xid.setGlobalId(uuid.str()); |
| } |
| }; |
| |
| struct Controller : public Client |
| { |
| StringSet ids; |
| StringSet queues; |
| |
| Controller() |
| { |
| generateSet(opts.base, opts.queues, queues); |
| generateSet("msg", opts.totalMsgCount, ids); |
| } |
| |
| void init() |
| { |
| //declare queues |
| for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { |
| session.queueDeclare(arg::queue=*i, arg::durable=opts.durable); |
| session.sync(); |
| } |
| |
| Message msg(generateData(opts.size), *queues.begin()); |
| if (opts.durable) { |
| msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); |
| } |
| |
| //publish messages |
| for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { |
| msg.getMessageProperties().setCorrelationId(*i); |
| session.messageTransfer(arg::content=msg, arg::acceptMode=1); |
| } |
| } |
| |
| void transfer() |
| { |
| boost::ptr_vector<Transfer> agents(opts.queues); |
| //launch transfer agents |
| for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { |
| StringSet::iterator next = i + 1; |
| if (next == queues.end()) next = queues.begin(); |
| |
| if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl; |
| agents.push_back(new Transfer(*i, *next)); |
| agents.back().thread = Thread(agents.back()); |
| } |
| |
| for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) { |
| i->thread.join(); |
| } |
| } |
| |
| int check() |
| { |
| SubscriptionManager subs(session); |
| |
| // Recover DTX transactions (if any) |
| if (opts.dtx) { |
| framing::DtxRecoverResult dtxRes = session.dtxRecover().get(); |
| const framing::Array& xidArr = dtxRes.getInDoubt(); |
| std::vector<std::string> inDoubtXids(xidArr.size()); |
| std::transform(xidArr.begin(), xidArr.end(), inDoubtXids.begin(), framing::Array::get<std::string, framing::Array::ValuePtr>); |
| |
| if (inDoubtXids.size()) { |
| if (!opts.quiet) std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl; |
| framing::StructHelper decoder; |
| framing::Xid xid; |
| // abort even, commit odd transactions |
| for (unsigned i = 0; i < inDoubtXids.size(); i++) { |
| decoder.decode(xid, inDoubtXids[i]); |
| if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " * committing "); |
| xid.print(std::cout); |
| std::cout << std::endl; |
| if (i%2) { |
| session.dtxRollback(arg::xid=xid); |
| } else { |
| session.dtxCommit(arg::xid=xid); |
| } |
| } |
| } |
| } |
| |
| StringSet drained; |
| //drain each queue and verify the correct set of messages are available |
| for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { |
| //subscribe, allocate credit and flushn |
| LocalQueue lq; |
| SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE); |
| subs.subscribe(lq, *i, settings); |
| session.messageFlush(arg::destination=*i); |
| session.sync(); |
| |
| uint count(0); |
| while (!lq.empty()) { |
| Message m = lq.pop(); |
| //add correlation ids of received messages to drained |
| drained.push_back(m.getMessageProperties().getCorrelationId()); |
| ++count; |
| } |
| if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; |
| } |
| |
| sort(ids.begin(), ids.end()); |
| sort(drained.begin(), drained.end()); |
| |
| //check that drained == ids |
| StringSet missing; |
| set_difference(ids.begin(), ids.end(), drained.begin(), drained.end(), back_inserter(missing)); |
| |
| StringSet extra; |
| set_difference(drained.begin(), drained.end(), ids.begin(), ids.end(), back_inserter(extra)); |
| |
| if (missing.empty() && extra.empty()) { |
| std::cout << "All expected messages were retrieved." << std::endl; |
| return 0; |
| } else { |
| if (!missing.empty()) { |
| std::cout << "The following ids were missing:" << std::endl; |
| for (StringSet::iterator i = missing.begin(); i != missing.end(); i++) { |
| std::cout << " '" << *i << "'" << std::endl; |
| } |
| } |
| if (!extra.empty()) { |
| std::cout << "The following extra ids were encountered:" << std::endl; |
| for (StringSet::iterator i = extra.begin(); i != extra.end(); i++) { |
| std::cout << " '" << *i << "'" << std::endl; |
| } |
| } |
| return 1; |
| } |
| } |
| }; |
| |
| }} // namespace qpid::tests |
| |
| using namespace qpid::tests; |
| |
| int main(int argc, char** argv) |
| { |
| try { |
| opts.parse(argc, argv); |
| Controller controller; |
| if (opts.init) controller.init(); |
| if (opts.transfer) controller.transfer(); |
| if (opts.check) return controller.check(); |
| return 0; |
| } catch(const std::exception& e) { |
| std::cout << e.what() << std::endl; |
| } |
| return 2; |
| } |