blob: 2a7a030a117d8f576f5549c279cc1ebafd236bf0 [file] [log] [blame]
#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#++
module Qpid::Proton::Reactor
# Outgoing-message handler that hands settled events to the transaction
# recorded on the settled delivery.
#
# @private
class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler

  def initialize
    super()
  end

  # Handles a settled event. Deliveries that do not respond to
  # +transaction+ are ignored.
  #
  # @param event [Object] The event; its delivery is inspected and its
  #   +transaction+ attribute is assigned before the outcome is dispatched.
  #
  def on_settled(event)
    delivery = event.delivery
    return unless delivery.respond_to?(:transaction)

    transaction = delivery.transaction
    event.transaction = transaction
    transaction.handle_outcome(event)
  end

end
# A representation of the AMQP concept of a container which, loosely
# speaking, is something that establishes links to or from another
# container on which messages are transferred.
#
# This is an extension to the Reactor class that adds convenience methods
# for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender
# and Qpid::Proton::Receiver.
#
# @example
#
class Container < Reactor
# Helper mixins. generate_uuid, used by this class, is presumably supplied by
# Util::UUID -- TODO confirm.
include Qpid::Proton::Util::Reactor
include Qpid::Proton::Util::UUID
# Identifier assigned in #initialize (a generated UUID) and used by #connect
# as the container name of every connection it opens.
attr_accessor :container_id
# Accessor pair backed by @global_handler. #initialize writes to it when a
# :global_handler option is supplied.
# NOTE(review): Reactor also appears to define global_handler accessors
# (see #initialize); confirm that shadowing them here is intended.
attr_accessor :global_handler
# Creates the container on top of the underlying Reactor.
#
# When +options+ contains an :impl key only the Reactor initialisation is
# performed; otherwise the SSL configuration, the global handler override,
# the trigger and a freshly generated container id are set up as well.
#
# @param handlers [Object] Forwarded unchanged to Reactor#initialize.
# @param options [Hash] Forwarded to Reactor#initialize.
# @option options [Object] :impl When the key is present, the
#   container-specific setup is skipped.
# @option options [Object] :global_handler Handler to wrap in GlobalOverrides
#   instead of the handler reported by the Reactor.
#
def initialize(handlers, options = {})
  super(handlers, options)

  # An existing :impl means we are not creating a new instance, so there is
  # nothing more to set up.
  return if options.key?(:impl)

  @ssl = SSLConfig.new

  override = options[:global_handler]
  if override
    # NOTE(review): this goes through the accessor declared on this class,
    # whereas the branch below calls the Reactor implementation -- confirm
    # that the difference is intended.
    self.global_handler = GlobalOverrides.new(override)
  else
    # Ugly, but self.global_handler does not work in the constructor, so the
    # Reactor implementations are bound and called explicitly.
    current = Reactor.instance_method(:global_handler).bind(self).call
    Reactor.instance_method(:global_handler=).bind(self).call(GlobalOverrides.new(current))
  end

  @trigger = nil
  @container_id = generate_uuid
end
# Initiates the establishment of an AMQP connection.
#
# The peer is chosen from the first of :url, :urls and :address (checked in
# that order) whose value is not nil.
#
# @param options [Hash] A hash of named arguments.
# @option options [Object] :handler Passed to #connection when the
#   connection object is created.
# @option options [Object] :url A single URL to connect to.
# @option options [Array] :urls A list of URLs to connect to.
# @option options [Object] :address An address; wrapped in a
#   Qpid::Proton::URL before use.
# @option options [Object] :heartbeat Assigned to the connector when not nil.
# @option options [Object] :reconnect Assigned to the connector when not nil;
#   otherwise a new Backoff is used.
#
# @return [Object] The connection, after +open+ has been called on it.
#
# @raise [ArgumentError] If none of :url, :urls or :address is given.
#
def connect(options = {})
  conn = connection(options[:handler])
  conn.container = container_id || generate_uuid

  connector = Connector.new(conn)
  conn.overrides = connector

  targets =
    if !options[:url].nil?
      [options[:url]]
    elsif !options[:urls].nil?
      options[:urls]
    elsif !options[:address].nil?
      [Qpid::Proton::URL.new(options[:address])]
    else
      raise ::ArgumentError, "either :url or :urls or :address required"
    end
  connector.address = URLs.new(targets)

  connector.heartbeat = options[:heartbeat] unless options[:heartbeat].nil?

  requested = options[:reconnect]
  connector.reconnect = requested.nil? ? Backoff.new : requested

  # TODO seems this should be configurable
  # NOTE(review): a SessionPerConnection is stored as the connector's
  # ssl_domain; this looks like it was meant to be a session policy on the
  # connection -- confirm before relying on it.
  connector.ssl_domain = SessionPerConnection.new

  conn.open
  conn
