blob: 7602d17de6e9e2d7743edaf49b108dffcc0202d2 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The proton.endpoints module
"""
from __future__ import absolute_import
import weakref
from cproton import PN_CONFIGURATION, PN_COORDINATOR, PN_DELIVERIES, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, \
PN_DIST_MODE_UNSPECIFIED, PN_EOS, PN_EXPIRE_NEVER, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_WITH_LINK, \
PN_EXPIRE_WITH_SESSION, PN_LOCAL_ACTIVE, PN_LOCAL_CLOSED, PN_LOCAL_UNINIT, PN_NONDURABLE, PN_RCV_FIRST, \
PN_RCV_SECOND, PN_REMOTE_ACTIVE, PN_REMOTE_CLOSED, PN_REMOTE_UNINIT, PN_SND_MIXED, PN_SND_SETTLED, PN_SND_UNSETTLED, \
PN_SOURCE, PN_TARGET, PN_UNSPECIFIED, pn_connection, pn_connection_attachments, pn_connection_close, \
pn_connection_collect, pn_connection_condition, pn_connection_desired_capabilities, pn_connection_error, \
pn_connection_get_authorization, pn_connection_get_container, pn_connection_get_hostname, pn_connection_get_user, \
pn_connection_offered_capabilities, \
pn_connection_open, pn_connection_properties, pn_connection_release, pn_connection_remote_condition, \
pn_connection_remote_container, pn_connection_remote_desired_capabilities, pn_connection_remote_hostname, \
pn_connection_remote_offered_capabilities, pn_connection_remote_properties, \
pn_connection_set_authorization, pn_connection_set_container, \
pn_connection_set_hostname, pn_connection_set_password, pn_connection_set_user, pn_connection_state, \
pn_connection_transport, pn_delivery, pn_error_code, pn_error_text, pn_link_advance, pn_link_attachments, \
pn_link_available, pn_link_close, pn_link_condition, pn_link_credit, pn_link_current, pn_link_detach, pn_link_drain, \
pn_link_drained, pn_link_draining, pn_link_error, pn_link_flow, pn_link_free, pn_link_get_drain, pn_link_head, \
pn_link_is_receiver, pn_link_is_sender, pn_link_max_message_size, pn_link_name, pn_link_next, pn_link_offered, \
pn_link_open, pn_link_queued, pn_link_rcv_settle_mode, pn_link_recv, pn_link_remote_condition, \
pn_link_remote_max_message_size, pn_link_remote_rcv_settle_mode, pn_link_remote_snd_settle_mode, \
pn_link_remote_source, pn_link_remote_target, pn_link_send, pn_link_session, pn_link_set_drain, \
pn_link_set_max_message_size, pn_link_set_rcv_settle_mode, pn_link_set_snd_settle_mode, pn_link_snd_settle_mode, \
pn_link_source, pn_link_state, pn_link_target, pn_link_unsettled, pn_receiver, pn_sender, pn_session, \
pn_session_attachments, pn_session_close, pn_session_condition, pn_session_connection, pn_session_free, \
pn_session_get_incoming_capacity, pn_session_get_outgoing_window, pn_session_head, pn_session_incoming_bytes, \
pn_session_next, pn_session_open, pn_session_outgoing_bytes, pn_session_remote_condition, \
pn_session_set_incoming_capacity, pn_session_set_outgoing_window, pn_session_state, pn_terminus_capabilities, \
pn_terminus_copy, pn_terminus_filter, pn_terminus_get_address, pn_terminus_get_distribution_mode, \
pn_terminus_get_durability, pn_terminus_get_expiry_policy, pn_terminus_get_timeout, pn_terminus_get_type, \
pn_terminus_is_dynamic, pn_terminus_outcomes, pn_terminus_properties, pn_terminus_set_address, \
pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, \
pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head, \
pn_link_properties, pn_link_remote_properties
from ._common import unicode2utf8, utf82unicode
from ._condition import cond2obj, obj2cond
from ._data import Data, dat2obj, obj2dat, PropertyDict, SymbolList
from ._delivery import Delivery
from ._exceptions import ConnectionException, EXCEPTIONS, LinkException, SessionException
from ._transport import Transport
from ._wrapper import Wrapper
class Endpoint(object):
"""
Abstract class from which :class:`Connection`, :class:`Session`
and :class:`Link` are derived, and which defines the state
of these classes.
The :class:`Endpoint` state is an integral value with flags that
encode both the local and remote state of an AMQP Endpoint
(:class:`Connection`, :class:`Link`, or :class:`Session`).
The individual bits may be accessed using :const:`LOCAL_UNINIT`,
:const:`LOCAL_ACTIVE`, :const:`LOCAL_CLOSED`, and
:const:`REMOTE_UNINIT`, :const:`REMOTE_ACTIVE`, :const:`REMOTE_CLOSED`.
Every AMQP endpoint (:class:`Connection`, :class:`Link`, or
:class:`Session`) starts out in an uninitialized state and then
proceeds linearly to an active and then closed state. This
lifecycle occurs at both endpoints involved, and so the state
model for an endpoint includes not only the known local state,
but also the last known state of the remote endpoint.
"""
LOCAL_UNINIT = PN_LOCAL_UNINIT
""" The local endpoint state is uninitialized. """
REMOTE_UNINIT = PN_REMOTE_UNINIT
""" The local endpoint state is active. """
LOCAL_ACTIVE = PN_LOCAL_ACTIVE
""" The local endpoint state is closed. """
REMOTE_ACTIVE = PN_REMOTE_ACTIVE
""" The remote endpoint state is uninitialized. """
LOCAL_CLOSED = PN_LOCAL_CLOSED
""" The remote endpoint state is active. """
REMOTE_CLOSED = PN_REMOTE_CLOSED
""" The remote endpoint state is closed. """
def _init(self):
self.condition = None
self._handler = None
def _update_cond(self):
obj2cond(self.condition, self._get_cond_impl())
@property
def remote_condition(self):
"""
The remote condition associated with the connection endpoint.
See :class:`Condition` for more information.
:type: :class:`Condition`
"""
return cond2obj(self._get_remote_cond_impl())
# the following must be provided by subclasses
def _get_cond_impl(self):
assert False, "Subclass must override this!"
def _get_remote_cond_impl(self):
assert False, "Subclass must override this!"
def _get_handler(self):
return self._handler
def _set_handler(self, handler):
# TODO Hack This is here for some very odd (IMO) backwards compat behaviour
from ._events import Handler
if handler is None:
self._handler = None
elif issubclass(type(handler), Handler):
self._handler = handler
else:
self._handler = Handler()
self._handler.add(handler)
handler = property(_get_handler, _set_handler, doc="""
Handler for events.
:getter: Get the event handler, or return ``None`` if no handler has been set.
:setter: Set the event handler.
:type: :class:`Handler` or ``None``
""")
class Connection(Wrapper, Endpoint):
"""
A representation of an AMQP connection.
"""
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Connection(impl)
def __init__(self, impl=pn_connection):
Wrapper.__init__(self, impl, pn_connection_attachments)
def _init(self):
Endpoint._init(self)
self.offered_capabilities = None
self.desired_capabilities = None
self.properties = None
self.url = None
self._acceptor = None
def _get_attachments(self):
return pn_connection_attachments(self._impl)
@property
def connection(self):
"""
Get this connection.
:type: :class:`Connection`
"""
return self
@property
def transport(self):
"""
The transport bound to this connection. If the connection
is unbound, then this operation will return ``None``.
:type: :class:`Transport` or ``None``
"""
return Transport.wrap(pn_connection_transport(self._impl))
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, ConnectionException)
raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
else:
return err
def _get_cond_impl(self):
return pn_connection_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_connection_remote_condition(self._impl)
# TODO: Blacklisted API call
def collect(self, collector):
if collector is None:
pn_connection_collect(self._impl, None)
else:
pn_connection_collect(self._impl, collector._impl)
self._collector = weakref.ref(collector)
def _get_container(self):
return utf82unicode(pn_connection_get_container(self._impl))
def _set_container(self, name):
pn_connection_set_container(self._impl, unicode2utf8(name))
container = property(_get_container, _set_container, doc="""
The container name for this connection object.
:type: ``str``
""")
def _get_hostname(self):
return utf82unicode(pn_connection_get_hostname(self._impl))
def _set_hostname(self, name):
pn_connection_set_hostname(self._impl, unicode2utf8(name))
hostname = property(_get_hostname, _set_hostname, doc="""
Set the name of the host (either fully qualified or relative) to which this
connection is connecting to. This information may be used by the remote
peer to determine the correct back-end service to connect the client to.
This value will be sent in the Open performative, and will be used by SSL
and SASL layers to identify the peer.
:type: ``str``
""")
def _get_user(self):
return utf82unicode(pn_connection_get_user(self._impl))
def _set_user(self, name):
pn_connection_set_user(self._impl, unicode2utf8(name))
user = property(_get_user, _set_user, doc="""
The authentication username for a client connection.
It is necessary to set the username and password before binding
the connection to a transport and it isn't allowed to change
after the binding.
If not set then no authentication will be negotiated unless the
client sasl layer is explicitly created (this would be for something
like Kerberos where the credentials are implicit in the environment,
or to explicitly use the ``ANONYMOUS`` SASL mechanism)
:type: ``str``
""")
def _get_authorization(self):
return utf82unicode(pn_connection_get_authorization(self._impl))
def _set_authorization(self, name):
pn_connection_set_authorization(self._impl, unicode2utf8(name))
authorization = property(_get_authorization, _set_authorization, doc="""
The authorization username for a client connection.
It is necessary to set the authorization before binding
the connection to a transport and it isn't allowed to change
after the binding.
If not set then implicitly the requested authorization is the same as the
authentication user.
:type: ``str``
""")
def _get_password(self):
return None
def _set_password(self, name):
pn_connection_set_password(self._impl, unicode2utf8(name))
password = property(_get_password, _set_password, doc="""
Set the authentication password for a client connection.
It is necessary to set the username and password before binding the connection
to a transport and it isn't allowed to change after the binding.
.. note:: Getting the password always returns ``None``.
:type: ``str``
""")
@property
def remote_container(self):
"""
The container identifier specified by the remote peer for this connection.
This will return ``None`` until the :const:'REMOTE_ACTIVE` state is
reached. See :class:`Endpoint` for more details on endpoint state.
Any (non ``None``) name returned by this operation will be valid until
the connection object is unbound from a transport or freed,
whichever happens sooner.
:type: ``str``
"""
return pn_connection_remote_container(self._impl)
@property
def remote_hostname(self):
"""
The hostname specified by the remote peer for this connection.
This will return ``None`` until the :const:`REMOTE_ACTIVE` state is
reached. See :class:`Endpoint` for more details on endpoint state.
Any (non ``None``) name returned by this operation will be valid until
the connection object is unbound from a transport or freed,
whichever happens sooner.
:type: ``str``
"""
return pn_connection_remote_hostname(self._impl)
@property
def remote_offered_capabilities(self):
"""
The capabilities offered by the remote peer for this connection.
This operation will return a :class:`Data` object that
is valid until the connection object is freed. This :class:`Data`
object will be empty until the remote connection is opened as
indicated by the :const:`REMOTE_ACTIVE` flag.
:type: :class:`Data`
"""
return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
@property
def remote_desired_capabilities(self):
"""
The capabilities desired by the remote peer for this connection.
This operation will return a :class:`Data` object that
is valid until the connection object is freed. This :class:`Data`
object will be empty until the remote connection is opened as
indicated by the :const:`REMOTE_ACTIVE` flag.
:type: :class:`Data`
"""
return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
@property
def remote_properties(self):
"""
The properties specified by the remote peer for this connection.
This operation will return a :class:`Data` object that
is valid until the connection object is freed. This :class:`Data`
object will be empty until the remote connection is opened as
indicated by the :const:`REMOTE_ACTIVE` flag.
:type: :class:`Data`
"""
return dat2obj(pn_connection_remote_properties(self._impl))
@property
def connected_address(self):
"""
The address for this connection.
:type: ``str``
"""
return self.url and str(self.url)
def open(self):
"""
Opens the connection.
In more detail, this moves the local state of the connection to
the ``ACTIVE`` state and triggers an open frame to be sent to the
peer. A connection is fully active once both peers have opened it.
"""
obj2dat(self.offered_capabilities,
pn_connection_offered_capabilities(self._impl))
obj2dat(self.desired_capabilities,
pn_connection_desired_capabilities(self._impl))
obj2dat(self.properties, pn_connection_properties(self._impl))
pn_connection_open(self._impl)
def close(self):
"""
Closes the connection.
In more detail, this moves the local state of the connection to
the ``CLOSED`` state and triggers a close frame to be sent to the
peer. A connection is fully closed once both peers have closed it.
"""
self._update_cond()
pn_connection_close(self._impl)
if hasattr(self, '_session_policy'):
# break circular ref
del self._session_policy
t = self.transport
if t and t._connect_selectable:
# close() requested before TCP connect handshake completes on socket.
# Dismantle connection setup logic.
s = t._connect_selectable
t._connect_selectable = None
t.close_head()
t.close_tail()
s._transport = None
t._selectable = None
s.terminate()
s.update()
@property
def state(self):
"""
The state of the connection as a bit field. The state has a local
and a remote component. Each of these can be in one of three
states: ``UNINIT``, ``ACTIVE`` or ``CLOSED``. These can be tested by masking
against :const:`LOCAL_UNINIT`, :const:`LOCAL_ACTIVE`, :const:`LOCAL_CLOSED`, :const:`REMOTE_UNINIT`,
:const:`REMOTE_ACTIVE` and :const:`REMOTE_CLOSED`.
"""
return pn_connection_state(self._impl)
def session(self):
"""
Returns a new session on this connection.
:return: New session
:rtype: :class:`Session`
:raises: :class:`SessionException`
"""
ssn = pn_session(self._impl)
if ssn is None:
raise (SessionException("Session allocation failed."))
else:
return Session(ssn)
def session_head(self, mask):
"""
Retrieve the first session from a given connection that matches the
specified state mask.
Examines the state of each session owned by the connection, and
returns the first session that matches the given state mask. If
state contains both local and remote flags, then an exact match
against those flags is performed. If state contains only local or
only remote flags, then a match occurs if any of the local or
remote flags are set respectively.
:param mask: State mask to match
:return: The first session owned by the connection that matches the
mask, else ``None`` if no sessions matches.
:rtype: :class:`Session` or ``None``
"""
return Session.wrap(pn_session_head(self._impl, mask))
def link_head(self, mask):
"""
Retrieve the first link that matches the given state mask.
Examines the state of each link owned by the connection and returns
the first link that matches the given state mask. If state contains
both local and remote flags, then an exact match against those
flags is performed. If state contains only local or only remote
flags, then a match occurs if any of the local or remote flags are
set respectively. ``state==0`` matches all links.
:param mask: State mask to match
:type mask: ``int``
:return: The first link owned by the connection that matches the
mask, else ``None`` if no link matches.
:rtype: :class:`Link` or ``None``
"""
return Link.wrap(pn_link_head(self._impl, mask))
@property
def work_head(self):
"""Deprecated: use on_message(), on_accepted(), on_rejected(),
on_released(), and on_settled() instead.
Extracts the first delivery on the connection that has pending
operations.
Retrieves the first delivery on the Connection that has pending
operations. A readable delivery indicates message data is waiting
to be read. A writable delivery indicates that message data may be
sent. An updated delivery indicates that the delivery's disposition
has changed. A delivery will never be both readable and writable,
but it may be both readable and updated or both writable and
updated.
:return: The first delivery object that needs to be serviced, or ``None`` if none.
:rtype: :class:`Delivery` or ``None``
"""
return Delivery.wrap(pn_work_head(self._impl))
@property
def error(self):
"""
Additional error information associated with the connection.
Whenever a connection operation fails (i.e. returns an error code),
additional error details can be obtained using this property. The
returned value is the error code defined by Proton in ``pn_error_t``
(see ``error.h``).
:type: ``int``
"""
return pn_error_code(pn_connection_error(self._impl))
def free(self):
"""
Releases this connection object.
When a connection object is released, all :class:`Session` and
:class:`Link` objects associated with this connection are also
released and all :class:`Delivery` objects are settled.
"""
pn_connection_release(self._impl)
def _get_offered_capabilities(self):
return self.offered_capabilities_list
def _set_offered_capabilities(self, offered_capability_list):
if isinstance(offered_capability_list, list):
self.offered_capabilities_list = SymbolList(offered_capability_list, raise_on_error=False)
else:
self.offered_capabilities_list = offered_capability_list
offered_capabilities = property(_get_offered_capabilities, _set_offered_capabilities, doc="""
Offered capabilities as a list of symbols. The AMQP 1.0 specification
restricts this list to symbol elements only. It is possible to use
the special ``list`` subclass :class:`SymbolList` as it will by
default enforce this restriction on construction. In addition, if a
string type is used, it will be silently converted into the required
symbol.
:type: ``list`` containing :class:`symbol`.
""")
def _get_desired_capabilities(self):
return self.desired_capabilities_list
def _set_desired_capabilities(self, desired_capability_list):
if isinstance(desired_capability_list, list):
self.desired_capabilities_list = SymbolList(desired_capability_list, raise_on_error=False)
else:
self.desired_capabilities_list = desired_capability_list
desired_capabilities = property(_get_desired_capabilities, _set_desired_capabilities, doc="""
Desired capabilities as a list of symbols. The AMQP 1.0 specification
restricts this list to symbol elements only. It is possible to use
the special ``list`` subclass :class:`SymbolList` which will by
default enforce this restriction on construction. In addition, if string
types are used, this class will be silently convert them into symbols.
:type: ``list`` containing :class:`symbol`.
""")
def _get_properties(self):
return self.properties_dict
def _set_properties(self, properties_dict):
if isinstance(properties_dict, dict):
self.properties_dict = PropertyDict(properties_dict, raise_on_error=False)
else:
self.properties_dict = properties_dict
properties = property(_get_properties, _set_properties, doc="""
Connection properties as a dictionary of key/values. The AMQP 1.0
specification restricts this dictionary to have keys that are only
:class:`symbol` types. It is possible to use the special ``dict``
subclass :class:`PropertyDict` which will by default enforce this
restrictions on construction. In addition, if strings type are used,
this will silently convert them into symbols.
:type: ``dict`` containing :class:`symbol`` keys.
""")
class Session(Wrapper, Endpoint):
"""A container of links"""
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Session(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_session_attachments)
def _get_attachments(self):
return pn_session_attachments(self._impl)
def _get_cond_impl(self):
return pn_session_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_session_remote_condition(self._impl)
def _get_incoming_capacity(self):
return pn_session_get_incoming_capacity(self._impl)
def _set_incoming_capacity(self, capacity):
pn_session_set_incoming_capacity(self._impl, capacity)
incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity, doc="""
The incoming capacity of this session in bytes. The incoming capacity
of a session determines how much incoming message data the session
can buffer.
.. note:: If set, this value must be greater than or equal to the negotiated
frame size of the transport. The window is computed as a whole number of
frames when dividing remaining capacity at a given time by the connection
max frame size. As such, capacity and max frame size should be chosen so
as to ensure the frame window isn't unduly small and limiting performance.
:type: ``int`` (bytes)
""")
def _get_outgoing_window(self):
return pn_session_get_outgoing_window(self._impl)
def _set_outgoing_window(self, window):
pn_session_set_outgoing_window(self._impl, window)
outgoing_window = property(_get_outgoing_window, _set_outgoing_window, doc="""
The outgoing window for this session.
:type: ``int``
""")
@property
def outgoing_bytes(self):
"""
The number of outgoing bytes currently buffered.
:type: ``int`` (bytes)
"""
return pn_session_outgoing_bytes(self._impl)
@property
def incoming_bytes(self):
"""
The number of incoming bytes currently buffered.
:type: ``int`` (bytes)
"""
return pn_session_incoming_bytes(self._impl)
def open(self):
"""
Open a session. Once this operation has completed, the
:const:`LOCAL_ACTIVE` state flag will be set.
"""
pn_session_open(self._impl)
def close(self):
"""
Close a session.
Once this operation has completed, the :const:`LOCAL_CLOSED` state flag
will be set. This may be called without calling
:meth:`open`, in this case it is equivalent to calling
:meth:`open` followed by :meth:`close`.
"""
self._update_cond()
pn_session_close(self._impl)
def next(self, mask):
"""
Retrieve the next session for this connection that matches the
specified state mask.
When used with :meth:`Connection.session_head`, application can
access all sessions on the connection that match the given state.
See :meth:`Connection.session_head` for description of match
behavior.
:param mask: Mask to match.
:return: The next session owned by this connection that matches the
mask, else ``None`` if no sessions match.
:rtype: :class:`Session` or ``None``
"""
return Session.wrap(pn_session_next(self._impl, mask))
@property
def state(self):
"""
The endpoint state flags for this session. See :class:`Endpoint` for
details of the flags.
:type: ``int``
"""
return pn_session_state(self._impl)
@property
def connection(self):
"""
The parent connection for this session.
:type: :class:`Connection`
"""
return Connection.wrap(pn_session_connection(self._impl))
@property
def transport(self):
"""
The transport bound to the parent connection for this session.
:type: :class:`Transport`
"""
return self.connection.transport
def sender(self, name):
"""
Create a new :class:`Sender` on this session.
:param name: Name of sender
:type name: ``str``
:return: New Sender object
:rtype: :class:`Sender`
"""
return Sender(pn_sender(self._impl, unicode2utf8(name)))
def receiver(self, name):
"""
Create a new :class:`Receiver` on this session.
:param name: Name of receiver
:type name: ``str``
:return: New Receiver object
:rtype: :class:`Receiver`
"""
return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
def free(self):
"""
Free this session. When a session is freed it will no
longer be retained by the connection once any internal
references to the session are no longer needed. Freeing
a session will free all links on that session and settle
any deliveries on those links.
"""
pn_session_free(self._impl)
class Link(Wrapper, Endpoint):
"""
A representation of an AMQP link (a unidirectional channel for
transferring messages), of which there are two concrete
implementations, :class:`Sender` and :class:`Receiver`.
"""
SND_UNSETTLED = PN_SND_UNSETTLED
"""The sender will send all deliveries initially unsettled."""
SND_SETTLED = PN_SND_SETTLED
"""The sender will send all deliveries settled to the receiver."""
SND_MIXED = PN_SND_MIXED
"""The sender may send a mixture of settled and unsettled deliveries."""
RCV_FIRST = PN_RCV_FIRST
"""The receiver will settle deliveries regardless of what the sender does."""
RCV_SECOND = PN_RCV_SECOND
"""The receiver will only settle deliveries after the sender settles."""
@staticmethod
def wrap(impl):
if impl is None:
return None
if pn_link_is_sender(impl):
return Sender(impl)
else:
return Receiver(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_link_attachments)
def _init(self):
Endpoint._init(self)
self.properties = None
def _get_attachments(self):
return pn_link_attachments(self._impl)
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
else:
return err
def _get_cond_impl(self):
return pn_link_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_link_remote_condition(self._impl)
def open(self):
"""
Opens the link.
In more detail, this moves the local state of the link to the
:const:`LOCAL_ACTIVE` state and triggers an attach frame to be
sent to the peer. A link is fully active once both peers have
attached it.
"""
obj2dat(self.properties, pn_link_properties(self._impl))
pn_link_open(self._impl)
def close(self):
"""
Closes the link.
In more detail, this moves the local state of the link to the
:const:`LOCAL_CLOSED` state and triggers an detach frame (with
the closed flag set) to be sent to the peer. A link is fully
closed once both peers have detached it.
This may be called without calling :meth:`open`, in this case it
is equivalent to calling :meth:`open` followed by :meth:`close`.
"""
self._update_cond()
pn_link_close(self._impl)
@property
def state(self):
"""
The state of the link as a bit field. The state has a local
and a remote component. Each of these can be in one of three
states: ``UNINIT``, ``ACTIVE`` or ``CLOSED``. These can be
tested by masking against :const:`LOCAL_UNINIT`,
:const:`LOCAL_ACTIVE`, :const:`LOCAL_CLOSED`,
:const:`REMOTE_UNINIT`, :const:`REMOTE_ACTIVE` and
:const:`REMOTE_CLOSED`.
:type: ``int``
"""
return pn_link_state(self._impl)
@property
def source(self):
"""
The source of the link as described by the local peer. The
returned object is valid until the link is freed.
:type: :class:`Terminus`
"""
return Terminus(pn_link_source(self._impl))
@property
def target(self):
"""
The target of the link as described by the local peer. The
returned object is valid until the link is freed.
:type: :class:`Terminus`
"""
return Terminus(pn_link_target(self._impl))
@property
def remote_source(self):
"""
The source of the link as described by the remote peer. The
returned object is valid until the link is freed. The remote
:class:`Terminus` object will be empty until the link is
remotely opened as indicated by the :const:`REMOTE_ACTIVE`
flag.
:type: :class:`Terminus`
"""
return Terminus(pn_link_remote_source(self._impl))
@property
def remote_target(self):
"""
The target of the link as described by the remote peer. The
returned object is valid until the link is freed. The remote
:class:`Terminus` object will be empty until the link is
remotely opened as indicated by the :const:`REMOTE_ACTIVE`
flag.
:type: :class:`Terminus`
"""
return Terminus(pn_link_remote_target(self._impl))
@property
def session(self):
"""
The parent session for this link.
:type: :class:`Session`
"""
return Session.wrap(pn_link_session(self._impl))
@property
def connection(self):
"""
The connection on which this link was attached.
:type: :class:`Connection`
"""
return self.session.connection
@property
def transport(self):
"""
The transport bound to the connection on which this link was attached.
:type: :class:`Transport`
"""
return self.session.transport
def delivery(self, tag):
"""
Create a delivery. Every delivery object within a
link must be supplied with a unique tag. Links
maintain a sequence of delivery object in the order that
they are created.
:param tag: Delivery tag unique for this link.
:type tag: ``bytes``
:rtype: :class:`Delivery`
"""
return Delivery(pn_delivery(self._impl, tag))
@property
def current(self):
"""
The current delivery for this link.
Each link maintains a sequence of deliveries in the order
they were created, along with a pointer to the *current*
delivery. All send/recv operations on a link take place
on the *current* delivery. If a link has no current delivery,
the current delivery is automatically initialized to the
next delivery created on the link. Once initialized, the
current delivery remains the same until it is changed through
use of :meth:`advance` or until it is settled via
:meth:`Delivery.settle`.
:rtype: :class:`Delivery`
"""
return Delivery.wrap(pn_link_current(self._impl))
def advance(self):
"""
Advance the current delivery of this link to the next delivery.
For sending links this operation is used to finish sending message
data for the current outgoing delivery and move on to the next
outgoing delivery (if any).
For receiving links, this operation is used to finish accessing
message data from the current incoming delivery and move on to the
next incoming delivery (if any).
Each link maintains a sequence of deliveries in the order they were
created, along with a pointer to the *current* delivery. The
:meth:`advance` operation will modify the *current* delivery on the
link to point to the next delivery in the sequence. If there is no
next delivery in the sequence, the current delivery will be set to
``NULL``.
:return: ``True`` if the value of the current delivery changed (even
if it was set to ``NULL``, ``False`` otherwise.
:rtype: ``bool``
"""
return pn_link_advance(self._impl)
@property
def unsettled(self):
"""
The number of unsettled deliveries for this link.
:type: ``int``
"""
return pn_link_unsettled(self._impl)
@property
def credit(self):
"""
The amount of outstanding credit on this link.
Links use a credit based flow control scheme. Every receiver
maintains a credit balance that corresponds to the number of
deliveries that the receiver can accept at any given moment. As
more capacity becomes available at the receiver (see
:meth:`Receiver.flow`), it adds credit to this balance and
communicates the new balance to the sender. Whenever a delivery
is sent/received, the credit balance maintained by the link is
decremented by one. Once the credit balance at the sender reaches
zero, the sender must pause sending until more credit is obtained
from the receiver.
.. note:: A sending link may still be used to send deliveries even
if :attr:`credit` reaches zero, however those deliveries will end
up being buffered by the link until enough credit is obtained from
the receiver to send them over the wire. In this case the balance
reported by :attr:`credit` will go negative.
:type: ``int``
"""
return pn_link_credit(self._impl)
@property
def available(self):
"""
The available deliveries hint for this link.
The available count for a link provides a hint as to the number of
deliveries that might be able to be sent if sufficient credit were
issued by the receiving link endpoint. See :meth:`Sender.offered` for
more details.
:type: ``int``
"""
return pn_link_available(self._impl)
@property
def queued(self):
"""
The number of queued deliveries for a link.
Links may queue deliveries for a number of reasons, for example
there may be insufficient credit to send them to the receiver (see
:meth:`credit`), or they simply may not have yet had a chance to
be written to the wire. This operation will return the number of
queued deliveries on a link.
:type: ``int``
"""
return pn_link_queued(self._impl)
def next(self, mask):
"""
Retrieve the next link that matches the given state mask.
When used with :meth:`Connection.link_head`, the application
can access all links on the connection that match the given
state. See :meth:`Connection.link_head` for a description of
match behavior.
:param mask: State mask to match
:type mask: ``int``
:return: The next link that matches the given state mask, or
``None`` if no link matches.
:rtype: :class:`Link`
"""
return Link.wrap(pn_link_next(self._impl, mask))
@property
def name(self):
"""
The name of the link.
:type: ``str``
"""
return utf82unicode(pn_link_name(self._impl))
@property
def is_sender(self):
"""
``True`` if this link is a sender, ``False`` otherwise.
:type: ``bool``
"""
return pn_link_is_sender(self._impl)
@property
def is_receiver(self):
"""
``True`` if this link is a receiver, ``False`` otherwise.
:type: ``bool``
"""
return pn_link_is_receiver(self._impl)
@property
def remote_snd_settle_mode(self):
"""
The remote sender settle mode for this link. One of
:const:`SND_UNSETTLED`, :const:`SND_SETTLED` or
:const:`SND_MIXED`.
:type: ``int``
"""
return pn_link_remote_snd_settle_mode(self._impl)
@property
def remote_rcv_settle_mode(self):
"""
The remote receiver settle mode for this link. One of
:const:`RCV_FIRST` or :const:`RCV_SECOND`.
:type: ``int``
"""
return pn_link_remote_rcv_settle_mode(self._impl)
def _get_snd_settle_mode(self):
return pn_link_snd_settle_mode(self._impl)
def _set_snd_settle_mode(self, mode):
pn_link_set_snd_settle_mode(self._impl, mode)
snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode, doc="""
The local sender settle mode for this link. One of
:const:`SND_UNSETTLED`, :const:`SND_SETTLED` or
:const:`SND_MIXED`.
:type: ``int``
""")
def _get_rcv_settle_mode(self):
return pn_link_rcv_settle_mode(self._impl)
def _set_rcv_settle_mode(self, mode):
pn_link_set_rcv_settle_mode(self._impl, mode)
rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode, doc="""
The local receiver settle mode for this link. One of
:const:`RCV_FIRST` or :const:`RCV_SECOND`.
:type: ``int``
""")
def _get_drain(self):
return pn_link_get_drain(self._impl)
def _set_drain(self, b):
pn_link_set_drain(self._impl, bool(b))
drain_mode = property(_get_drain, _set_drain, doc="""
The drain mode on this link.
If a link is in drain mode (``True``), then the sending
endpoint of a link must immediately use up all available
credit on the link. If this is not possible, the excess
credit must be returned by invoking :meth:`drained`. Only
the receiving endpoint can set the drain mode.
When ``False``, this link is not in drain mode.
:type: ``bool``
""")
def drained(self):
"""
Drain excess credit for this link.
When a link is in drain mode (see :attr:`drain_mode`), the
sender must use all excess credit immediately, and release
any excess credit back to the receiver if there are no
deliveries available to send.
When invoked on a sending link that is in drain mode, this
operation will release all excess credit back to the receiver
and return the number of credits released back to the sender.
If the link is not in drain mode, this operation is a noop.
When invoked on a receiving link, this operation will return
and reset the number of credits the sender has released back
to the receiver.
:return: The number of credits drained.
:rtype: ``int``
"""
return pn_link_drained(self._impl)
@property
def remote_max_message_size(self):
"""
Get the remote view of the maximum message size for this link.
.. warning:: **Unsettled API**
A zero value means the size is unlimited.
:type: ``long``
"""
return pn_link_remote_max_message_size(self._impl)
def _get_max_message_size(self):
return pn_link_max_message_size(self._impl)
def _set_max_message_size(self, mode):
pn_link_set_max_message_size(self._impl, mode)
max_message_size = property(_get_max_message_size, _set_max_message_size, doc="""
The maximum message size for this link. A zero value means the
size is unlimited.
.. warning:: **Unsettled API**
:type: ``long``
""")
def detach(self):
"""
Detach this link.
"""
return pn_link_detach(self._impl)
def free(self):
"""
Free this link object. When a link object is freed,
all :class:`Delivery` objects associated with the session (**<-- CHECK THIS**)
are also freed. Freeing a link will settle any unsettled
deliveries on the link.
"""
pn_link_free(self._impl)
@property
def remote_properties(self):
"""
The properties specified by the remote peer for this link.
This operation will return a :class:`Data` object that
is valid until the link object is freed. This :class:`Data`
object will be empty until the remote link is opened as
indicated by the :const:`REMOTE_ACTIVE` flag.
:type: :class:`Data`
"""
return dat2obj(pn_link_remote_properties(self._impl))
def _get_properties(self):
return self._properties_dict
def _set_properties(self, properties_dict):
if isinstance(properties_dict, dict):
self._properties_dict = PropertyDict(properties_dict, raise_on_error=False)
else:
self._properties_dict = properties_dict
properties = property(_get_properties, _set_properties, doc="""
Link properties as a dictionary of key/values. The AMQP 1.0
specification restricts this dictionary to have keys that are only
:class:`symbol` types. It is possible to use the special ``dict``
subclass :class:`PropertyDict` which will by default enforce this
restrictions on construction. In addition, if strings type are used,
this will silently convert them into symbols.
:type: ``dict`` containing :class:`symbol`` keys.
""")
class Sender(Link):
"""
A link over which messages are sent.
"""
def offered(self, n):
"""
Signal the availability of deliveries for this Sender.
:param n: Credit the number of deliveries potentially
available for transfer.
:type n: ``int``
"""
pn_link_offered(self._impl, n)
def stream(self, data):
"""
Send specified data as part of the current delivery.
:param data: Data to send
:type data: ``binary``
"""
return self._check(pn_link_send(self._impl, data))
def send(self, obj, tag=None):
"""
A convenience method to send objects as message content.
Send specified object over this sender; the object is expected to
have a ``send()`` method on it that takes the sender and an optional
tag as arguments.
Where the object is a :class:`Message`, this will send the message over
this link, creating a new delivery for the purpose.
"""
if hasattr(obj, 'send'):
return obj.send(self, tag=tag)
else:
# treat object as bytes
return self.stream(obj)
def delivery_tag(self):
"""
**HELP!** I have no idea what is going on with this!
:meth:`Link.next` returns a Link, not a Delivery.
"""
if not hasattr(self, 'tag_generator'):
def simple_tags():
count = 1
while True:
yield str(count)
count += 1
self.tag_generator = simple_tags()
return next(self.tag_generator)
class Receiver(Link):
"""
A link over which messages are received.
"""
def flow(self, n):
"""
Increases the credit issued to the remote sender by the specified number of messages.
:param n: The credit to be issued to the remote sender.
:type n: ``int``
"""
pn_link_flow(self._impl, n)
def recv(self, limit):
"""
Receive message data for the current delivery on this receiver.
.. note:: The link API can be used to stream large messages across
the network, so just because there is no data to read does not
imply the message is complete. To ensure the entirety of the
message data has been read, either invoke :meth:`recv` until
``None`` is returned.
:param limit: the max data size to receive of this message
:type limit: ``int``
:return: The received message data, or ``None`` if the message
has been completely received.
:rtype: ``binary`` or ``None``
:raise: * :class:`Timeout` if timed out
* :class:`Interrupt` if interrupted
* :class:`LinkException` for all other exceptions
"""
n, binary = pn_link_recv(self._impl, limit)
if n == PN_EOS:
return None
else:
self._check(n)
return binary
def drain(self, n):
"""
Grant credit for incoming deliveries on this receiver, and
set drain mode to true.
Use :attr:`drain_mode` to set the drain mode explicitly.
:param n: The amount by which to increment the link credit
:type n: ``int``
"""
pn_link_drain(self._impl, n)
def draining(self):
"""
Check if a link is currently draining. A link is defined
to be draining when drain mode is set to ``True``, and the
sender still has excess credit.
:return: ``True`` if the link is currently draining, ``False`` otherwise.
:rtype: ``bool``
"""
return pn_link_draining(self._impl)
class Terminus(object):
"""
A source or target for messages.
"""
UNSPECIFIED = PN_UNSPECIFIED
"""A nonexistent terminus, may used as a source or target."""
SOURCE = PN_SOURCE
"""A source of messages."""
TARGET = PN_TARGET
"""A target for messages."""
COORDINATOR = PN_COORDINATOR
"""A special target identifying a transaction coordinator."""
NONDURABLE = PN_NONDURABLE
"""A non durable terminus."""
CONFIGURATION = PN_CONFIGURATION
"""A terminus with durably held configuration, but not delivery state."""
DELIVERIES = PN_DELIVERIES
"""A terminus with both durably held configuration and durably held delivery state."""
DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
"""The behavior is defined by the node."""
DIST_MODE_COPY = PN_DIST_MODE_COPY
"""The receiver gets all messages."""
DIST_MODE_MOVE = PN_DIST_MODE_MOVE
"""The receiver competes for messages."""
EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
"""The terminus is orphaned when the parent link is closed."""
EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
"""The terminus is orphaned when the parent session is closed"""
EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
"""The terminus is orphaned when the parent connection is closed"""
EXPIRE_NEVER = PN_EXPIRE_NEVER
"""The terminus is never considered orphaned"""
def __init__(self, impl):
self._impl = impl
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]" % err)
else:
return err
def _get_type(self):
return pn_terminus_get_type(self._impl)
def _set_type(self, type):
self._check(pn_terminus_set_type(self._impl, type))
type = property(_get_type, _set_type, doc="""
The terminus type, must be one of :const:`UNSPECIFIED`,
:const:`SOURCE`, :const:`TARGET` or :const:`COORDINATOR`
:type: ``int``
""")
def _get_address(self):
"""
The address that identifies the source or target node
"""
return utf82unicode(pn_terminus_get_address(self._impl))
def _set_address(self, address):
self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
address = property(_get_address, _set_address, doc="""
The terminus address.
:type: ``str``
""")
def _get_durability(self):
return pn_terminus_get_durability(self._impl)
def _set_durability(self, seconds):
self._check(pn_terminus_set_durability(self._impl, seconds))
durability = property(_get_durability, _set_durability, doc="""
The terminus durability mode, must be one of :const:`NONDURABLE`,
:const:`CONFIGURATION` or :const:`DELIVERIES`.
:type: ``int``
""")
def _get_expiry_policy(self):
return pn_terminus_get_expiry_policy(self._impl)
def _set_expiry_policy(self, seconds):
self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
expiry_policy = property(_get_expiry_policy, _set_expiry_policy, doc="""
The terminus expiry policy, must be one of :const:`EXPIRE_WITH_LINK`,
:const:`EXPIRE_WITH_SESSION`, :const:`EXPIRE_WITH_CONNECTION` or
:const:`EXPIRE_NEVER`.
:type: ``int``
""")
def _get_timeout(self):
return pn_terminus_get_timeout(self._impl)
def _set_timeout(self, seconds):
self._check(pn_terminus_set_timeout(self._impl, seconds))
timeout = property(_get_timeout, _set_timeout, doc="""
The terminus timeout in seconds.
:type: ``int``
""")
def _is_dynamic(self):
"""Indicates whether the source or target node was dynamically
created"""
return pn_terminus_is_dynamic(self._impl)
def _set_dynamic(self, dynamic):
self._check(pn_terminus_set_dynamic(self._impl, dynamic))
dynamic = property(_is_dynamic, _set_dynamic, doc="""
The dynamic flag for this terminus object. This indicates if this
terminus was dynamically created.
:type: ``bool``
""")
def _get_distribution_mode(self):
return pn_terminus_get_distribution_mode(self._impl)
def _set_distribution_mode(self, mode):
self._check(pn_terminus_set_distribution_mode(self._impl, mode))
distribution_mode = property(_get_distribution_mode, _set_distribution_mode, doc="""
The terminus distribution mode, must be one of :const:`DIST_MODE_UNSPECIFIED`,
:const:`DIST_MODE_COPY` or :const:`DIST_MODE_MOVE`.
:type: ``int``
""")
@property
def properties(self):
"""
Properties of a dynamic source or target.
:type: :class:`Data` containing a map with :class:`symbol` keys.
"""
return Data(pn_terminus_properties(self._impl))
@property
def capabilities(self):
"""
Capabilities of the source or target.
:type: :class:`Data` containing an array of :class:`symbol`.
"""
return Data(pn_terminus_capabilities(self._impl))
@property
def outcomes(self):
"""
Outcomes of the source or target.
:type: :class:`Data` containing an array of :class:`symbol`.
"""
return Data(pn_terminus_outcomes(self._impl))
@property
def filter(self):
"""
A filter on a source allows the set of messages transfered over
the link to be restricted. The symbol-keyed map represents a'
filter set.
:type: :class:`Data` containing a map with :class:`symbol` keys.
"""
return Data(pn_terminus_filter(self._impl))
def copy(self, src):
"""
Copy another terminus object.
:param src: The terminus to be copied from
:type src: :class:`Terminus`
:raises: :class:`LinkException` if there is an error
"""
self._check(pn_terminus_copy(self._impl, src._impl))