| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
require 'qpid_proton'
require 'optparse'
require 'pathname'
require 'securerandom'

require_relative 'lib/debugging'
| |
# Holds pending messages in FIFO order together with the consumers they are
# handed out to. Debug output goes through the Debugging mixin and is gated on
# the global $options[:debug] flag.
class Exchange

  include Debugging

  # dynamic - flag stored on the exchange; it only affects the value returned
  #           by #unsubscribe (defaults to false).
  def initialize(dynamic = false)
    @consumers = []
    @queue = Queue.new
    @dynamic = dynamic
  end

  # Registers a consumer as a delivery target for this exchange.
  def subscribe(consumer)
    verbose = $options[:debug]
    debug("subscribing #{consumer}") if verbose
    @consumers.push(consumer)
    debug(" there are #{@consumers.size} consumers") if verbose
  end

  # Removes a consumer if it is registered.
  #
  # Returns a truthy value when no consumers remain and the exchange is either
  # dynamic or has no queued messages; returns false while consumers remain.
  def unsubscribe(consumer)
    verbose = $options[:debug]
    debug("unsubscribing #{consumer}") if verbose
    registered = @consumers.include?(consumer)
    @consumers.delete(consumer) if registered
    debug(" consumer doesn't exist") if verbose && !registered
    debug(" there are #{@consumers.size} consumers") if verbose
    return false unless @consumers.empty?
    @dynamic || @queue.empty?
  end

  # Appends a message to the queue and immediately tries to hand out
  # whatever is pending.
  def publish(message)
    debug("queueing message: #{message.body}") if $options[:debug]
    @queue.push(message)
    dispatch
  end

  # Delivers queued messages either to the single given consumer or, when
  # none is given, to every registered consumer. Keeps making passes until a
  # pass delivers nothing.
  def dispatch(consumer = nil)
    debug("dispatching: consumer=#{consumer}") if $options[:debug]
    targets = consumer ? [consumer] : @consumers
    # The loop body is intentionally empty: each condition check is one pass.
    nil while deliver_to(targets)
  end

  # Makes one pass over +consumers+, sending at most one queued message to
  # each consumer that has credit.
  #
  # Returns true if at least one message was sent during this pass.
  def deliver_to(consumers)
    verbose = $options[:debug]
    debug("delivering to #{consumers.size} consumer(s)") if verbose
    delivered = false
    consumers.each do |consumer|
      debug(" current consumer=#{consumer} credit=#{consumer.credit}") if verbose
      next if consumer.credit <= 0 || @queue.empty?
      # Non-blocking pop; the emptiness check above guards against ThreadError.
      consumer.send(@queue.pop(true))
      delivered = true
    end
    delivered
  end