end
# Resolves +context+ to a session.
#
# * A Qpid::Proton::URL is connected to first (via #connect) and the
#   resulting connection is resolved in turn.
# * A Qpid::Proton::Session is returned as is.
# * A Qpid::Proton::Connection yields the session chosen by its session
#   policy when it has one, otherwise a session made by #create_session.
# * Anything else is asked for its +session+.
#
# @param context [Object] The object to resolve.
#
# @return [Object] The session for +context+.
#
def _session(context)
  return _session(connect(:url => context)) if context.is_a?(Qpid::Proton::URL)
  return context if context.is_a?(Qpid::Proton::Session)
  return context.session unless context.is_a?(Qpid::Proton::Connection)

  if context.session_policy?
    context.session_policy.session(context)
  else
    create_session(context)
  end
end
# Initiates the establishment of a link over which messages can be sent.
#
# A String context is converted to a Qpid::Proton::URL. When the context is
# a URL and no :target is given, the path of the URL is used as the target
# address. The context is then resolved to a session through #_session.
#
# The name will be generated for the link if one is not specified.
#
# @param context [String, Qpid::Proton::URL, Object] The address, or an
#   object that #_session can resolve to a session.
# @param opts [Hash] Additional options.
# @option opts [String] :target The target address.
# @option opts [String] :source The source address.
# @option opts [String] :name The link name.
# @option opts [Boolean] :dynamic Not read by this method.
# @option opts [Object] :handler Assigned to the sender when not nil.
# @option opts [Object] :tag_generator The tag generator, assigned to the
#   sender when not nil.
# @option opts [Object] :options Additional link options, handed to
#   #_apply_link_options.
#
# @return [Sender] The sender, after +open+ has been called on it.
#
def create_sender(context, opts = {})
  context = Qpid::Proton::URL.new(context) if context.is_a?(::String)

  target = opts[:target]
  target = context.path if target.nil? && context.is_a?(Qpid::Proton::URL)

  session = _session(context)
  name = opts[:name] || id(session.connection.container, target, opts[:source])
  sender = session.sender(name)

  sender.source.address = opts[:source] unless opts[:source].nil?
  sender.target.address = target if target
  sender.handler = opts[:handler] unless opts[:handler].nil?
  # The guard used to test a misspelled key (:tag_gnenerator), so a supplied
  # generator was silently dropped.
  sender.tag_generator = opts[:tag_generator] unless opts[:tag_generator].nil?

  _apply_link_options(opts[:options], sender)
  sender.open
  sender
end
# Initiates the establishment of a link over which messages can be received.
#
# There are two accepted kinds of argument for the context:
# 1. If a Connection is supplied then the link is established using that
#    object. The source, and optionally the target, address can be supplied.
# 2. If it is a String or a URL then a new Connection is created on which
#    the link will be attached. If a path is specified, but not the source
#    address, then the path of the URL is used as the source address.
#
# The name will be generated for the link if one is not specified.
#
# @param context [Connection, URL, String] The connection or the address.
# @param opts [Hash] Additional options.
# @option opts [String] :source The source address.
# @option opts [String] :target The target address.
# @option opts [String] :name The link name.
# @option opts [Boolean] :dynamic When truthy, the source is marked dynamic.
# @option opts [Object] :handler Assigned to the receiver when not nil.
# @option opts [Object] :options Additional link options, handed to
#   #_apply_link_options.
#
# @return [Receiver] The receiver, after +open+ has been called on it.
#
def create_receiver(context, opts = {})
  context = Qpid::Proton::URL.new(context) if context.is_a?(::String)

  source = opts[:source]
  source = context.path if source.nil? && context.is_a?(Qpid::Proton::URL)

  session = _session(context)
  name = opts[:name] || id(session.connection.container, source, opts[:target])
  receiver = session.receiver(name)

  receiver.source.address = source if source
  receiver.source.dynamic = true if opts.key?(:dynamic) && opts[:dynamic]
  receiver.target.address = opts[:target] unless opts[:target].nil?
  receiver.handler = opts[:handler] unless opts[:handler].nil?

  _apply_link_options(opts[:options], receiver)
  receiver.open
  receiver
