| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| require 'cqpid' |
| |
| require 'qpid/errors' |
| |
module Qpid

  module Messaging

    # A Session represents a distinct conversation between end points.
    #
    # It wraps an underlying session implementation object and keeps a
    # name-indexed registry of every +Sender+ and +Receiver+ created through
    # +create_sender+ and +create_receiver+.
    class Session

      # Returns the +Connection+ associated with this session.
      attr_reader :connection

      attr_reader :session_impl # :nodoc:

      def initialize(connection, session) # :nodoc:
        @connection   = connection
        @session_impl = session
        @senders      = {}
        @receivers    = {}
      end

      # Creates a new endpoint for sending messages.
      #
      # The +address+ can either be an instance of +Address+ (or of one of
      # its subclasses) or else a string that describes an address endpoint.
      #
      # The new +Sender+ is registered under the name reported by the
      # underlying implementation, replacing any +Sender+ previously
      # registered under that name, and is then returned.
      #
      # ==== Arguments
      #
      # * +address+ The end point address.
      #
      # ==== Examples
      #
      #   sender = session.create_sender "my-queue;{create:always}"
      #
      def create_sender(address)
        sender_impl = @session_impl.createSender(address_impl_for(address))
        sender_name = sender_impl.getName

        @senders[sender_name] = Qpid::Messaging::Sender.new(self, sender_impl)
      end

      # Retrieves the +Sender+ with the specified name.
      #
      # The +Sender+ must have been previously created using
      # the +create_sender+ method; otherwise a
      # <tt>Qpid::Messaging::KeyError</tt> is raised.
      #
      # ==== Arguments
      #
      # * +name+ The +Sender+ name.
      #
      # ==== Examples
      #
      #   sender = session.sender "my-queue"
      #
      def sender(name)
        @senders.fetch(name) do
          raise Qpid::Messaging::KeyError, "No such sender: #{name}"
        end
      end

      # Creates a new endpoint for receiving messages.
      #
      # The +address+ can either be an instance of +Address+ (or of one of
      # its subclasses) or else a string that describes an address endpoint.
      #
      # The new +Receiver+ is registered under the name reported by the
      # underlying implementation, replacing any +Receiver+ previously
      # registered under that name, and is then returned.
      #
      # ==== Arguments
      #
      # * +address+ The end point address.
      #
      # ==== Examples
      #
      #   receiver = session.create_receiver "my-queue"
      #
      def create_receiver(address)
        receiver_impl = @session_impl.createReceiver(address_impl_for(address))
        receiver_name = receiver_impl.getName

        @receivers[receiver_name] = Qpid::Messaging::Receiver.new(self, receiver_impl)
      end

      # Retrieves the +Receiver+ with the specified name.
      #
      # The +Receiver+ must have been previously created using
      # the +create_receiver+ method; otherwise a
      # <tt>Qpid::Messaging::KeyError</tt> is raised.
      #
      # ==== Arguments
      #
      # * +name+ The +Receiver+ name.
      #
      # ==== Examples
      #
      #   receiver = session.receiver "my-queue"
      #
      def receiver(name)
        @receivers.fetch(name) do
          raise Qpid::Messaging::KeyError, "No such receiver: #{name}"
        end
      end

      # Closes the +Session+ and all associated +Sender+ and +Receiver+ instances.
      #
      # NOTE: All +Session+ instances for a +Connection+ are closed when the
      # +Connection+ is closed.
      def close
        @session_impl.close
      end

      # Commits any pending transactions for a transactional session.
      def commit
        @session_impl.commit
      end

      # Rolls back any uncommitted transactions on a transactional session.
      def rollback
        @session_impl.rollback
      end

      # Acknowledges one or more outstanding messages that have been received
      # on this session.
      #
      # ==== Arguments
      #
      # * :message - if specified, then only the +Message+ specified is acknowledged
      # * :sync - if true then the call will block until processed by the server (def. false)
      #
      # ==== Examples
      #
      #   session.acknowledge                     # acknowledges all received messages
      #   session.acknowledge :message => message # acknowledge one message
      #   session.acknowledge :sync => true       # blocks until the call completes
      #
      #--
      # TODO: Add an optional block to be used for blocking calls.
      #++
      def acknowledge(args = {})
        sync    = args[:sync] || false
        message = args[:message]

        # A missing (nil/false) :message means "acknowledge everything".
        if message
          @session_impl.acknowledge message.message_impl, sync
        else
          @session_impl.acknowledge sync
        end
      end

      # Rejects the specified message. A rejected message will not be
      # redelivered.
      #
      # NOTE: A message cannot be rejected once it has been acknowledged.
      def reject(message)
        @session_impl.reject message.message_impl
      end

      # Releases the message, which allows the broker to attempt to
      # redeliver it.
      #
      # NOTE: A message cannot be released once it has been acknowledged.
      def release(message)
        @session_impl.release message.message_impl
      end

      # Requests synchronization with the server.
      #
      # ==== Arguments
      #
      # * :block - if true then the call blocks until the server acknowledges it (def. false)
      #
      #--
      # TODO: Add an optional block to be used for blocking calls.
      #++
      def sync(args = {})
        block = args[:block] || false
        @session_impl.sync block
      end

      # Returns the total number of receivable messages, and messages already
      # received, by +Receiver+ instances associated with this +Session+.
      def receivable
        @session_impl.getReceivable
      end

      # Returns the number of messages that have been acknowledged by this session
      # whose acknowledgements have not been confirmed as processed by the server.
      def unsettled_acks
        @session_impl.getUnsettledAcks
      end

      # Fetches the +Receiver+ for the next message.
      #
      # Returns a +Receiver+ wrapping the implementation object handed back
      # by the underlying session, or +nil+ when none was handed back. The
      # returned +Receiver+ is a new wrapper; it is not looked up in, or added
      # to, the registry used by +receiver+.
      #
      # If a block is given it is called with the +Receiver+ (only when one
      # is available); the block's result is ignored.
      #
      # ==== Arguments
      #
      # * timeout - time to wait for a +Receiver+ before timing out
      #
      # ==== Examples
      #
      #   recv = session.next_receiver # wait forever for the next +Receiver+
      #   # execute a block on the next receiver
      #   session.next_receiver do |recv|
      #     msg = recv.get
      #     puts "Received message: #{msg.content}"
      #   end
      def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
        receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)
        return nil if receiver_impl.nil?

        recv = Qpid::Messaging::Receiver.new(self, receiver_impl)
        block.call(recv) unless block.nil?
        recv
      end

      # Returns true if there were exceptions on this session.
      #
      # ==== Examples
      #
      #   puts "There were session errors." if @session.errors?
      def errors?
        @session_impl.hasError
      end

      # If the +Session+ has been rendered invalid due to some exception,
      # this method will result in that exception being raised.
      #
      # If none have occurred, then no exceptions are raised.
      #
      # ==== Examples
      #
      #   if @session.errors?
      #     begin
      #       @session.errors
      #     rescue Exception => error
      #       puts "An error occurred: #{error}"
      #     end
      #   end
      def errors
        @session_impl.checkError
      end

      private

      # Returns the value to pass to the underlying implementation for
      # +address+: the wrapped implementation object when +address+ is an
      # +Address+ (subclasses included), otherwise +address+ itself.
      def address_impl_for(address) # :nodoc:
        address.is_a?(Qpid::Messaging::Address) ? address.address_impl : address
      end

    end

  end

end
| |