end
| |
# Messaging handler that acts as a small in-memory broker: it keeps one
# Exchange per address in @queues, subscribes outgoing (sender) links to the
# exchange named by their source address, and publishes incoming messages to
# the exchange named by the receiving link's target address.
class Broker < Qpid::Proton::Handler::MessagingHandler

  include Debugging

  # url - the address passed to the container's listen call in #on_start.
  def initialize(url)
    super()
    @url = url
    @queues = {}
  end

  # Starts listening on the configured URL and announces it on stdout.
  def on_start(event)
    debug("on_start event") if $options[:debug]
    @acceptor = event.container.listen(@url)
    print "Listening on #{@url}\n"
    STDOUT.flush
  end

  # Returns the Exchange registered under +address+, creating and registering
  # a new (non-dynamic) one on first use.
  def queue(address)
    debug("fetching queue for #{address}: (there are #{@queues.size} queues)") if $options[:debug]
    if @queues.key?(address)
      debug(" using existing queue") if $options[:debug]
    else
      debug(" creating new queue") if $options[:debug]
      @queues[address] = Exchange.new
    end
    result = @queues[address]
    debug(" returning #{result}") if $options[:debug]
    result
  end

  # Sets up addressing for a newly opened link.
  #
  # Sender links are subscribed to an exchange: a freshly generated UUID
  # address backed by a dynamic Exchange when the remote source asks for a
  # dynamic one, otherwise the exchange for the remote source address (if any).
  # For other links the remote target address, when present, is copied to the
  # local target.
  def on_link_opening(event)
    link = event.link
    debug("processing on_link_opening") if $options[:debug]
    debug("link is#{link.sender? ? '' : ' not'} a sender") if $options[:debug]
    if link.sender?
      if link.remote_source.dynamic?
        address = SecureRandom.uuid
        link.source.address = address
        q = Exchange.new(true)
        @queues[address] = q
        q.subscribe(link)
      elsif link.remote_source.address
        link.source.address = link.remote_source.address
        queue(link.source.address).subscribe(link)
      end
    elsif link.remote_target.address
      link.target.address = link.remote_target.address
    end
  end

  # Detaches +link+ from the exchange registered under its source address and
  # drops that exchange from @queues when Exchange#unsubscribe reports it is
  # no longer needed. Does nothing if no exchange is registered for the address.
  def unsubscribe(link)
    # Log the same key that is used for the lookup below.
    address = link.source.address
    debug("unsubscribing #{address}") if $options[:debug]
    exchange = @queues[address]
    return unless exchange
    @queues.delete(address) if exchange.unsubscribe(link)
  end

  # Unsubscribes a sender link when it is closing.
  def on_link_closing(event)
    unsubscribe(event.link) if event.link.sender?
  end

  # Cleans up every subscription held by a connection that is closing.
  def on_connection_closing(event)
    remove_stale_consumers(event.connection)
  end

  # Cleans up every subscription held by a connection that was dropped.
  def on_disconnected(event)
    remove_stale_consumers(event.connection)
  end

  # Walks the connection's links that match the REMOTE_ACTIVE state mask and
  # unsubscribes each one that is a sender.
  def remove_stale_consumers(connection)
    l = connection.link_head(Qpid::Proton::Endpoint::REMOTE_ACTIVE)
    until l.nil?
      unsubscribe(l) if l.sender?
      l = l.next(Qpid::Proton::Endpoint::REMOTE_ACTIVE)
    end
  end

  # Pushes pending messages to the link that signalled it can send, using the
  # exchange for the link's source address.
  def on_sendable(event)
    debug("on_sendable event") if $options[:debug]
    q = queue(event.link.source.address)
    # NOTE(review): this debug line interpolates event.message exactly like
    # on_message does -- confirm that is meaningful for a sendable event.
    debug(" dispatching #{event.message} to #{q}") if $options[:debug]
    q.dispatch(event.link)
  end

  # Publishes an incoming message to the exchange for the link's target address.
  def on_message(event)
    debug("on_message event") if $options[:debug]
    q = queue(event.link.target.address)
    debug(" dispatching #{event.message} to #{q}") if $options[:debug]
    q.publish(event.message)
  end

end
| |
# Command-line defaults. This stays a global because Exchange and Broker read
# $options[:debug] directly.
$options = {
  :address => "localhost:5672",
  :debug => false
}

OptionParser.new do |opts|
  opts.banner = "Usage: #{Pathname.new(__FILE__).basename} [options]"

  # The broker listens on this address (it is handed to Broker.new below).
  opts.on("-a", "--address=ADDRESS", "Listen on ADDRESS (def. #{$options[:address]}).") do |address|
    $options[:address] = address
  end

  opts.on("-d", "--debug", "Enable debugging output (def. #{$options[:debug]})") do
    $options[:debug] = true
  end

end.parse!

begin
  Qpid::Proton::Reactor::Container.new(Broker.new($options[:address])).run
rescue Interrupt
  # Interrupt (e.g. Ctrl-C) is swallowed on purpose so the script ends
  # without printing a backtrace.
end