blob: 7e6e11f6549d2a1a28fbcc3af5003e5b069816c0 [file] [log] [blame]
#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#++
module Qpid
module Messaging
# A +Session+ represents a distinct conversation between end points. They are
# created from an active (i.e., not closed) Connection.
#
# A +Session+ is used to acknowledge individual or all messages that have
# passed through it
class Session
def initialize(connection, session) # :nodoc:
@connection = connection
@session_impl = session
end
def session_impl # :nodoc:
@session_impl
end
# Returns the Connection associated with this session.
def connection
@connection
end
# Creates a new endpoint for sending messages.
#
# The address can either be an instance Address or else an
# address string.
#
# ==== Arguments
#
# * +address+ - the end point address.
def create_sender(address)
_address = address
if address.class == Qpid::Messaging::Address
_address = address.address_impl
end
sender_impl = @session_impl.createSender(_address)
sender_name = sender_impl.getName
Qpid::Messaging::Sender.new(self, sender_impl)
end
# Retrieves the Sender with the specified name.
#
# Raises an exception if no such Sender exists.
#
# ==== Arguments
#
# * +name+ - the name of the Sender
def sender(name)
Qpid::Messaging::Sender.new self, @session_impl.getSender(name)
end
# Creates a new endpoint for receiving messages.
#
# The +address+ can either be an instance Address or else an
# address string.
#
# ==== Arguments
#
# * +address+ - the end point address.
def create_receiver(address)
result = nil
receiver_impl = nil
if address.class == Qpid::Messaging::Address
address_impl = address.address_impl
receiver_impl = @session_impl.createReceiver address_impl
else
receiver_impl = @session_impl.createReceiver(address)
end
Qpid::Messaging::Receiver.new self, receiver_impl
end
# Retrieves the +Receiver+ with the specified name, or nil if no such
# Receiver exists.
#
# ==== Arguments
#
# * +name+ - the name of the Receiver
def receiver(name)
Qpid::Messaging::Receiver.new self, @session_impl.getReceiver(name)
end
# Closes the +Session+ and all associated +Sender+ and +Receiver+ instances.
#
# *NOTE:* All +Session+ instances for a Connection are closed when the
# Connection is closed. But closing a +Session+ does not affect the
# owning Connection.
def close; @session_impl.close; end
# Commits any pending transactions for a transactional session.
def commit; @session_impl.commit; end
# Rolls back any uncommitted transactions on a transactional session.
def rollback; @session_impl.rollback; end
# Acknowledges one or more outstanding messages that have been received
# on this session.
#
# ==== Arguments
#
# * +options+ - the set of options
#
# ==== Options
#
# * :message - if specified, then only that Message is acknowledged
# * :sync - if true, the call will block until processed by the broker
#
# ==== Examples
#
# # acknowledge all received messages
# session.acknowledge
#
# # acknowledge a single message
# session.acknowledge :message => message
#
# # acknowledge all messages, wait until the call finishes
# session.acknowledge :sync => true
#
#--
# TODO: Add an optional block to be used for blocking calls.
#++
def acknowledge(options = {})
sync = options[:sync] || false
message = options[:message] if options[:message]
unless message.nil?
@session_impl.acknowledge message.message_impl, sync
else
@session_impl.acknowledge sync
end
end
# Rejects the specified message. A rejected message will not be
# redelivered.
#
# NOTE: A message cannot be rejected once it has been acknowledged.
def reject(message); @session_impl.reject message.message_impl; end
# Releases the message, which allows the broker to attempt to
# redeliver it.
#
# NOTE: A message connot be released once it has been acknowled.
def release(message); @session_impl.release message.message_impl; end
# Requests synchronization with the broker.
#
# ==== Arguments
#
# * +options+ - the list of options
#
# ==== Options
#
# * +:block+ - if true, the call blocks until the broker acknowledges it
#
#--
# TODO: Add an optional block to be used for blocking calls.
#++
def sync(args = {})
block = args[:block] || false
@session_impl.sync block
end
# Returns the total number of receivable messages, and messages already
# received, by Receiver instances associated with this +Session+.
def receivable; @session_impl.getReceivable; end
# Returns the number of messages that have been acknowledged by this
# +Session+ whose acknowledgements have not been confirmed as processed
# by the broker.
def unsettled_acks; @session_impl.getUnsettledAcks; end
# Fetches the next Receiver with a message pending. Waits the specified
# number of milliseconds before timing out.
#
# For a Receiver to be returned, it must have a capacity > 0 and have
# Messages locally queued.
#
# If no Receiver is found within the time out period, then a MessageError
# is raised.
#
# ==== Arguments
#
# * +timeout+ - the duration
#
# ==== Examples
#
# loop do
#
# begin
# # wait a maximum of one minute for the next receiver to be ready
# recv = session.next_receiver Qpid::Messaging::Duration::MINUTE
#
# # get and dispatch the message
# msg = recv.get
# dispatch_message msg
#
# rescue
# puts "No receivers were returned"
# end
#
# end
def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)
unless receiver_impl.nil?
recv = Qpid::Messaging::Receiver.new self, receiver_impl
block.call recv unless block.nil?
end
return recv
end
# Returns true if there were exceptions on this session.
def errors?; @session_impl.hasError; end
# If the +Session+ has been rendered invalid due to some exception,
# this method will result in that exception being raised.
#
# If none have occurred, then no exceptions are raised.
#
# ==== Examples
#
# # show any errors that occurred during the Session
# if @session.errors?
# begin
# @session.errors
# rescue Exception => error
# puts "An error occurred: #{error}"
# end
# end
def errors; @session_impl.checkError; end
end
end
end