| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| /** |
| * This class provides an example of using AMQP for a request-response |
| * style system. 'Requests' are messages sent to a well known |
| * destination. A 'service' process consumes these message and |
| * responds by echoing the message back to the sender on a |
| * sender-specified private queue. |
| */ |
| |
| #include <QpidError.h> |
| #include <ClientChannel.h> |
| #include <Connection.h> |
| #include <ClientExchange.h> |
| #include <MessageListener.h> |
| #include <ClientQueue.h> |
| #include <sys/Time.h> |
| #include <iostream> |
| #include <sstream> |
| |
| using namespace qpid::client; |
| using namespace qpid::sys; |
| using std::string; |
| |
| |
| /** |
| * A message listener implementation representing the 'service', this |
| * will 'echo' any requests received. |
| */ |
| class EchoServer : public MessageListener{ |
| Channel* const channel; |
| public: |
| EchoServer(Channel* channel); |
| virtual void received(Message& msg); |
| }; |
| |
| /** |
| * A message listener implementation that merely prints received |
| * messages to the console. Used to report on 'echo' responses. |
| */ |
| class LoggingListener : public MessageListener{ |
| public: |
| virtual void received(Message& msg); |
| }; |
| |
| /** |
| * A utility class that manages the command line options needed to run |
| * the example confirgurably. |
| */ |
| class Args{ |
| string host; |
| int port; |
| bool trace; |
| bool help; |
| bool client; |
| public: |
| inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} |
| void parse(int argc, char** argv); |
| void usage(); |
| |
| inline const string& getHost() const { return host;} |
| inline int getPort() const { return port; } |
| inline bool getTrace() const { return trace; } |
| inline bool getHelp() const { return help; } |
| inline bool getClient() const { return client; } |
| }; |
| |
| /** |
| * The main test path. There are two basic modes: 'client' and |
| * 'service'. First one or more services are started, then one or more |
| * clients are started and messages can be sent. |
| */ |
| int main(int argc, char** argv){ |
| const std::string echo_service("echo_service"); |
| Args args; |
| args.parse(argc, argv); |
| if (args.getHelp()) { |
| args.usage(); |
| } else if (args.getClient()) { |
| //we have been started in 'client' mode, i.e. we will send an |
| //echo requests and print responses received. |
| try { |
| //Create connection & open a channel |
| Connection connection(args.getTrace()); |
| connection.open(args.getHost(), args.getPort()); |
| Channel channel; |
| connection.openChannel(&channel); |
| |
| //Setup: declare the private 'response' queue and bind it |
| //to the direct exchange by its name which will be |
| //generated by the server |
| Queue response; |
| channel.declareQueue(response); |
| qpid::framing::FieldTable emptyArgs; |
| channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); |
| |
| //Consume from the response queue, logging all echoed message to console: |
| LoggingListener listener; |
| std::string tag; |
| channel.consume(response, tag, &listener); |
| |
| //Process incoming requests on a new thread |
| channel.start(); |
| |
| //get messages from console and send them: |
| std::string text; |
| std::cout << "Enter text to send:" << std::endl; |
| while (std::getline(std::cin, text)) { |
| std::cout << "Sending " << text << " to echo server." << std::endl; |
| Message msg; |
| msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); |
| msg.setData(text); |
| channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); |
| |
| std::cout << "Enter text to send:" << std::endl; |
| } |
| |
| connection.close(); |
| } catch(qpid::QpidError error) { |
| std::cout << error.what() << std::endl; |
| } |
| } else { |
| // we are in 'service' mode, i.e. we will consume messages |
| // from the request queue and echo each request back to the |
| // senders own private response queue. |
| try { |
| //Create connection & open a channel |
| Connection connection(args.getTrace()); |
| connection.open(args.getHost(), args.getPort()); |
| Channel channel; |
| connection.openChannel(&channel); |
| |
| //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name |
| Queue request("request"); |
| channel.declareQueue(request); |
| qpid::framing::FieldTable emptyArgs; |
| channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs); |
| |
| //Consume from the request queue, echoing back all messages received to the client that sent them |
| EchoServer server(&channel); |
| std::string tag = "server_tag"; |
| channel.consume(request, tag, &server); |
| |
| //Process incoming requests on the main thread |
| channel.run(); |
| |
| connection.close(); |
| } catch(qpid::QpidError error) { |
| std::cout << error.what() << std::endl; |
| } |
| } |
| } |
| |
| EchoServer::EchoServer(Channel* _channel) : channel(_channel){} |
| |
| void EchoServer::received(Message& message) |
| { |
| //get name of response queues binding to the default direct exchange: |
| const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); |
| |
| if (name.empty()) { |
| std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl; |
| } else { |
| //print message to console: |
| std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; |
| |
| //'echo' the message back: |
| channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); |
| } |
| } |
| |
| void LoggingListener::received(Message& message) |
| { |
| //print message to console: |
| std::cout << "Received echo: " << message.getData() << std::endl; |
| } |
| |
| |
| void Args::parse(int argc, char** argv){ |
| for(int i = 1; i < argc; i++){ |
| string name(argv[i]); |
| if("-help" == name){ |
| help = true; |
| break; |
| }else if("-host" == name){ |
| host = argv[++i]; |
| }else if("-port" == name){ |
| port = atoi(argv[++i]); |
| }else if("-trace" == name){ |
| trace = true; |
| }else if("-client" == name){ |
| client = true; |
| }else{ |
| std::cout << "Warning: unrecognised option " << name << std::endl; |
| } |
| } |
| } |
| |
| void Args::usage(){ |
| std::cout << "Options:" << std::endl; |
| std::cout << " -help" << std::endl; |
| std::cout << " Prints this usage message" << std::endl; |
| std::cout << " -host <host>" << std::endl; |
| std::cout << " Specifies host to connect to (default is localhost)" << std::endl; |
| std::cout << " -port <port>" << std::endl; |
| std::cout << " Specifies port to conect to (default is 5762)" << std::endl; |
| std::cout << " -trace" << std::endl; |
| std::cout << " Indicates that the frames sent and received should be logged" << std::endl; |
| std::cout << " -client" << std::endl; |
| std::cout << " Run as a client (else will run as a server)" << std::endl; |
| } |