end
# Declares a transaction on +context+.
#
# The first call for a given context creates a "txn-ctl" sender (handled by
# an InternalTransactionHandler) and caches it on the context in a
# +txn_ctl+ attribute; later calls reuse the cached sender.
#
# NOTE(review): the sender's target is not configured as a transaction
# coordinator here -- confirm whether that has to happen in this method.
#
# @param context [Object] The context handed to #create_sender; it also
#   carries the cached control sender.
# @param handler [Object] Passed to Transaction.new.
# @param settle_before_discharge [Boolean] Passed to Transaction.new.
#
# @return [Transaction] The new transaction.
#
def declare_transaction(context, handler = nil, settle_before_discharge = false)
  has_accessor = context.respond_to?(:txn_ctl)

  # Create the control sender only when none is cached yet. The parentheses
  # on respond_to? matter: without them the whole boolean expression is
  # parsed as the argument of respond_to?.
  if !has_accessor || context.txn_ctl.nil?
    unless has_accessor
      class << context
        attr_accessor :txn_ctl
      end
    end
    # create_sender takes (context, opts), so name and handler are options.
    context.txn_ctl = create_sender(context,
                                    :name => "txn-ctl",
                                    :handler => InternalTransactionHandler.new)
  end

  Transaction.new(context.txn_ctl, handler, settle_before_discharge)
end
# Initiates a server socket, accepting incoming AMQP connections on the
# interface and port specified.
#
# @param url [Object] The address to listen on; wrapped in a
#   Qpid::Proton::URL, whose host and port are passed to #acceptor.
# @param ssl_domain [Object, nil] The SSL configuration for the acceptor.
#   When nil and the URL scheme is 'amqps', the server configuration of
#   this container's SSL settings is used, if there is one.
#
# @return [Object] The acceptor returned by #acceptor.
#
def listen(url, ssl_domain = nil)
  endpoint = Qpid::Proton::URL.new(url)
  listener = acceptor(endpoint.host, endpoint.port)

  domain = ssl_domain
  domain = @ssl.server if domain.nil? && endpoint.scheme == 'amqps' && @ssl

  listener.ssl_domain(domain) unless domain.nil?
  listener
end
# Runs one round of processing.
#
# @param timeout [Object, nil] When not nil, stored as this object's
#   timeout before processing.
#
# @return [Object] Whatever +process+ returns.
#
def do_work(timeout = nil)
  if !timeout.nil?
    self.timeout = timeout
  end
  process
end
# Builds a link name from the container name and the link's addresses.
#
# The name is the container followed by the non-nil addresses (remote
# first, then local), joined with "-". When both addresses are nil a
# generated UUID takes their place.
#
# @param container [Object] The container name.
# @param remote [Object, nil] The remote address.
# @param local [Object, nil] The local address.
#
# @return [String] The link name.
#
def id(container, remote, local)
  parts = [remote, local].reject(&:nil?)
  parts = [generate_uuid] if parts.empty?
  ([container] + parts).map { |part| "#{part}" }.join("-")
end
# Applies link options to +link+.
#
# Each option is asked, through +test+, whether it is applicable to the
# link; applicable options are then applied through +apply+.
#
# @param options [Object, Array, nil] A single option, a (possibly nested)
#   list of options, or nil for none.
# @param link [Object] The link to configure.
#
def _apply_link_options(options, link)
  return if options.nil?

  # Wrapping and flattening accepts a single option as well as a list.
  [options].flatten.compact.each do |option|
    option.apply(link) if option.test(link)
  end
end
# Describes this object by its class name and the address that Cproton
# reports for the underlying implementation object.
#
# @return [String] The description.
#
def to_s
  address = Cproton.pni_address_of(@impl)
  "#{self.class}<@impl=#{address}>"
end
end
end