blob: f7a710c4856d751bcd89873418376cc26226ce6a [file] [log] [blame]
#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
use strict;
use warnings;
use qpid;
use Getopt::Long;
use Pod::Usage;
my $url = "127.0.0.1";
my $timeout = 0;
my $forever = 0;
my $count = 0;
my $connectionOptions = "";
my $address = "amq.direct";
my $help;
my $result = GetOptions(
"broker|b=s" => \$url,
"timeout|t=i" => \$timeout,
"forever|f" => \$forever,
"connection-options=s" => \$connectionOptions,
"count|c=i" => \$count,
"help|h" => \$help
) || pod2usage( -verbose => 0 );
pod2usage( -verbose => 1 ) if $help;
if ( $#ARGV ge 0 ) {
$address = $ARGV[0];
}
sub getTimeout {
# returns either the named duration FOREVER if the
# forever cmdline argument was used, otherwise creates
# a new Duration of the specified length
return ($forever)
? qpid::messaging::Duration::FOREVER
: new qpid::messaging::Duration( $timeout * 1000 );
}
sub printProperties {
my $h = shift();
return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}];
}
# create a connection object
my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
# open the connection, then create a session and receiver
$connection->open();
my $session = $connection->create_session();
my $receiver = $session->create_receiver($address);
my $timeout = getTimeout();
my $message = new qpid::messaging::Message();
my $i = 0;
for ( ; ; ) {
eval { $message = $receiver->fetch($timeout); };
if ($@) {
last;
}
# check if the message was on that was redelivered
my $redelivered =
( $message->get_redelivered ) ? "redelivered=True, " : "";
print "Message("
. $redelivered
. "properties="
. printProperties( $message->get_properties() )
. ", content='";
# if the message content was a map, then we will print
# it out as a series of name => value pairs
if ( $message->get_content_type() eq "amqp/map" ) {
my $content = $message->get_content();
map { print "\n$_ => $content->{$_}"; } keys %{$content};
}
else {
# it's not a map, so just print the content as a string
print $message->get_content();
}
print "')\n";
# if the message had a reply-to address, then we'll send a
# response back letting the send know the message was processed
my $replyto = $message->get_reply_to();
if ( $replyto->get_name() ) {
print "Replying to " . $message->get_reply_to()->str() . "...\n";
# create a temporary sender for the specified queue
my $sender = $session->create_sender($replyto);
my $response =
new qpid::messaging::Message("received by the server.");
$sender->send($response);
}
# acknowledge all messages received on this queue so far
$session->acknowledge();
if ( $count and ( ++$i == $count ) ) {
last;
}
}
# close everything to clean up
$receiver->close();
$session->close();
$connection->close();
};
if ($@) {
$connection->close();
die $@;
}
__END__
=head1 NAME
drain - Drains messages from the specified address
=head1 SYNOPSIS
Options:
-h, --help show this message
-b VALUE, --broker VALUE url of broker to connect to
-t VALUE, --timeout VALUE timeout in seconds to wait before exiting
-f, --forever ignore timeout and wait forever
--connection-options VALUE connection options string in the form {name1:value1, name2:value2}
-c VALUE, --count VALUE number of messages to read before exiting
=cut