| #!/usr/bin/env perl |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| use strict; |
| use warnings; |
| |
| use qpid; |
| use Getopt::Long; |
| use Pod::Usage; |
| |
| my $url = "127.0.0.1"; |
| my $timeout = 0; |
| my $forever = 0; |
| my $count = 0; |
| my $connectionOptions = ""; |
| my $address = "amq.direct"; |
| my $help; |
| |
| my $result = GetOptions( |
| "broker|b=s" => \$url, |
| "timeout|t=i" => \$timeout, |
| "forever|f" => \$forever, |
| "connection-options=s" => \$connectionOptions, |
| "count|c=i" => \$count, |
| "help|h" => \$help |
| ) || pod2usage( -verbose => 0 ); |
| |
| pod2usage( -verbose => 1 ) if $help; |
| |
| if ( $#ARGV ge 0 ) { |
| $address = $ARGV[0]; |
| } |
| |
| sub getTimeout { |
| |
| # returns either the named duration FOREVER if the |
| # forever cmdline argument was used, otherwise creates |
| # a new Duration of the specified length |
| return ($forever) |
| ? qpid::messaging::Duration::FOREVER |
| : new qpid::messaging::Duration( $timeout * 1000 ); |
| } |
| |
| sub printProperties { |
| my $h = shift(); |
| return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}]; |
| } |
| |
| # create a connection object |
| my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); |
| |
| eval { |
| # open the connection, then create a session and receiver |
| $connection->open(); |
| my $session = $connection->create_session(); |
| my $receiver = $session->create_receiver($address); |
| my $timeout = getTimeout(); |
| my $message = new qpid::messaging::Message(); |
| my $i = 0; |
| |
| for ( ; ; ) { |
| eval { $message = $receiver->fetch($timeout); }; |
| |
| if ($@) { |
| last; |
| } |
| |
| # check if the message was on that was redelivered |
| my $redelivered = |
| ( $message->get_redelivered ) ? "redelivered=True, " : ""; |
| print "Message(" |
| . $redelivered |
| . "properties=" |
| . printProperties( $message->get_properties() ) |
| . ", content='"; |
| |
| # if the message content was a map, then we will print |
| # it out as a series of name => value pairs |
| if ( $message->get_content_type() eq "amqp/map" ) { |
| my $content = $message->get_content(); |
| map { print "\n$_ => $content->{$_}"; } keys %{$content}; |
| } |
| else { |
| # it's not a map, so just print the content as a string |
| print $message->get_content(); |
| } |
| print "')\n"; |
| |
| # if the message had a reply-to address, then we'll send a |
| # response back letting the send know the message was processed |
| my $replyto = $message->get_reply_to(); |
| if ( $replyto->get_name() ) { |
| print "Replying to " . $message->get_reply_to()->str() . "...\n"; |
| |
| # create a temporary sender for the specified queue |
| my $sender = $session->create_sender($replyto); |
| my $response = |
| new qpid::messaging::Message("received by the server."); |
| $sender->send($response); |
| } |
| |
| # acknowledge all messages received on this queue so far |
| $session->acknowledge(); |
| |
| if ( $count and ( ++$i == $count ) ) { |
| last; |
| } |
| } |
| |
| # close everything to clean up |
| $receiver->close(); |
| $session->close(); |
| $connection->close(); |
| }; |
| |
| if ($@) { |
| $connection->close(); |
| die $@; |
| } |
| |
| __END__ |
| |
| =head1 NAME |
| |
| drain - Drains messages from the specified address |
| |
| =head1 SYNOPSIS |
| |
| Options: |
| -h, --help show this message |
| -b VALUE, --broker VALUE url of broker to connect to |
| -t VALUE, --timeout VALUE timeout in seconds to wait before exiting |
| -f, --forever ignore timeout and wait forever |
| --connection-options VALUE connection options string in the form {name1:value1, name2:value2} |
| -c VALUE, --count VALUE number of messages to read before exiting |
| |
| =cut |
| |