blob: 7365e732bf3e848cf8865d912cc7198aaf4d941e [file] [log] [blame]
#!/usr/bin/perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
use strict;
use warnings;
use cqpid_perl;
use Getopt::Long;
use Time::Local;
my $url = "127.0.0.1";
my $timeout = 0;
my $count = 1;
my $id = "";
my $replyto = "";
my @properties;
my @entries;
my $content = "";
my $connectionOptions = "";
my $address = "amq.direct";
my $result = GetOptions(
"broker|b=s" => \ $url,
"timeout|t=i" => \ $timeout,
"count|c=i" => \ $count,
"id|i=s" => \ $id,
"replyto=s" => \ $replyto,
"property|p=s@" => \ @properties,
"map|m=s@" => \ @entries,
"content=s" => \ $content,
"connection-options=s" => \ $connectionOptions,
);
if (! $result) {
print "Usage: perl drain.pl [OPTIONS]\n";
}
if ($#ARGV ge 0) {
$address = $ARGV[0]
}
sub setEntries {
my ($content) = @_;
foreach (@entries) {
my ($name, $value) = split("=", $_);
$content->{$name} = $value;
}
}
sub setProperties {
my ($message) = @_;
foreach (@properties) {
my ($name, $value) = split("=", $_);
$message->getProperties()->{$name} = $value;
}
}
my $connection = new cqpid_perl::Connection($url, $connectionOptions);
eval {
$connection->open();
my $session = $connection->createSession();
my $sender = $session->createSender($address);
my $message = new cqpid_perl::Message();
setProperties($message) if (@properties);
if (@entries) {
my $content = {};
setEntries($content);
cqpid_perl::encode($content, $message);
}
elsif ($content) {
$message->setContent($content);
$message->setContentType("text/plain");
}
my $receiver;
if ($replyto) {
my $responseQueue = new cqpid_perl::Address($replyto);
$receiver = $session->createReceiver($responseQueue);
$message->setReplyTo($responseQueue);
}
my $start = localtime;
my @s = split(/[:\s]/, $start);
my $s = "$s[3]$s[4]$s[5]";
my $n = $s;
for (my $i = 0;
($i < $count || $count == 0) and
($timeout == 0 || abs($n - $s) < $timeout);
$i++) {
$sender->send($message);
if ($receiver) {
my $response = $receiver->fetch();
print "$i -> " . $response->getContent() . "\n";
}
my $now = localtime;
my @n = split(/[:\s]/, $now);
my $n = "$n[3]$n[4]$n[5]";
}
$session->sync();
$connection->close();
};
if ($@) {
$connection->close();
die $@;
}