# blob: a6cf35e1897eebe8089f0484a95867d1eacfc377 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
$:.unshift File.join(File.dirname(__FILE__), "..", "lib")
require 'qpid'
require 'optparse'
# Default settings for the script. The command-line handlers defined further
# down overwrite individual entries when the matching switch is given.
options = {
  :broker             => "localhost",
  :timeout            => Qpid::Messaging::Duration::IMMEDIATE,
  :count              => 1,
  :forever            => false,
  :connection_options => ""
}
# Command-line definition for drain.rb. Every switch handler records its value
# in the +options+ hash; --help prints the generated usage text and exits.
opts = OptionParser.new do |parser|
  parser.banner = "Usage: drain.rb [OPTIONS] ADDRESS"
  parser.separator ""
  parser.separator "Drains messages from the specified address"
  parser.separator ""

  parser.on("-h", "--help", "show this message") do
    puts parser
    exit
  end

  parser.on("-b", "--broker VALUE", "url of broker to connect to") do |url|
    options[:broker] = url
  end

  parser.on("-t", "--timeout VALUE", Integer,
            "timeout in seconds to wait before exiting") do |seconds|
    # NOTE(review): the value given in seconds is multiplied by 1000 before
    # being handed to Duration.new, presumably because Duration is expressed
    # in milliseconds -- confirm against the Qpid::Messaging::Duration docs.
    options[:timeout] = Qpid::Messaging::Duration.new(seconds * 1000)
  end

  parser.on("-f", "--forever", "ignore timeout and wait forever") do
    options[:forever] = true
  end

  parser.on("--connection-options VALUE",
            "connection options string in the form {name1:value,name2:value2}") do |connection_options|
    options[:connection_options] = connection_options
  end

  parser.on("-c", "--count VALUE", Integer,
            "number of messages to read before exiting") do |limit|
    options[:count] = limit
  end
end
# Strip the recognised switches out of ARGV; the first remaining argument is
# taken as the address to drain, defaulting to an empty string when absent.
opts.parse!(ARGV)
options[:address] = ARGV.first || ""

# Create the broker connection from the parsed settings and open it.
connection = Qpid::Messaging::Connection.new(options[:broker],
                                             options[:connection_options])
connection.open
# Prints +map+ to standard output in the form "{key:value, key:value}", with
# the entries ordered by sorted key. Nothing is appended after the closing
# brace. The text is written directly rather than returned; the method's
# return value is nil.
def render_map(map)
  print "{"
  entries = map.keys.sort.map { |key| "#{key}:#{map[key]}" }
  print entries.join(", ")
  print "}"
end
# Drain loop: creates a session and a receiver on options[:address], then
# fetches, prints and acknowledges messages until options[:count] of them have
# been handled or an error is raised.
#
# Each message is printed on one line as
#   Message(properties={k:v, ...}, content='...')
#
# Any StandardError raised while draining is reported as "Exception: <text>"
# and ends the loop. NOTE(review): this presumably includes fetch raising when
# no message arrives within the timeout -- confirm against the
# Qpid::Messaging::Receiver#fetch documentation. Signals and exit requests
# (Interrupt, SystemExit) are no longer swallowed; they propagate after the
# connection has been closed.
begin
  session = connection.create_session
  receiver = session.create_receiver options[:address]
  count = 0
  # --forever takes precedence over any timeout given with --timeout.
  options[:timeout] = Qpid::Messaging::Duration::FOREVER if options[:forever]
  while count < options[:count]
    message = receiver.fetch(options[:timeout])
    print "Message(properties="
    render_map message.properties
    print ", content="
    if message.content_type == "amqp/map"
      # render_map writes its output itself and returns nil, so it has to be
      # called as a statement between the quotes; interpolating it into a
      # string would print the map before the quotes and leave them empty.
      print "'"
      render_map message.content
      print "'"
    else
      print "'#{message.content}'"
    end
    print ")\n"
    session.acknowledge message
    count += 1
  end
rescue StandardError => error
  puts "Exception: #{error.to_s}"
ensure
  # Always release the connection, whether the loop finished, failed, or was
  # interrupted.
  connection.close
end