| #ifndef TESTS_MESSAGINGFIXTURE_H |
| #define TESTS_MESSAGINGFIXTURE_H |
| |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "BrokerFixture.h" |
| #include "unit_test.h" |
| #include "test_tools.h" |
| #include "qpid/client/Connection.h" |
| #include "qpid/client/Session.h" |
| #include "qpid/framing/Uuid.h" |
| #include "qpid/messaging/Address.h" |
| #include "qpid/messaging/Connection.h" |
| #include "qpid/messaging/Session.h" |
| #include "qpid/messaging/Sender.h" |
| #include "qpid/messaging/Receiver.h" |
| #include "qpid/messaging/Message.h" |
| #include "qpid/types/Variant.h" |
| |
| namespace qpid { |
| namespace tests { |
| |
| using qpid::types::Variant; |
| |
| struct BrokerAdmin |
| { |
| qpid::client::Connection connection; |
| qpid::client::Session session; |
| |
| BrokerAdmin(uint16_t port) |
| { |
| connection.open("localhost", port); |
| session = connection.newSession(); |
| } |
| |
| void createQueue(const std::string& name) |
| { |
| session.queueDeclare(qpid::client::arg::queue=name); |
| } |
| |
| void deleteQueue(const std::string& name) |
| { |
| session.queueDelete(qpid::client::arg::queue=name); |
| } |
| |
| void createExchange(const std::string& name, const std::string& type) |
| { |
| session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type); |
| } |
| |
| void deleteExchange(const std::string& name) |
| { |
| session.exchangeDelete(qpid::client::arg::exchange=name); |
| } |
| |
| bool checkQueueExists(const std::string& name) |
| { |
| return session.queueQuery(name).getQueue() == name; |
| } |
| |
| bool checkExchangeExists(const std::string& name, std::string& type) |
| { |
| qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name); |
| type = result.getType(); |
| return !result.getNotFound(); |
| } |
| |
| void send(qpid::client::Message& message, const std::string& exchange=std::string()) |
| { |
| session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message); |
| } |
| |
| ~BrokerAdmin() |
| { |
| session.close(); |
| connection.close(); |
| } |
| }; |
| |
| struct MessagingFixture : public BrokerFixture |
| { |
| messaging::Connection connection; |
| messaging::Session session; |
| BrokerAdmin admin; |
| |
| MessagingFixture(Broker::Options opts = Broker::Options(), bool mgmtEnabled=false) : |
| BrokerFixture(opts, mgmtEnabled), |
| connection(open(broker->getPort(Broker::TCP_TRANSPORT))), |
| session(connection.createSession()), |
| admin(broker->getPort(Broker::TCP_TRANSPORT)) |
| { |
| } |
| |
| static messaging::Connection open(uint16_t port) |
| { |
| messaging::Connection connection( |
| (boost::format("amqp:tcp:localhost:%1%") % (port)).str()); |
| connection.open(); |
| return connection; |
| } |
| |
| /** Open a connection to the broker. */ |
| qpid::messaging::Connection newConnection() |
| { |
| qpid::messaging::Connection connection( |
| (boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))).str()); |
| return connection; |
| } |
| |
| void ping(const qpid::messaging::Address& address) |
| { |
| messaging::Receiver r = session.createReceiver(address); |
| messaging::Sender s = session.createSender(address); |
| messaging::Message out(framing::Uuid(true).str()); |
| s.send(out); |
| messaging::Message in; |
| BOOST_CHECK(r.fetch(in, 5*messaging::Duration::SECOND)); |
| BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); |
| r.close(); |
| s.close(); |
| } |
| |
| ~MessagingFixture() |
| { |
| session.close(); |
| connection.close(); |
| } |
| }; |
| |
| struct QueueFixture : MessagingFixture |
| { |
| std::string queue; |
| |
| QueueFixture(const std::string& name = "test-queue") : queue(name) |
| { |
| admin.createQueue(queue); |
| } |
| |
| ~QueueFixture() |
| { |
| admin.deleteQueue(queue); |
| } |
| |
| }; |
| |
| struct TopicFixture : MessagingFixture |
| { |
| std::string topic; |
| |
| TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout") : topic(name) |
| { |
| admin.createExchange(topic, type); |
| } |
| |
| ~TopicFixture() |
| { |
| admin.deleteExchange(topic); |
| } |
| |
| }; |
| |
| struct MultiQueueFixture : MessagingFixture |
| { |
| typedef std::vector<std::string>::const_iterator const_iterator; |
| std::vector<std::string> queues; |
| |
| MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3")) : queues(names) |
| { |
| for (const_iterator i = queues.begin(); i != queues.end(); ++i) { |
| admin.createQueue(*i); |
| } |
| } |
| |
| ~MultiQueueFixture() |
| { |
| connection.close(); |
| for (const_iterator i = queues.begin(); i != queues.end(); ++i) { |
| admin.deleteQueue(*i); |
| } |
| } |
| |
| }; |
| |
| inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5) |
| { |
| std::vector<std::string> data; |
| messaging::Message message; |
| for (int i = 0; i < count && receiver.fetch(message, timeout); i++) { |
| data.push_back(message.getContent()); |
| } |
| return data; |
| } |
| |
| |
| inline void send(messaging::Sender& sender, uint count = 1, uint start = 1, |
| const std::string& base = "Message") |
| { |
| for (uint i = start; i < start + count; ++i) { |
| sender.send(messaging::Message((boost::format("%1%_%2%") % base % i).str())); |
| } |
| } |
| |
| inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1, |
| const std::string& base = "Message", |
| messaging::Duration timeout=messaging::Duration::SECOND*5) |
| { |
| for (uint i = start; i < start + count; ++i) { |
| BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str()); |
| } |
| } |
| |
| |
| class MethodInvoker |
| { |
| public: |
| MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), |
| sender(session.createSender("qmf.default.direct/broker")), |
| receiver(session.createReceiver(replyTo)) {} |
| |
| void createExchange(const std::string& name, const std::string& type, bool durable=false) |
| { |
| Variant::Map params; |
| params["name"]=name; |
| params["type"]="exchange"; |
| params["properties"] = Variant::Map(); |
| params["properties"].asMap()["exchange-type"] = type; |
| params["properties"].asMap()["durable"] = durable; |
| methodRequest("create", params); |
| } |
| |
| void deleteExchange(const std::string& name) |
| { |
| Variant::Map params; |
| params["name"]=name; |
| params["type"]="exchange"; |
| methodRequest("delete", params); |
| } |
| |
| void createQueue(const std::string& name, bool durable=false, bool autodelete=false, |
| const Variant::Map& options=Variant::Map()) |
| { |
| Variant::Map params; |
| params["name"]=name; |
| params["type"]="queue"; |
| params["properties"] = options; |
| params["properties"].asMap()["durable"] = durable; |
| params["properties"].asMap()["auto-delete"] = autodelete; |
| methodRequest("create", params); |
| } |
| |
| void deleteQueue(const std::string& name) |
| { |
| Variant::Map params; |
| params["name"]=name; |
| params["type"]="queue"; |
| methodRequest("delete", params); |
| } |
| |
| void bind(const std::string& exchange, const std::string& queue, const std::string& key, |
| const Variant::Map& options=Variant::Map()) |
| { |
| Variant::Map params; |
| params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); |
| params["type"]="binding"; |
| params["properties"] = options; |
| methodRequest("create", params); |
| } |
| |
| void unbind(const std::string& exchange, const std::string& queue, const std::string& key) |
| { |
| Variant::Map params; |
| params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); |
| params["type"]="binding"; |
| methodRequest("delete", params); |
| } |
| |
| void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) |
| { |
| Variant::Map content; |
| Variant::Map objectId; |
| objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; |
| content["_object_id"] = objectId; |
| content["_method_name"] = method; |
| content["_arguments"] = inParams; |
| |
| messaging::Message request; |
| request.setReplyTo(replyTo); |
| request.getProperties()["x-amqp-0-10.app-id"] = "qmf2"; |
| request.getProperties()["qmf.opcode"] = "_method_request"; |
| encode(content, request); |
| |
| sender.send(request); |
| |
| messaging::Message response; |
| if (receiver.fetch(response, messaging::Duration::SECOND*5)) { |
| if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") { |
| std::string opcode = response.getProperties()["qmf.opcode"]; |
| if (opcode == "_method_response") { |
| if (outParams) { |
| Variant::Map m; |
| decode(response, m); |
| *outParams = m["_arguments"].asMap(); |
| } |
| } else if (opcode == "_exception") { |
| Variant::Map m; |
| decode(response, m); |
| throw Exception(QPID_MSG("Error: " << m["_values"])); |
| } else { |
| throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode)); |
| } |
| } else { |
| throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id=" |
| << response.getProperties()["x-amqp-0-10.app-id"])); |
| } |
| } else { |
| throw Exception(QPID_MSG("No response received")); |
| } |
| } |
| private: |
| messaging::Address replyTo; |
| messaging::Sender sender; |
| messaging::Receiver receiver; |
| }; |
| |
| }} // namespace qpid::tests |
| |
| #endif /*!TESTS_MESSAGINGFIXTURE_H*/ |