blob: 36bcf7414d520f2d94ea28c7355489837eb18dd3 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
module Qpid::Proton::Messenger
# The +Messenger+ class defines a high level interface for
# sending and receiving Messages. Every Messenger contains
# a single logical queue of incoming messages and a single
# logical queue of outgoing messages. These messages in these
# queues may be destined for, or originate from, a variety of
# addresses.
#
# The messenger interface is single-threaded. All methods
# except one ( #interrupt ) are intended to be used from within
# the messenger thread.
#
# === Sending & Receiving Messages
#
# The Messenger class works in conjuction with the Message class. The
# Message class is a mutable holder of message content.
#
# The put method copies its Message to the outgoing queue, and may
# send queued messages if it can do so without blocking. The send
# method blocks until it has sent the requested number of messages,
# or until a timeout interrupts the attempt.
#
# Similarly, the recv method receives messages into the incoming
# queue, and may block as it attempts to receive the requested number
# of messages, or until timeout is reached. It may receive fewer
# than the requested number. The get method pops the
# eldest Message off the incoming queue and copies it into the Message
# object that you supply. It will not block.
#
# The blocking attribute allows you to turn off blocking behavior entirely,
# in which case send and recv will do whatever they can without
# blocking, and then return. You can then look at the number
# of incoming and outgoing messages to see how much outstanding work
# still remains.
#
class Messenger
include Qpid::Proton::Util::ErrorHandler
can_raise_error [:send, :receive, :password=, :start, :stop,
:perform_put, :perform_get, :interrupt,
:route, :rewrite, :accept, :reject,
:incoming_window=, :outgoing_window=]
# Creates a new +Messenger+.
#
# The +name+ parameter is optional. If one is not provided then
# a unique name is generated.
#
# ==== Options
#
# * name - the name (def. nil)
#
def initialize(name = nil)
@impl = Cproton.pn_messenger(name)
@selectables = {}
ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
end
def self.finalize!(impl) # :nodoc:
proc {
Cproton.pn_messenger_free(impl)
}
end
# Returns the name.
#
def name
Cproton.pn_messenger_name(@impl)
end
# This property contains the password for the Messenger.private_key
# file, or +nil+ if the file is not encrypted.
#
# ==== Arguments
#
# * password - the password
#
def password=(password)
Cproton.pn_messenger_set_password(@impl, password)
end
# Returns the password property for the Messenger.private_key file.
#
def password
Cproton.pn_messenger_get_password(@impl)
end
# Sets the timeout period, in milliseconds.
#
# A negative timeout period implies an infinite timeout.
#
# ==== Options
#
# * timeout - the timeout period
#
def timeout=(timeout)
raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil?
Cproton.pn_messenger_set_timeout(@impl, timeout)
end
# Returns the timeout period
#
def timeout
Cproton.pn_messenger_get_timeout(@impl)
end
# Returns true if blocking mode is enabled.
#
# Enable or disable blocking behavior during message sending
# and receiving. This affects every blocking call, with the
# exception of work(). Currently, the affected calls are
# send, recv, and stop.
def blocking?
Cproton.pn_messenger_is_blocking(@impl)
end
# Sets the blocking mode.
def blocking=(blocking)
Cproton.pn_messenger_set_blocking(@impl, blocking)
end
# Returns true if passive mode is enabled.
#
def passive?
Cproton.pn_messenger_is_passive(@impl)
end
# Turns passive mode on or off.
#
# When set to passive mode, Messenger will not attempt to perform I/O
# operations internally. In this mode it is necesssary to use the
# Selectable type to drive any I/O needed to perform requestioned
# actions.
#
# In this mode Messenger will never block.
#
def passive=(mode)
Cproton.pn_messenger_set_passive(@impl, mode)
end
def deadline
tstamp = Cproton.pn_messenger_deadline(@impl)
return tstamp / 1000.0 unless tstamp.nil?
end
# Reports whether an error occurred.
#
def error?
!Cproton.pn_messenger_errno(@impl).zero?
end
# Returns the most recent error number.
#
def errno
Cproton.pn_messenger_errno(@impl)
end
# Returns the most recent error message.
#
def error
Cproton.pn_error_text(Cproton.pn_messenger_error(@impl))
end
# Clears the current error state.
#
def clear_error
error = Cproton.pn_messenger_error(@impl)
unless error.nil?
Cproton.pn_error_clear(error)
end
end
# Currently a no-op placeholder.
# For future compatibility, do not send or recv messages
# before starting the +Messenger+.
#
def start
Cproton.pn_messenger_start(@impl)
end
# Stops the +Messenger+, preventing it from sending or receiving
# any more messages.
#
def stop
Cproton.pn_messenger_stop(@impl)
end
# Returns true if a Messenger is in the stopped state.
# This function does not block.
#
def stopped?
Cproton.pn_messenger_stopped(@impl)
end
# Subscribes the Messenger to messages originating from the
# specified source. The source is an address as specified in the
# Messenger introduction with the following addition. If the
# domain portion of the address begins with the '~' character, the
# Messenger will interpret the domain as host/port, bind to it,
# and listen for incoming messages. For example "~0.0.0.0",
# "amqp://~0.0.0.0" will all bind to any local interface and
# listen for incoming messages. An address of "amqps://~0.0.0.0"
# will only permit incoming SSL connections.
#
# ==== Options
#
# * address - the source address to be subscribe
# * timeout - an optional time-to-live value, in seconds, for the
# subscription
#
def subscribe(address, timeout=0)
raise TypeError.new("invalid address: #{address}") if address.nil?
subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout)
raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil?
Subscription.new(subscription)
end
# Path to a certificate file for the +Messenger+.
#
# This certificate is used when the +Messenger+ accepts or establishes
# SSL/TLS connections. This property must be specified for the
# Messenger to accept incoming SSL/TLS connections and to establish
# client authenticated outgoing SSL/TLS connection. Non client authenticated
# outgoing SSL/TLS connections do not require this property.
#
# ==== Options
#
# * certificate - the certificate
#
def certificate=(certificate)
Cproton.pn_messenger_set_certificate(@impl, certificate)
end
# Returns the path to a certificate file.
#
def certificate
Cproton.pn_messenger_get_certificate(@impl)
end
# Path to a private key file for the +Messenger+.
#
# The property must be specified for the +Messenger+ to accept incoming
# SSL/TLS connections and to establish client authenticated outgoing
# SSL/TLS connections. Non client authenticated SSL/TLS connections
# do not require this property.
#
# ==== Options
#
# * key - the key file
#
def private_key=(key)
Cproton.pn_messenger_set_private_key(@impl, key)
end
# Returns the path to a private key file.
#
def private_key
Cproton.pn_messenger_get_private_key(@impl)
end
# A path to a database of trusted certificates for use in verifying the
# peer on an SSL/TLS connection. If this property is +nil+, then the
# peer will not be verified.
#
# ==== Options
#
# * certificates - the certificates path
#
def trusted_certificates=(certificates)
Cproton.pn_messenger_set_trusted_certificates(@impl,certificates)
end
# The path to the databse of trusted certificates.
#
def trusted_certificates
Cproton.pn_messenger_get_trusted_certificates(@impl)
end
# Places the content contained in the message onto the outgoing
# queue of the Messenger.
#
# This method will never block, however it will send any unblocked
# Messages in the outgoing queue immediately and leave any blocked
# Messages remaining in the outgoing queue.
# The send call may then be used to block until the outgoing queue
# is empty. The outgoing attribute may be used to check the depth
# of the outgoing queue.
#
# ==== Options
#
# * message - the message
#
def put(message)
if message.nil?
raise TypeError.new("invalid message: #{message}")
end
unless message.kind_of?(Qpid::Proton::Message)
raise ::ArgumentError.new("invalid message type: #{message.class}")
end
# encode the message first
message.pre_encode
perform_put(message)
return outgoing_tracker
end
private
def perform_put(message) # :nodoc:
Cproton.pn_messenger_put(@impl, message.impl)
end
public
# This call will block until the indicated number of messages
# have been sent, or until the operation times out.
# If n is -1 this call will block until all outgoing messages
# have been sent. If n is 0 then this call will send whatever
# it can without blocking.
#
def send(n = -1)
Cproton.pn_messenger_send(@impl, n)
end
# Moves the message from the head of the incoming message queue into
# the supplied message object. Any content in the supplied message
# will be overwritten.
# A tracker for the incoming Message is returned. The tracker can
# later be used to communicate your acceptance or rejection of the
# Message.
#
# If no message is provided in the argument, then one is created. In
# either case, the one returned will be the fetched message.
#
# ==== Options
#
# * msg - the (optional) +Message+ instance to be used
#
def get(msg = nil)
msg_impl = nil
if msg.nil? then
msg_impl = nil
else
msg_impl = msg.impl
end
perform_get(msg_impl)
msg.post_decode unless msg.nil?
return incoming_tracker
end
private
def perform_get(msg) # :nodoc:
Cproton.pn_messenger_get(@impl, msg)
end
public
# Receives up to limit messages into the incoming queue. If no value
# for limit is supplied, this call will receive as many messages as it
# can buffer internally. If the Messenger is in blocking mode, this
# call will block until at least one Message is available in the
# incoming queue.
#
# Options ====
#
# * limit - the maximum number of messages to receive
#
def receive(limit = -1)
Cproton.pn_messenger_recv(@impl, limit)
end
# Returns true if the messenger is currently receiving data.
def receiving?
Cproton.pn_messenger_receiving(@impl)
end
# Attempts interrupting of the messenger thread.
#
# The Messenger interface is single-threaded, and this is the only
# function intended to be called from outside of is thread.
#
# Call this from a non-Messenger thread to interrupt it while it
# is blocking. This will cause a ::InterruptError to be raised.
#
# If there is no currently blocking call, then the next blocking
# call will be affected, even if it is within the same thread that
# originated the interrupt.
#
def interrupt
Cproton.pn_messenger_interrupt(@impl)
end
# Sends or receives any outstanding messages queued for a Messenger.
#
# This will block for the indicated timeout. This method may also do I/O
# other than sending and receiving messages. For example, closing
# connections after stop() has been called.
#
def work(timeout=-1)
err = Cproton.pn_messenger_work(@impl, timeout)
if (err == Cproton::PN_TIMEOUT) then
return false
else
check_for_error(err)
return true
end
end
# Returns the number messages in the outgoing queue that have not been
# transmitted.
#
def outgoing
Cproton.pn_messenger_outgoing(@impl)
end
# Returns the number of messages in the incoming queue that have not
# been retrieved.
#
def incoming
Cproton.pn_messenger_incoming(@impl)
end
# Adds a routing rule to the Messenger's internal routing table.
#
# The route procedure may be used to influence how a Messenger will
# internally treat a given address or class of addresses. Every call
# to the route procedure will result in Messenger appending a routing
# rule to its internal routing table.
#
# Whenever a Message is presented to a Messenger for delivery, it
# will match the address of this message against the set of routing
# rules in order. The first rule to match will be triggered, and
# instead of routing based on the address presented in the message,
# the Messenger will route based on the address supplied in the rule.
#
# The pattern matching syntax supports two types of matches, a '%'
# will match any character except a '/', and a '*' will match any
# character including a '/'.
#
# A routing address is specified as a normal AMQP address, however it
# may additionally use substitution variables from the pattern match
# that triggered the rule.
#
# ==== Arguments
#
# * pattern - the address pattern
# * address - the target address
#
# ==== Examples
#
# # route messages sent to foo to the destionaty amqp://foo.com
# messenger.route("foo", "amqp://foo.com")
#
# # any message to foobar will be routed to amqp://foo.com/bar
# messenger.route("foobar", "amqp://foo.com/bar")
#
# # any message to bar/<path> will be routed to the same path within
# # the amqp://bar.com domain
# messenger.route("bar/*", "amqp://bar.com/$1")
#
# # route all Message objects over TLS
# messenger.route("amqp:*", "amqps:$1")
#
# # supply credentials for foo
# messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")
#
# # supply credentials for all domains
# messenger.route("amqp://*", "amqp://user:password@$1")
#
# # route all addresses through a single proxy while preserving the
# # original destination
# messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2")
#
# # route any address through a single broker
# messenger.route("*", "amqp://user:password@broker/$1")
#
def route(pattern, address)
Cproton.pn_messenger_route(@impl, pattern, address)
end
# Similar to #route, except that the destination of
# the Message is determined before the message address is rewritten.
#
# The outgoing address is only rewritten after routing has been
# finalized. If a message has an outgoing address of
# "amqp://0.0.0.0:5678", and a rewriting rule that changes its
# outgoing address to "foo", it will still arrive at the peer that
# is listening on "amqp://0.0.0.0:5678", but when it arrives there,
# the receiver will see its outgoing address as "foo".
#
# The default rewrite rule removes username and password from addresses
# before they are transmitted.
#
# ==== Arguments
#
# * pattern - the outgoing address
# * address - the target address
#
def rewrite(pattern, address)
Cproton.pn_messenger_rewrite(@impl, pattern, address)
end
def selectable
impl = Cproton.pn_messenger_selectable(@impl)
# if we don't have any selectables, then return
return nil if impl.nil?
fd = Cproton.pn_selectable_get_fd(impl)
selectable = @selectables[fd]
if selectable.nil?
selectable = Selectable.new(self, impl)
@selectables[fd] = selectable
end
return selectable
end
# Returns a +Tracker+ for the message most recently sent via the put
# method.
#
def outgoing_tracker
impl = Cproton.pn_messenger_outgoing_tracker(@impl)
return nil if impl == -1
Tracker.new(impl)
end
# Returns a +Tracker+ for the most recently received message.
#
def incoming_tracker
impl = Cproton.pn_messenger_incoming_tracker(@impl)
return nil if impl == -1
Tracker.new(impl)
end
# Signal the sender that you have acted on the Message
# pointed to by the tracker. If no tracker is supplied,
# then all messages that have been returned by the get
# method are accepted, except those that have already been
# auto-settled by passing beyond your incoming window size.
#
# ==== Options
#
# * tracker - the tracker
#
def accept(tracker = nil)
raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
if tracker.nil? then
tracker = self.incoming_tracker
flag = Cproton::PN_CUMULATIVE
else
flag = 0
end
Cproton.pn_messenger_accept(@impl, tracker.impl, flag)
end
# Rejects the incoming message identified by the tracker.
# If no tracker is supplied, all messages that have been returned
# by the get method are rejected, except those that have already
# been auto-settled by passing beyond your outgoing window size.
#
# ==== Options
#
# * tracker - the tracker
#
def reject(tracker)
raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
if tracker.nil? then
tracker = self.incoming_tracker
flag = Cproton::PN_CUMULATIVE
else
flag = 0
end
Cproton.pn_messenger_reject(@impl, tracker.impl, flag)
end
# Gets the last known remote state of the delivery associated with
# the given tracker, as long as the Message is still within your
# outgoing window. (Also works on incoming messages that are still
# within your incoming queue. See TrackerStatus for details on the
# values returned.
#
# ==== Options
#
# * tracker - the tracker
#
def status(tracker)
raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl))
end
# Frees a Messenger from tracking the status associated
# with a given tracker. If you don't supply a tracker, all
# outgoing messages up to the most recent will be settled.
#
# ==== Options
#
# * tracker - the tracker
#
# ==== Examples
#
def settle(tracker)
raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
if tracker.nil? then
tracker = self.incoming_tracker
flag = Cproton::PN_CUMULATIVE
else
flag = 0
end
Cproton.pn_messenger_settle(@impl, tracker.impl, flag)
end
# Sets the incoming window.
#
# The Messenger will track the remote status of this many incoming
# deliveries after they have been accepted or rejected.
#
# Messages enter this window only when you take them into your application
# using get(). If your incoming window size is n, and you get n+1 messages
# without explicitly accepting or rejecting the oldest message, then the
# message that passes beyond the edge of the incoming window will be
# assigned the default disposition of its link.
#
# ==== Options
#
# * window - the window size
#
def incoming_window=(window)
raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
Cproton.pn_messenger_set_incoming_window(@impl, window)
end
# Returns the incoming window.
#
def incoming_window
Cproton.pn_messenger_get_incoming_window(@impl)
end
# Sets the outgoing window.
#
# The Messenger will track the remote status of this many outgoing
# deliveries after calling send.
# A Message enters this window when you call the put() method with the
# message. If your outgoing window size is n, and you call put n+1
# times, status information will no longer be available for the
# first message.
#
# ==== Options
#
# * window - the window size
#
def outgoing_window=(window)
raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
Cproton.pn_messenger_set_outgoing_window(@impl, window)
end
# Returns the outgoing window.
#
def outgoing_window
Cproton.pn_messenger_get_outgoing_window(@impl)
end
# Unregisters a selectable object.
def unregister_selectable(fileno) # :nodoc:
@selectables.delete(fileno)
end
private
def valid_tracker?(tracker)
!tracker.nil? && tracker.is_a?(Tracker)
end
def valid_window?(window)
!window.nil? && [Float, Fixnum].include?(window.class)
end
end
end