blob: 543c26cc7091ba949f49ed0a86b7a63d6c2d7cf9 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
require 'cqpid'
require 'qpid/errors'
module Qpid
module Messaging
# A Session represents a distinct conversation between end points.
class Session
def initialize(session) # :nodoc:
@session_impl = session
end
def session_impl # :nodoc:
@session_impl
end
# Returns the +Connection+ for the +Session+.
def connection
connection_impl = @session_impl.getConnection
Qpid::Messaging::Connection.new "", {}, connection_impl
end
# Creates a new endpoint for sending messages.
def create_sender(address)
_address = address
if address.class == Qpid::Messaging::Address
_address = address.address_impl
end
Qpid::Messaging::Sender.new(@session_impl.createSender(_address))
end
# Retrieves the +Sender+ with the specified name.
def sender(name)
result = nil
begin
sender_impl = @session_impl.getSender name
result = Sender.for_impl sender_impl
rescue
# treat any error as a key error
end
raise Qpid::Messaging::KeyError, "No such sender: #{name}" if result.nil?
result
end
# Retrieves the +Receiver+ with the specified name.
def receiver(name)
result = nil
begin
receiver_impl = @session_impl.getReceiver name
result = Receiver.for_impl receiver_impl
rescue
# treat any error as a key error
end
raise Qpid::Messaging::KeyError, "No such receiver: #{name}" if result.nil?
result
end
# Creates a new endpoint for receiving messages.
def create_receiver(address)
result = nil
if address.class == Qpid::Messaging::Address
address_impl = address.address_impl
result = Qpid::Messaging::Receiver.new(@session_impl.createReceiver(address_impl))
else
result = Qpid::Messaging::Receiver.new(@session_impl.createReceiver(address))
end
return result
end
# Closes the Session and all associated Senders and Receivers.
# All Sessions are closed when the associated Connection is closed.
def close; @session_impl.close; end
# Commits any pending transactions for a transactional session.
def commit; @session_impl.commit; end
# Rolls back any uncommitted transactions on a transactional session.
def rollback; @session_impl.rollback; end
# Acknowledges one or more outstanding messages that have been received
# on this session.
#
# If a message is submitted (:message => something_message) then only
# that message is acknowledged. Otherwise all messsages are acknowledged.
#
# If :sync => true then the call will block until the server completes
# processing the acknowledgements.
# If :sync => true then the call will block until processed by the server (def. false)
def acknowledge(args = {})
sync = args[:sync] || false
message = args[:message] if args[:message]
unless message.nil?
@session_impl.acknowledge message.message_impl, sync
else
@session_impl.acknowledge sync
end
end
# Rejects the specified message. A rejected message will not be redelivered.
#
# NOTE: A message cannot be rejected once it has been acknowledged.
def reject(message); @session_impl.reject message.message_impl; end
# Releases the message, which allows the broker to attempt to
# redeliver it.
#
# NOTE: A message connot be released once it has been acknowled.
def release(message); @session_impl.release message.message_impl; end
# Requests synchronization with the server.
#
# If :block => true then the call will block until the server acknowledges.
#
# If :block => false (default) then the call will complete and the server
# will send notification on completion.
def sync(args = {})
block = args[:block] || false
@session_impl.sync block
end
# Returns the total number of receivable messages, and messages already received,
# by Receivers associated with this session.
def receivable; @session_impl.getReceivable; end
# Returns the number of messages that have been acknowledged by this session
# whose acknowledgements have not been confirmed as processed by the server.
def unsettled_acks; @session_impl.getUnsettledAcks; end
# Fetches the receiver for the next message.
def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER)
receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)
Qpid::Messaging::Receiver.new receiver_impl
end
# Returns whether there are errors on this session.
def error?; @session_impl.hasError; end
def check_error; @session_impl.checkError; end
# Returns if the underlying session is valid.
def valid?; @session_impl.isValid; end
# Returns if the underlying session is null.
def null?; @session_impl.isNull; end
def swap session
@session_impl.swap session.session_impl
end
end
end
end