blob: d8ac860143d9be0353e22e22bc2886790d32739b [file] [log] [blame]
#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
use strict;
use warnings;
use qpid;
use Getopt::Long;
use Pod::Usage;
use Time::Local;
my $url = "127.0.0.1";
my $timeout = 0;
my $count = 1;
my $id = "";
my $replyto = "";
my @properties;
my @entries;
my $content = "";
my $connectionOptions = "";
my $address = "amq.direct";
my $help;
my $result = GetOptions(
"broker|b=s" => \$url,
"timeout|t=i" => \$timeout,
"count|c=i" => \$count,
"id|i=s" => \$id,
"replyto=s" => \$replyto,
"property|p=s@" => \@properties,
"map|m=s@" => \@entries,
"content=s" => \$content,
"connection-options=s" => \$connectionOptions,
"help|h" => \$help
) || pod2usage( -verbose => 0 );
pod2usage( -verbose => 1 ) if $help;
if ( $#ARGV ge 0 ) {
$address = $ARGV[0];
}
sub setEntries {
my ($content) = @_;
foreach (@entries) {
my ( $name, $value ) = split( "=", $_ );
$content->{$name} = $value;
}
}
sub setProperties {
my ($message) = @_;
foreach (@properties) {
my ( $name, $value ) = split( "=", $_ );
$message->setProperty( $name, $value );
}
}
# create a connection object
my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
# open the connection, create a session and then a sender
$connection->open();
my $session = $connection->create_session();
my $sender = $session->create_sender($address);
# create a message to be sent
my $message = new qpid::messaging::Message();
setProperties($message) if (@properties);
if (@entries) {
my $content = {};
setEntries($content);
$message->set_content($content);
}
elsif ($content) {
$message->set_content($content);
$message->set_content_type("text/plain");
}
# if a reply-to address was supplied, then create a receiver from the
# session and wait for a response to be sent
my $receiver;
if ($replyto) {
my $responseQueue = new qpid::messaging::Address($replyto);
$receiver = $session->create_receiver($responseQueue);
$message->set_reply_to($responseQueue);
}
my $start = localtime;
my @s = split( /[:\s]/, $start );
my $s = "$s[3]$s[4]$s[5]";
my $n = $s;
for (
my $i = 0 ;
( $i < $count || $count == 0 )
and ( $timeout == 0 || abs( $n - $s ) < $timeout ) ;
$i++
)
{
$sender->send($message);
if ($receiver) {
print "Waiting for a response.\n";
my $response = $receiver->fetch();
print "$i -> " . $response->get_content() . "\n";
}
my $now = localtime;
my @n = split( /[:\s]/, $now );
my $n = "$n[3]$n[4]$n[5]";
}
$session->sync();
$connection->close();
};
if ($@) {
$connection->close();
die $@;
}
__END__
=head1 NAME
spout - Send messages to the specified address
=head1 SYNOPSIS
Usage: spout [OPTIONS] ADDRESS
Options:
-h, --help show this message
-b VALUE, --broker VALUE url of broker to connect to
-t VALUE, --timeout VALUE exit after the specified time
-c VALUE, --count VALUE stop after count messageshave been sent, zero disables
-i VALUE, --id VALUE use the supplied id instead of generating one
--replyto VALUE specify reply-to value
-P VALUE, --property VALUE specify message property
-M VALUE, --map VALUE specify entry for map content
--content VALUE specify textual content
--connection-options VALUE connection options string in the form {name1:value1, name2:value2}
=cut