blob: 666c8cfa46589cc6a296c7855640f3ae79ff96f5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/**
* This class provides an example of using AMQP for a request-response
* style system. 'Requests' are messages sent to a well known
* destination. A 'service' process consumes these message and
* responds by echoing the message back to the sender on a
* sender-specified private queue.
*/
#include <QpidError.h>
#include <ClientChannel.h>
#include <Connection.h>
#include <ClientExchange.h>
#include <MessageListener.h>
#include <ClientQueue.h>
#include <sys/Time.h>
#include <iostream>
#include <sstream>
using namespace qpid::client;
using namespace qpid::sys;
using std::string;
/**
* A message listener implementation representing the 'service', this
* will 'echo' any requests received.
*/
class EchoServer : public MessageListener{
Channel* const channel;
public:
EchoServer(Channel* channel);
virtual void received(Message& msg);
};
/**
* A message listener implementation that merely prints received
* messages to the console. Used to report on 'echo' responses.
*/
class LoggingListener : public MessageListener{
public:
virtual void received(Message& msg);
};
/**
* A utility class that manages the command line options needed to run
* the example confirgurably.
*/
class Args{
string host;
int port;
bool trace;
bool help;
bool client;
public:
inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){}
void parse(int argc, char** argv);
void usage();
inline const string& getHost() const { return host;}
inline int getPort() const { return port; }
inline bool getTrace() const { return trace; }
inline bool getHelp() const { return help; }
inline bool getClient() const { return client; }
};
/**
* The main test path. There are two basic modes: 'client' and
* 'service'. First one or more services are started, then one or more
* clients are started and messages can be sent.
*/
int main(int argc, char** argv){
const std::string echo_service("echo_service");
Args args;
args.parse(argc, argv);
if (args.getHelp()) {
args.usage();
} else if (args.getClient()) {
//we have been started in 'client' mode, i.e. we will send an
//echo requests and print responses received.
try {
//Create connection & open a channel
Connection connection(args.getTrace());
connection.open(args.getHost(), args.getPort());
Channel channel;
connection.openChannel(&channel);
//Setup: declare the private 'response' queue and bind it
//to the direct exchange by its name which will be
//generated by the server
Queue response;
channel.declareQueue(response);
qpid::framing::FieldTable emptyArgs;
channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs);
//Consume from the response queue, logging all echoed message to console:
LoggingListener listener;
std::string tag;
channel.consume(response, tag, &listener);
//Process incoming requests on a new thread
channel.start();
//get messages from console and send them:
std::string text;
std::cout << "Enter text to send:" << std::endl;
while (std::getline(std::cin, text)) {
std::cout << "Sending " << text << " to echo server." << std::endl;
Message msg;
msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
msg.setData(text);
channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
std::cout << "Enter text to send:" << std::endl;
}
connection.close();
} catch(qpid::QpidError error) {
std::cout << error.what() << std::endl;
}
} else {
// we are in 'service' mode, i.e. we will consume messages
// from the request queue and echo each request back to the
// senders own private response queue.
try {
//Create connection & open a channel
Connection connection(args.getTrace());
connection.open(args.getHost(), args.getPort());
Channel channel;
connection.openChannel(&channel);
//Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name
Queue request("request");
channel.declareQueue(request);
qpid::framing::FieldTable emptyArgs;
channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs);
//Consume from the request queue, echoing back all messages received to the client that sent them
EchoServer server(&channel);
std::string tag = "server_tag";
channel.consume(request, tag, &server);
//Process incoming requests on the main thread
channel.run();
connection.close();
} catch(qpid::QpidError error) {
std::cout << error.what() << std::endl;
}
}
}
EchoServer::EchoServer(Channel* _channel) : channel(_channel){}
void EchoServer::received(Message& message)
{
//get name of response queues binding to the default direct exchange:
const std::string name = message.getHeaders().getString("RESPONSE_QUEUE");
if (name.empty()) {
std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl;
} else {
//print message to console:
std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
//'echo' the message back:
channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
}
}
void LoggingListener::received(Message& message)
{
//print message to console:
std::cout << "Received echo: " << message.getData() << std::endl;
}
void Args::parse(int argc, char** argv){
for(int i = 1; i < argc; i++){
string name(argv[i]);
if("-help" == name){
help = true;
break;
}else if("-host" == name){
host = argv[++i];
}else if("-port" == name){
port = atoi(argv[++i]);
}else if("-trace" == name){
trace = true;
}else if("-client" == name){
client = true;
}else{
std::cout << "Warning: unrecognised option " << name << std::endl;
}
}
}
void Args::usage(){
std::cout << "Options:" << std::endl;
std::cout << " -help" << std::endl;
std::cout << " Prints this usage message" << std::endl;
std::cout << " -host <host>" << std::endl;
std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
std::cout << " -port <port>" << std::endl;
std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
std::cout << " -trace" << std::endl;
std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
std::cout << " -client" << std::endl;
std::cout << " Run as a client (else will run as a server)" << std::endl;
}