blob: 28bb4cab31638cd03ed849bfbbdd363918038044 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Message_io.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>
#include <vector>
#include <ctime>
#include "OptionParser.h"
using namespace qpid::messaging;
using namespace qpid::types;
typedef std::vector<std::string> string_vector;
struct Options : OptionParser
{
std::string url;
std::string address;
int timeout;
bool durable;
int count;
std::string id;
std::string replyto;
string_vector properties;
string_vector entries;
std::string content;
std::string connectionOptions;
bool print;
Options()
: OptionParser("Usage: spout [OPTIONS] ADDRESS", "Send messages to the specified address"),
url("127.0.0.1"),
timeout(0),
durable(false),
count(1),
print(false)
{
add("broker,b", url, "url of broker to connect to");
add("timeout,t", timeout, "exit after the specified time");
add("durable,d", durable, "make the message durable (def. transient)");
add("count,c", count, "stop after count messages have been sent, zero disables");
add("id,i", id, "use the supplied id instead of generating one");
add("reply-to", replyto, "specify reply-to address");
add("property,P", properties, "specify message property");
add("map,M", entries, "specify entry for map content");
add("content", content, "specify textual content");
add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}");
add("print", print, "print each message sent");
}
static bool nameval(const std::string& in, std::string& name, std::string& value)
{
std::string::size_type i = in.find("=");
if (i == std::string::npos) {
name = in;
return false;
} else {
name = in.substr(0, i);
if (i+1 < in.size()) {
value = in.substr(i+1);
return true;
} else {
return false;
}
}
}
static void setProperty(Message& message, const std::string& property)
{
std::string name;
std::string value;
if (nameval(property, name, value)) {
message.getProperties()[name] = value;
message.getProperties()[name].setEncoding("utf8");
} else {
message.getProperties()[name] = Variant();
}
}
void setProperties(Message& message) const
{
for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) {
setProperty(message, *i);
}
}
void setEntries(Variant::Map& content) const
{
for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) {
std::string name;
std::string value;
if (nameval(*i, name, value)) {
content[name] = value;
} else {
content[name] = Variant();
}
}
}
bool checkAddress()
{
if (getArguments().empty()) {
error("Address is required");
return false;
} else {
address = getArguments()[0];
return true;
}
}
bool isDurable() const
{
return durable;
}
};
int main(int argc, char** argv)
{
Options options;
if (options.parse(argc, argv) && options.checkAddress()) {
Connection connection(options.url, options.connectionOptions);
try {
connection.open();
Session session = connection.createSession();
Sender sender = session.createSender(options.address);
Message message;
message.setDurable(options.isDurable());
options.setProperties(message);
Variant& obj = message.getContentObject();
if (options.entries.size()) {
Variant::Map content;
options.setEntries(content);
obj = content;
} else if (options.content.size()) {
obj = options.content;
obj.setEncoding("utf8");
}
std::time_t start = std::time(0);
for (int count = 0;
(count < options.count || options.count == 0) &&
(options.timeout == 0 || std::difftime(std::time(0), start) < options.timeout);
count++) {
if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto));
std::string id = options.id.empty() ? Uuid(true).str() : options.id;
std::stringstream spoutid;
spoutid << id << ":" << count;
message.getProperties()["spout-id"] = spoutid.str();
if (options.print) std::cout << message << std::endl;
sender.send(message);
}
session.sync();
connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
connection.close();
}
}
return 1;
}