#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#++
| |
module Qpid

  module Messaging

    # A +Session+ represents a distinct conversation between end points. They are
    # created from an active (i.e., not closed) Connection.
    #
    # A +Session+ is used to acknowledge individual or all messages that have
    # passed through it.
    #
    # Every public method is a thin wrapper that delegates to the underlying
    # session implementation object supplied at construction time.
    class Session

      def initialize(connection, session) # :nodoc:
        @connection   = connection
        @session_impl = session
      end

      def session_impl # :nodoc:
        @session_impl
      end

      # Returns the Connection associated with this session.
      attr_reader :connection

      # Creates a new endpoint for sending messages.
      #
      # The address can either be an instance of Address (or of one of its
      # subclasses) or else an address string. An Address is unwrapped to its
      # underlying implementation before being handed to the session.
      #
      # Returns a new Sender bound to this +Session+.
      #
      # ==== Arguments
      #
      # * +address+ - the end point address.
      def create_sender(address)
        sender_impl = @session_impl.createSender(address_impl_for(address))

        Qpid::Messaging::Sender.new(self, sender_impl)
      end

      # Retrieves the Sender with the specified name.
      #
      # The lookup is delegated to the underlying session and the result is
      # wrapped in a new Sender. Raises an exception if no such Sender exists
      # (the exception originates in the underlying implementation).
      #
      # ==== Arguments
      #
      # * +name+ - the name of the Sender
      def sender(name)
        Qpid::Messaging::Sender.new(self, @session_impl.getSender(name))
      end

      # Creates a new endpoint for receiving messages.
      #
      # The +address+ can either be an instance of Address (or of one of its
      # subclasses) or else an address string. An Address is unwrapped to its
      # underlying implementation before being handed to the session.
      #
      # Returns a new Receiver bound to this +Session+.
      #
      # ==== Arguments
      #
      # * +address+ - the end point address.
      def create_receiver(address)
        receiver_impl = @session_impl.createReceiver(address_impl_for(address))

        Qpid::Messaging::Receiver.new(self, receiver_impl)
      end

      # Retrieves the +Receiver+ with the specified name.
      #
      # The lookup is delegated to the underlying session and whatever it
      # returns is wrapped in a new Receiver, so this method itself never
      # returns +nil+.
      #
      # NOTE(review): a failed lookup presumably raises from the underlying
      # implementation, as documented for #sender -- confirm against the
      # binding.
      #
      # ==== Arguments
      #
      # * +name+ - the name of the Receiver
      def receiver(name)
        Qpid::Messaging::Receiver.new(self, @session_impl.getReceiver(name))
      end

      # Closes the +Session+ and all associated +Sender+ and +Receiver+ instances.
      #
      # *NOTE:* All +Session+ instances for a Connection are closed when the
      # Connection is closed. But closing a +Session+ does not affect the
      # owning Connection.
      def close
        @session_impl.close
      end

      # Commits any pending transactions for a transactional session.
      def commit
        @session_impl.commit
      end

      # Rolls back any uncommitted transactions on a transactional session.
      def rollback
        @session_impl.rollback
      end

      # Acknowledges one or more outstanding messages that have been received
      # on this session.
      #
      # ==== Arguments
      #
      # * +options+ - the set of options
      #
      # ==== Options
      #
      # * :message - if specified, then only that Message is acknowledged
      # * :sync - if true, the call will block until processed by the broker
      #   (defaults to false)
      #
      # ==== Examples
      #
      #   # acknowledge all received messages
      #   session.acknowledge
      #
      #   # acknowledge a single message
      #   session.acknowledge :message => message
      #
      #   # acknowledge all messages, wait until the call finishes
      #   session.acknowledge :sync => true
      #
      #--
      # TODO: Add an optional block to be used for blocking calls.
      #++
      def acknowledge(options = {})
        sync    = options[:sync] || false
        message = options[:message]

        if message
          # Only the given message is acknowledged.
          @session_impl.acknowledge message.message_impl, sync
        else
          # No message given: acknowledge everything received so far.
          @session_impl.acknowledge sync
        end
      end

      # Rejects the specified message. A rejected message will not be
      # redelivered.
      #
      # NOTE: A message cannot be rejected once it has been acknowledged.
      def reject(message)
        @session_impl.reject message.message_impl
      end

      # Releases the message, which allows the broker to attempt to
      # redeliver it.
      #
      # NOTE: A message cannot be released once it has been acknowledged.
      def release(message)
        @session_impl.release message.message_impl
      end

      # Requests synchronization with the broker.
      #
      # ==== Arguments
      #
      # * +args+ - the list of options
      #
      # ==== Options
      #
      # * +:block+ - if true, the call blocks until the broker acknowledges it
      #   (defaults to false)
      #
      #--
      # TODO: Add an optional block to be used for blocking calls.
      #++
      def sync(args = {})
        block = args[:block] || false
        @session_impl.sync block
      end

      # Returns the total number of receivable messages, and messages already
      # received, by Receiver instances associated with this +Session+.
      def receivable
        @session_impl.getReceivable
      end

      # Returns the number of messages that have been acknowledged by this
      # +Session+ whose acknowledgements have not been confirmed as processed
      # by the broker.
      def unsettled_acks
        @session_impl.getUnsettledAcks
      end

      # Fetches the next Receiver with a message pending. Waits the specified
      # number of milliseconds before timing out.
      #
      # For a Receiver to be returned, it must have a capacity > 0 and have
      # Messages locally queued.
      #
      # If no Receiver is found within the time out period, then a MessageError
      # is raised.
      #
      # If a block is given, it is called with the Receiver before the method
      # returns. The Receiver is also the return value. Should the underlying
      # session hand back +nil+, the block is not called and +nil+ is returned.
      #
      # ==== Arguments
      #
      # * +timeout+ - the duration (defaults to Duration::FOREVER)
      #
      # ==== Examples
      #
      #   loop do
      #
      #     begin
      #       # wait a maximum of one minute for the next receiver to be ready
      #       recv = session.next_receiver Qpid::Messaging::Duration::MINUTE
      #
      #       # get and dispatch the message
      #       msg = recv.get
      #       dispatch_message msg
      #
      #     rescue
      #       puts "No receivers were returned"
      #     end
      #
      #   end
      def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
        receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)
        return nil if receiver_impl.nil?

        recv = Qpid::Messaging::Receiver.new(self, receiver_impl)
        block.call(recv) if block
        recv
      end

      # Returns true if there were exceptions on this session.
      def errors?
        @session_impl.hasError
      end

      # If the +Session+ has been rendered invalid due to some exception,
      # this method will result in that exception being raised.
      #
      # If none have occurred, then no exceptions are raised.
      #
      # ==== Examples
      #
      #   # show any errors that occurred during the Session
      #   if @session.errors?
      #     begin
      #       @session.errors
      #     rescue Exception => error
      #       puts "An error occurred: #{error}"
      #     end
      #   end
      def errors
        @session_impl.checkError
      end

      private

      # Returns the implementation object of an Address (or Address subclass)
      # instance; any other value, such as an address string, is returned
      # unchanged.
      def address_impl_for(address)
        address.is_a?(Qpid::Messaging::Address) ? address.address_impl : address
      end

    end

  end

end
| |