blob: 76bb7d3ce082289b92fdb7c01801a0e778329f44 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The proton module defines a suite of APIs that implement the AMQP 1.0
protocol.
The proton APIs consist of the following classes:
- L{Message} -- A class for creating and/or accessing AMQP message content.
- L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
data.
"""
from __future__ import absolute_import
from cproton import *
from .wrapper import Wrapper
from proton import _compat
import weakref, socket, sys, threading
try:
import uuid
def generate_uuid():
return uuid.uuid4()
except ImportError:
"""
No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
"""
import struct
class uuid:
class UUID:
def __init__(self, hex=None, bytes=None):
if [hex, bytes].count(None) != 1:
raise TypeError("need one of hex or bytes")
if bytes is not None:
self.bytes = bytes
elif hex is not None:
fields=hex.split("-")
fields[4:5] = [fields[4][:4], fields[4][4:]]
self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
def __cmp__(self, other):
if isinstance(other, uuid.UUID):
return cmp(self.bytes, other.bytes)
else:
return -1
def __str__(self):
return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
def __repr__(self):
return "UUID(%r)" % str(self)
def __hash__(self):
return self.bytes.__hash__()
import os, random, time
rand = random.Random()
rand.seed((os.getpid(), time.time(), socket.gethostname()))
def random_uuid():
data = [rand.randint(0, 255) for i in xrange(16)]
# From RFC4122, the version bits are set to 0100
data[6] &= 0x0F
data[6] |= 0x40
# From RFC4122, the top two bits of byte 8 get set to 01
data[8] &= 0x3F
data[8] |= 0x80
return "".join(map(chr, data))
def uuid4():
return uuid.UUID(bytes=random_uuid())
def generate_uuid():
return uuid4()
#
# Hacks to provide Python2 <---> Python3 compatibility
#
try:
bytes()
except NameError:
bytes = str
try:
long()
except NameError:
long = int
try:
unicode()
except NameError:
unicode = str
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
VERSION_POINT = PN_VERSION_POINT
VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
def __init__(self, name):
self.name = name
def __repr__(self):
return self.name
class ProtonException(Exception):
"""
The root of the proton exception hierarchy. All proton exception
classes derive from this exception.
"""
pass
class Timeout(ProtonException):
"""
A timeout exception indicates that a blocking operation has timed
out.
"""
pass
class Interrupt(ProtonException):
"""
An interrupt exception indicaes that a blocking operation was interrupted.
"""
pass
class MessageException(ProtonException):
"""
The MessageException class is the root of the message exception
hierarhcy. All exceptions generated by the Message class derive from
this exception.
"""
pass
EXCEPTIONS = {
PN_TIMEOUT: Timeout,
PN_INTR: Interrupt
}
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
class Message(object):
"""The L{Message} class is a mutable holder of message content.
@ivar instructions: delivery instructions for the message
@type instructions: dict
@ivar annotations: infrastructure defined message annotations
@type annotations: dict
@ivar properties: application defined message properties
@type properties: dict
@ivar body: message body
@type body: bytes | unicode | dict | list | int | long | float | UUID
"""
DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
def __init__(self, body=None, **kwargs):
"""
@param kwargs: Message property name/value pairs to initialise the Message
"""
self._msg = pn_message()
self._id = Data(pn_message_id(self._msg))
self._correlation_id = Data(pn_message_correlation_id(self._msg))
self.instructions = None
self.annotations = None
self.properties = None
self.body = body
for k,v in _compat.iteritems(kwargs):
getattr(self, k) # Raise exception if it's not a valid attribute.
setattr(self, k, v)
def __del__(self):
if hasattr(self, "_msg"):
pn_message_free(self._msg)
del self._msg
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, MessageException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
else:
return err
def _pre_encode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
inst.clear()
if self.instructions is not None:
inst.put_object(self.instructions)
ann.clear()
if self.annotations is not None:
ann.put_object(self.annotations)
props.clear()
if self.properties is not None:
props.put_object(self.properties)
body.clear()
if self.body is not None:
body.put_object(self.body)
def _post_decode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
if inst.next():
self.instructions = inst.get_object()
else:
self.instructions = None
if ann.next():
self.annotations = ann.get_object()
else:
self.annotations = None
if props.next():
self.properties = props.get_object()
else:
self.properties = None
if body.next():
self.body = body.get_object()
else:
self.body = None
def clear(self):
"""
Clears the contents of the L{Message}. All fields will be reset to
their default values.
"""
pn_message_clear(self._msg)
self.instructions = None
self.annotations = None
self.properties = None
self.body = None
def _is_inferred(self):
return pn_message_is_inferred(self._msg)
def _set_inferred(self, value):
self._check(pn_message_set_inferred(self._msg, bool(value)))
inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")
def _is_durable(self):
return pn_message_is_durable(self._msg)
def _set_durable(self, value):
self._check(pn_message_set_durable(self._msg, bool(value)))
durable = property(_is_durable, _set_durable,
doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
return pn_message_get_priority(self._msg)
def _set_priority(self, value):
self._check(pn_message_set_priority(self._msg, value))
priority = property(_get_priority, _set_priority,
doc="""
The priority of the message.
""")
def _get_ttl(self):
return millis2secs(pn_message_get_ttl(self._msg))
def _set_ttl(self, value):
self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
ttl = property(_get_ttl, _set_ttl,
doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")
def _is_first_acquirer(self):
return pn_message_is_first_acquirer(self._msg)
def _set_first_acquirer(self, value):
self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
return pn_message_get_delivery_count(self._msg)
def _set_delivery_count(self, value):
self._check(pn_message_set_delivery_count(self._msg, value))
delivery_count = property(_get_delivery_count, _set_delivery_count,
doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
return self._id.get_object()
def _set_id(self, value):
if type(value) in _compat.INT_TYPES:
value = ulong(value)
self._id.rewind()
self._id.put_object(value)
id = property(_get_id, _set_id,
doc="""
The id of the message.
""")
def _get_user_id(self):
return pn_message_get_user_id(self._msg)
def _set_user_id(self, value):
self._check(pn_message_set_user_id(self._msg, value))
user_id = property(_get_user_id, _set_user_id,
doc="""
The user id of the message creator.
""")
def _get_address(self):
return utf82unicode(pn_message_get_address(self._msg))
def _set_address(self, value):
self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
address = property(_get_address, _set_address,
doc="""
The address of the message.
""")
def _get_subject(self):
return utf82unicode(pn_message_get_subject(self._msg))
def _set_subject(self, value):
self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
subject = property(_get_subject, _set_subject,
doc="""
The subject of the message.
""")
def _get_reply_to(self):
return utf82unicode(pn_message_get_reply_to(self._msg))
def _set_reply_to(self, value):
self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
reply_to = property(_get_reply_to, _set_reply_to,
doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
return self._correlation_id.get_object()
def _set_correlation_id(self, value):
if type(value) in _compat.INT_TYPES:
value = ulong(value)
self._correlation_id.rewind()
self._correlation_id.put_object(value)
correlation_id = property(_get_correlation_id, _set_correlation_id,
doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
def _set_content_type(self, value):
self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
content_type = property(_get_content_type, _set_content_type,
doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
def _set_content_encoding(self, value):
self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
content_encoding = property(_get_content_encoding, _set_content_encoding,
doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
return millis2secs(pn_message_get_expiry_time(self._msg))
def _set_expiry_time(self, value):
self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
expiry_time = property(_get_expiry_time, _set_expiry_time,
doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
return millis2secs(pn_message_get_creation_time(self._msg))
def _set_creation_time(self, value):
self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
creation_time = property(_get_creation_time, _set_creation_time,
doc="""
The creation time of the message.
""")
def _get_group_id(self):
return utf82unicode(pn_message_get_group_id(self._msg))
def _set_group_id(self, value):
self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
group_id = property(_get_group_id, _set_group_id,
doc="""
The group id of the message.
""")
def _get_group_sequence(self):
return pn_message_get_group_sequence(self._msg)
def _set_group_sequence(self, value):
self._check(pn_message_set_group_sequence(self._msg, value))
group_sequence = property(_get_group_sequence, _set_group_sequence,
doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
def _set_reply_to_group_id(self, value):
self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
doc="""
The group-id for any replies.
""")
def encode(self):
self._pre_encode()
sz = 16
while True:
err, data = pn_message_encode(self._msg, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return data
def decode(self, data):
self._check(pn_message_decode(self._msg, data))
self._post_decode()
def send(self, sender, tag=None):
dlv = sender.delivery(tag or sender.delivery_tag())
encoded = self.encode()
sender.stream(encoded)
sender.advance()
if sender.snd_settle_mode == Link.SND_SETTLED:
dlv.settle()
return dlv
def recv(self, link):
"""
Receives and decodes the message content for the current delivery
from the link. Upon success it will return the current delivery
for the link. If there is no current delivery, or if the current
delivery is incomplete, or if the link is not a receiver, it will
return None.
@type link: Link
@param link: the link to receive a message from
@return the delivery associated with the decoded message (or None)
"""
if link.is_sender: return None
dlv = link.current
if not dlv or dlv.partial: return None
dlv.encoded = link.recv(dlv.pending)
link.advance()
# the sender has already forgotten about the delivery, so we might
# as well too
if link.remote_snd_settle_mode == Link.SND_SETTLED:
dlv.settle()
self.decode(dlv.encoded)
return dlv
def __repr2__(self):
props = []
for attr in ("inferred", "address", "reply_to", "durable", "ttl",
"priority", "first_acquirer", "delivery_count", "id",
"correlation_id", "user_id", "group_id", "group_sequence",
"reply_to_group_id", "instructions", "annotations",
"properties", "body"):
value = getattr(self, attr)
if value: props.append("%s=%r" % (attr, value))
return "Message(%s)" % ", ".join(props)
def __repr__(self):
tmp = pn_string(None)
err = pn_inspect(self._msg, tmp)
result = pn_string_get(tmp)
pn_free(tmp)
self._check(err)
return result
class Subscription(object):
def __init__(self, impl):
self._impl = impl
@property
def address(self):
return pn_subscription_address(self._impl)
_DEFAULT = object()
class Selectable(Wrapper):
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Selectable(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_selectable_attachments)
def _init(self):
pass
def fileno(self, fd = _DEFAULT):
if fd is _DEFAULT:
return pn_selectable_get_fd(self._impl)
elif fd is None:
pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
else:
pn_selectable_set_fd(self._impl, fd)
def _is_reading(self):
return pn_selectable_is_reading(self._impl)
def _set_reading(self, val):
pn_selectable_set_reading(self._impl, bool(val))
reading = property(_is_reading, _set_reading)
def _is_writing(self):
return pn_selectable_is_writing(self._impl)
def _set_writing(self, val):
pn_selectable_set_writing(self._impl, bool(val))
writing = property(_is_writing, _set_writing)
def _get_deadline(self):
tstamp = pn_selectable_get_deadline(self._impl)
if tstamp:
return millis2secs(tstamp)
else:
return None
def _set_deadline(self, deadline):
pn_selectable_set_deadline(self._impl, secs2millis(deadline))
deadline = property(_get_deadline, _set_deadline)
def readable(self):
pn_selectable_readable(self._impl)
def writable(self):
pn_selectable_writable(self._impl)
def expired(self):
pn_selectable_expired(self._impl)
def _is_registered(self):
return pn_selectable_is_registered(self._impl)
def _set_registered(self, registered):
pn_selectable_set_registered(self._impl, registered)
registered = property(_is_registered, _set_registered,
doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")
@property
def is_terminal(self):
return pn_selectable_is_terminal(self._impl)
def terminate(self):
pn_selectable_terminate(self._impl)
def release(self):
pn_selectable_release(self._impl)
class DataException(ProtonException):
"""
The DataException class is the root of the Data exception hierarchy.
All exceptions raised by the Data class extend this exception.
"""
pass
class UnmappedType:
def __init__(self, msg):
self.msg = msg
def __repr__(self):
return "UnmappedType(%s)" % self.msg
class ulong(long):
def __repr__(self):
return "ulong(%s)" % long.__repr__(self)
class timestamp(long):
def __repr__(self):
return "timestamp(%s)" % long.__repr__(self)
class symbol(unicode):
def __repr__(self):
return "symbol(%s)" % unicode.__repr__(self)
class char(unicode):
def __repr__(self):
return "char(%s)" % unicode.__repr__(self)
class byte(int):
def __repr__(self):
return "byte(%s)" % int.__repr__(self)
class short(int):
def __repr__(self):
return "short(%s)" % int.__repr__(self)
class int32(int):
def __repr__(self):
return "int32(%s)" % int.__repr__(self)
class ubyte(int):
def __repr__(self):
return "ubyte(%s)" % int.__repr__(self)
class ushort(int):
def __repr__(self):
return "ushort(%s)" % int.__repr__(self)
class uint(long):
def __repr__(self):
return "uint(%s)" % long.__repr__(self)
class float32(float):
def __repr__(self):
return "float32(%s)" % float.__repr__(self)
class decimal32(int):
def __repr__(self):
return "decimal32(%s)" % int.__repr__(self)
class decimal64(long):
def __repr__(self):
return "decimal64(%s)" % long.__repr__(self)
class decimal128(bytes):
def __repr__(self):
return "decimal128(%s)" % bytes.__repr__(self)
class Described(object):
def __init__(self, descriptor, value):
self.descriptor = descriptor
self.value = value
def __repr__(self):
return "Described(%r, %r)" % (self.descriptor, self.value)
def __eq__(self, o):
if isinstance(o, Described):
return self.descriptor == o.descriptor and self.value == o.value
else:
return False
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
def __init__(self, descriptor, type, *elements):
self.descriptor = descriptor
self.type = type
self.elements = elements
def __iter__(self):
return iter(self.elements)
def __repr__(self):
if self.elements:
els = ", %s" % (", ".join(map(repr, self.elements)))
else:
els = ""
return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
def __eq__(self, o):
if isinstance(o, Array):
return self.descriptor == o.descriptor and \
self.type == o.type and self.elements == o.elements
else:
return False
class Data:
"""
The L{Data} class provides an interface for decoding, extracting,
creating, and encoding arbitrary AMQP data. A L{Data} object
contains a tree of AMQP values. Leaf nodes in this tree correspond
to scalars in the AMQP type system such as L{ints<INT>} or
L{strings<STRING>}. Non-leaf nodes in this tree correspond to
compound values in the AMQP type system such as L{lists<LIST>},
L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
The root node of the tree is the L{Data} object itself and can have
an arbitrary number of children.
A L{Data} object maintains the notion of the current sibling node
and a current parent node. Siblings are ordered within their parent.
Values are accessed and/or added by using the L{next}, L{prev},
L{enter}, and L{exit} methods to navigate to the desired location in
the tree and using the supplied variety of put_*/get_* methods to
access or add a value of the desired type.
The put_* methods will always add a value I{after} the current node
in the tree. If the current node has a next sibling the put_* method
will overwrite the value on this node. If there is no current node
or the current node has no next sibling then one will be added. The
put_* methods always set the added/modified node to the current
node. The get_* methods read the value of the current node and do
not change which node is current.
The following types of scalar values are supported:
- L{NULL}
- L{BOOL}
- L{UBYTE}
- L{USHORT}
- L{SHORT}
- L{UINT}
- L{INT}
- L{ULONG}
- L{LONG}
- L{FLOAT}
- L{DOUBLE}
- L{BINARY}
- L{STRING}
- L{SYMBOL}
The following types of compound values are supported:
- L{DESCRIBED}
- L{ARRAY}
- L{LIST}
- L{MAP}
"""
NULL = PN_NULL; "A null value."
BOOL = PN_BOOL; "A boolean value."
UBYTE = PN_UBYTE; "An unsigned byte value."
BYTE = PN_BYTE; "A signed byte value."
USHORT = PN_USHORT; "An unsigned short value."
SHORT = PN_SHORT; "A short value."
UINT = PN_UINT; "An unsigned int value."
INT = PN_INT; "A signed int value."
CHAR = PN_CHAR; "A character value."
ULONG = PN_ULONG; "An unsigned long value."
LONG = PN_LONG; "A signed long value."
TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
FLOAT = PN_FLOAT; "A float value."
DOUBLE = PN_DOUBLE; "A double value."
DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
UUID = PN_UUID; "A UUID value."
BINARY = PN_BINARY; "A binary string."
STRING = PN_STRING; "A unicode string."
SYMBOL = PN_SYMBOL; "A symbolic string."
DESCRIBED = PN_DESCRIBED; "A described value."
ARRAY = PN_ARRAY; "An array value."
LIST = PN_LIST; "A list value."
MAP = PN_MAP; "A map value."
type_names = {
NULL: "null",
BOOL: "bool",
BYTE: "byte",
UBYTE: "ubyte",
SHORT: "short",
USHORT: "ushort",
INT: "int",
UINT: "uint",
CHAR: "char",
LONG: "long",
ULONG: "ulong",
TIMESTAMP: "timestamp",
FLOAT: "float",
DOUBLE: "double",
DECIMAL32: "decimal32",
DECIMAL64: "decimal64",
DECIMAL128: "decimal128",
UUID: "uuid",
BINARY: "binary",
STRING: "string",
SYMBOL: "symbol",
DESCRIBED: "described",
ARRAY: "array",
LIST: "list",
MAP: "map"
}
@classmethod
def type_name(type): return Data.type_names[type]
def __init__(self, capacity=16):
if type(capacity) in _compat.INT_TYPES:
self._data = pn_data(capacity)
self._free = True
else:
self._data = capacity
self._free = False
def __del__(self):
if self._free and hasattr(self, "_data"):
pn_data_free(self._data)
del self._data
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, DataException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
else:
return err
def clear(self):
"""
Clears the data object.
"""
pn_data_clear(self._data)
def rewind(self):
"""
Clears current node and sets the parent to the root node. Clearing the
current node sets it _before_ the first node, calling next() will advance to
the first node.
"""
assert self._data is not None
pn_data_rewind(self._data)
def next(self):
"""
Advances the current node to its next sibling and returns its
type. If there is no next sibling the current node remains
unchanged and None is returned.
"""
found = pn_data_next(self._data)
if found:
return self.type()
else:
return None
def prev(self):
"""
Advances the current node to its previous sibling and returns its
type. If there is no previous sibling the current node remains
unchanged and None is returned.
"""
found = pn_data_prev(self._data)
if found:
return self.type()
else:
return None
def enter(self):
"""
Sets the parent node to the current node and clears the current node.
Clearing the current node sets it _before_ the first child,
call next() advances to the first child.
"""
return pn_data_enter(self._data)
def exit(self):
"""
Sets the current node to the parent node and the parent node to
its own parent.
"""
return pn_data_exit(self._data)
def lookup(self, name):
return pn_data_lookup(self._data, name)
def narrow(self):
pn_data_narrow(self._data)
def widen(self):
pn_data_widen(self._data)
def type(self):
"""
Returns the type of the current node.
"""
dtype = pn_data_type(self._data)
if dtype == -1:
return None
else:
return dtype
def encoded_size(self):
"""
Returns the size in bytes needed to encode the data in AMQP format.
"""
return pn_data_encoded_size(self._data)
def encode(self):
"""
Returns a representation of the data encoded in AMQP format.
"""
size = 1024
while True:
cd, enc = pn_data_encode(self._data, size)
if cd == PN_OVERFLOW:
size *= 2
elif cd >= 0:
return enc
else:
self._check(cd)
def decode(self, encoded):
"""
Decodes the first value from supplied AMQP data and returns the
number of bytes consumed.
@type encoded: binary
@param encoded: AMQP encoded binary data
"""
return self._check(pn_data_decode(self._data, encoded))
def put_list(self):
"""
Puts a list value. Elements may be filled by entering the list
node and putting element values.
>>> data = Data()
>>> data.put_list()
>>> data.enter()
>>> data.put_int(1)
>>> data.put_int(2)
>>> data.put_int(3)
>>> data.exit()
"""
self._check(pn_data_put_list(self._data))
def put_map(self):
"""
Puts a map value. Elements may be filled by entering the map node
and putting alternating key value pairs.
>>> data = Data()
>>> data.put_map()
>>> data.enter()
>>> data.put_string("key")
>>> data.put_string("value")
>>> data.exit()
"""
self._check(pn_data_put_map(self._data))
def put_array(self, described, element_type):
"""
Puts an array value. Elements may be filled by entering the array
node and putting the element values. The values must all be of the
specified array element type. If an array is described then the
first child value of the array is the descriptor and may be of any
type.
>>> data = Data()
>>>
>>> data.put_array(False, Data.INT)
>>> data.enter()
>>> data.put_int(1)
>>> data.put_int(2)
>>> data.put_int(3)
>>> data.exit()
>>>
>>> data.put_array(True, Data.DOUBLE)
>>> data.enter()
>>> data.put_symbol("array-descriptor")
>>> data.put_double(1.1)
>>> data.put_double(1.2)
>>> data.put_double(1.3)
>>> data.exit()
@type described: bool
@param described: specifies whether the array is described
@type element_type: int
@param element_type: the type of the array elements
"""
self._check(pn_data_put_array(self._data, described, element_type))
def put_described(self):
"""
Puts a described value. A described node has two children, the
descriptor and the value. These are specified by entering the node
and putting the desired values.
>>> data = Data()
>>> data.put_described()
>>> data.enter()
>>> data.put_symbol("value-descriptor")
>>> data.put_string("the value")
>>> data.exit()
"""
self._check(pn_data_put_described(self._data))
def put_null(self):
"""
Puts a null value.
"""
self._check(pn_data_put_null(self._data))
def put_bool(self, b):
"""
Puts a boolean value.
@param b: a boolean value
"""
self._check(pn_data_put_bool(self._data, b))
def put_ubyte(self, ub):
"""
Puts an unsigned byte value.
@param ub: an integral value
"""
self._check(pn_data_put_ubyte(self._data, ub))
def put_byte(self, b):
"""
Puts a signed byte value.
@param b: an integral value
"""
self._check(pn_data_put_byte(self._data, b))
def put_ushort(self, us):
"""
Puts an unsigned short value.
@param us: an integral value.
"""
self._check(pn_data_put_ushort(self._data, us))
def put_short(self, s):
"""
Puts a signed short value.
@param s: an integral value
"""
self._check(pn_data_put_short(self._data, s))
def put_uint(self, ui):
"""
Puts an unsigned int value.
@param ui: an integral value
"""
self._check(pn_data_put_uint(self._data, ui))
def put_int(self, i):
"""
Puts a signed int value.
@param i: an integral value
"""
self._check(pn_data_put_int(self._data, i))
def put_char(self, c):
"""
Puts a char value.
@param c: a single character
"""
self._check(pn_data_put_char(self._data, ord(c)))
def put_ulong(self, ul):
"""
Puts an unsigned long value.
@param ul: an integral value
"""
self._check(pn_data_put_ulong(self._data, ul))
def put_long(self, l):
"""
Puts a signed long value.
@param l: an integral value
"""
self._check(pn_data_put_long(self._data, l))
def put_timestamp(self, t):
"""
Puts a timestamp value.
@param t: an integral value
"""
self._check(pn_data_put_timestamp(self._data, t))
def put_float(self, f):
"""
Puts a float value.
@param f: a floating point value
"""
self._check(pn_data_put_float(self._data, f))
def put_double(self, d):
"""
Puts a double value.
@param d: a floating point value.
"""
self._check(pn_data_put_double(self._data, d))
def put_decimal32(self, d):
"""
Puts a decimal32 value.
@param d: a decimal32 value
"""
self._check(pn_data_put_decimal32(self._data, d))
def put_decimal64(self, d):
"""
Puts a decimal64 value.
@param d: a decimal64 value
"""
self._check(pn_data_put_decimal64(self._data, d))
def put_decimal128(self, d):
"""
Puts a decimal128 value.
@param d: a decimal128 value
"""
self._check(pn_data_put_decimal128(self._data, d))
def put_uuid(self, u):
"""
Puts a UUID value.
@param u: a uuid value
"""
self._check(pn_data_put_uuid(self._data, u.bytes))
def put_binary(self, b):
"""
Puts a binary value.
@type b: binary
@param b: a binary value
"""
self._check(pn_data_put_binary(self._data, bytes(b)))
def put_string(self, s):
"""
Puts a unicode value.
@type s: unicode
@param s: a unicode value
"""
self._check(pn_data_put_string(self._data, s.encode("utf8")))
def put_symbol(self, s):
"""
Puts a symbolic value.
@type s: string
@param s: the symbol name
"""
self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
def get_list(self):
"""
If the current node is a list, return the number of elements,
otherwise return zero. List elements can be accessed by entering
the list.
>>> count = data.get_list()
>>> data.enter()
>>> for i in range(count):
... type = data.next()
... if type == Data.STRING:
... print data.get_string()
... elif type == ...:
... ...
>>> data.exit()
"""
return pn_data_get_list(self._data)
def get_map(self):
"""
If the current node is a map, return the number of child elements,
otherwise return zero. Key value pairs can be accessed by entering
the map.
>>> count = data.get_map()
>>> data.enter()
>>> for i in range(count/2):
... type = data.next()
... if type == Data.STRING:
... print data.get_string()
... elif type == ...:
... ...
>>> data.exit()
"""
return pn_data_get_map(self._data)
def get_array(self):
"""
If the current node is an array, return a tuple of the element
count, a boolean indicating whether the array is described, and
the type of each element, otherwise return (0, False, None). Array
data can be accessed by entering the array.
>>> # read an array of strings with a symbolic descriptor
>>> count, described, type = data.get_array()
>>> data.enter()
>>> data.next()
>>> print "Descriptor:", data.get_symbol()
>>> for i in range(count):
... data.next()
... print "Element:", data.get_string()
>>> data.exit()
"""
count = pn_data_get_array(self._data)
described = pn_data_is_array_described(self._data)
type = pn_data_get_array_type(self._data)
if type == -1:
type = None
return count, described, type
def is_described(self):
"""
Checks if the current node is a described value. The descriptor
and value may be accessed by entering the described value.
>>> # read a symbolically described string
>>> assert data.is_described() # will error if the current node is not described
>>> data.enter()
>>> data.next()
>>> print data.get_symbol()
>>> data.next()
>>> print data.get_string()
>>> data.exit()
"""
return pn_data_is_described(self._data)
def is_null(self):
"""
Checks if the current node is a null.
"""
return pn_data_is_null(self._data)
def get_bool(self):
"""
If the current node is a boolean, returns its value, returns False
otherwise.
"""
return pn_data_get_bool(self._data)
def get_ubyte(self):
"""
If the current node is an unsigned byte, returns its value,
returns 0 otherwise.
"""
return ubyte(pn_data_get_ubyte(self._data))
def get_byte(self):
"""
If the current node is a signed byte, returns its value, returns 0
otherwise.
"""
return byte(pn_data_get_byte(self._data))
def get_ushort(self):
"""
If the current node is an unsigned short, returns its value,
returns 0 otherwise.
"""
return ushort(pn_data_get_ushort(self._data))
def get_short(self):
"""
If the current node is a signed short, returns its value, returns
0 otherwise.
"""
return short(pn_data_get_short(self._data))
def get_uint(self):
"""
If the current node is an unsigned int, returns its value, returns
0 otherwise.
"""
return uint(pn_data_get_uint(self._data))
def get_int(self):
"""
If the current node is a signed int, returns its value, returns 0
otherwise.
"""
return int32(pn_data_get_int(self._data))
def get_char(self):
"""
If the current node is a char, returns its value, returns 0
otherwise.
"""
return char(_compat.unichar(pn_data_get_char(self._data)))
def get_ulong(self):
"""
If the current node is an unsigned long, returns its value,
returns 0 otherwise.
"""
return ulong(pn_data_get_ulong(self._data))
def get_long(self):
"""
If the current node is an signed long, returns its value, returns
0 otherwise.
"""
return long(pn_data_get_long(self._data))
def get_timestamp(self):
"""
If the current node is a timestamp, returns its value, returns 0
otherwise.
"""
return timestamp(pn_data_get_timestamp(self._data))
def get_float(self):
"""
If the current node is a float, returns its value, raises 0
otherwise.
"""
return float32(pn_data_get_float(self._data))
def get_double(self):
"""
If the current node is a double, returns its value, returns 0
otherwise.
"""
return pn_data_get_double(self._data)
# XXX: need to convert
def get_decimal32(self):
"""
If the current node is a decimal32, returns its value, returns 0
otherwise.
"""
return decimal32(pn_data_get_decimal32(self._data))
# XXX: need to convert
def get_decimal64(self):
"""
If the current node is a decimal64, returns its value, returns 0
otherwise.
"""
return decimal64(pn_data_get_decimal64(self._data))
# XXX: need to convert
def get_decimal128(self):
"""
If the current node is a decimal128, returns its value, returns 0
otherwise.
"""
return decimal128(pn_data_get_decimal128(self._data))
def get_uuid(self):
"""
If the current node is a UUID, returns its value, returns None
otherwise.
"""
if pn_data_type(self._data) == Data.UUID:
return uuid.UUID(bytes=pn_data_get_uuid(self._data))
else:
return None
def get_binary(self):
"""
If the current node is binary, returns its value, returns ""
otherwise.
"""
return pn_data_get_binary(self._data)
def get_string(self):
"""
If the current node is a string, returns its value, returns ""
otherwise.
"""
return pn_data_get_string(self._data).decode("utf8")
def get_symbol(self):
"""
If the current node is a symbol, returns its value, returns ""
otherwise.
"""
return symbol(pn_data_get_symbol(self._data).decode('ascii'))
def copy(self, src):
self._check(pn_data_copy(self._data, src._data))
def format(self):
sz = 16
while True:
err, result = pn_data_format(self._data, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return result
def dump(self):
pn_data_dump(self._data)
def put_dict(self, d):
self.put_map()
self.enter()
try:
for k, v in d.items():
self.put_object(k)
self.put_object(v)
finally:
self.exit()
def get_dict(self):
if self.enter():
try:
result = {}
while self.next():
k = self.get_object()
if self.next():
v = self.get_object()
else:
v = None
result[k] = v
finally:
self.exit()
return result
def put_sequence(self, s):
self.put_list()
self.enter()
try:
for o in s:
self.put_object(o)
finally:
self.exit()
def get_sequence(self):
if self.enter():
try:
result = []
while self.next():
result.append(self.get_object())
finally:
self.exit()
return result
def get_py_described(self):
if self.enter():
try:
self.next()
descriptor = self.get_object()
self.next()
value = self.get_object()
finally:
self.exit()
return Described(descriptor, value)
def put_py_described(self, d):
self.put_described()
self.enter()
try:
self.put_object(d.descriptor)
self.put_object(d.value)
finally:
self.exit()
def get_py_array(self):
"""
If the current node is an array, return an Array object
representing the array and its contents. Otherwise return None.
This is a convenience wrapper around get_array, enter, etc.
"""
count, described, type = self.get_array()
if type is None: return None
if self.enter():
try:
if described:
self.next()
descriptor = self.get_object()
else:
descriptor = UNDESCRIBED
elements = []
while self.next():
elements.append(self.get_object())
finally:
self.exit()
return Array(descriptor, type, *elements)
def put_py_array(self, a):
described = a.descriptor != UNDESCRIBED
self.put_array(described, a.type)
self.enter()
try:
if described:
self.put_object(a.descriptor)
for e in a.elements:
self.put_object(e)
finally:
self.exit()
put_mappings = {
None.__class__: lambda s, _: s.put_null(),
bool: put_bool,
ubyte: put_ubyte,
ushort: put_ushort,
uint: put_uint,
ulong: put_ulong,
byte: put_byte,
short: put_short,
int32: put_int,
long: put_long,
float32: put_float,
float: put_double,
decimal32: put_decimal32,
decimal64: put_decimal64,
decimal128: put_decimal128,
char: put_char,
timestamp: put_timestamp,
uuid.UUID: put_uuid,
bytes: put_binary,
unicode: put_string,
symbol: put_symbol,
list: put_sequence,
tuple: put_sequence,
dict: put_dict,
Described: put_py_described,
Array: put_py_array
}
# for python 3.x, long is merely an alias for int, but for python 2.x
# we need to add an explicit int since it is a different type
if int not in put_mappings:
put_mappings[int] = put_int
# For python 3.x use 'memoryview', for <=2.5 use 'buffer'. Python >=2.6 has both.
if getattr(__builtins__, 'memoryview', None):
put_mappings[memoryview] = put_binary
if getattr(__builtins__, 'buffer', None):
put_mappings[buffer] = put_binary
get_mappings = {
NULL: lambda s: None,
BOOL: get_bool,
BYTE: get_byte,
UBYTE: get_ubyte,
SHORT: get_short,
USHORT: get_ushort,
INT: get_int,
UINT: get_uint,
CHAR: get_char,
LONG: get_long,
ULONG: get_ulong,
TIMESTAMP: get_timestamp,
FLOAT: get_float,
DOUBLE: get_double,
DECIMAL32: get_decimal32,
DECIMAL64: get_decimal64,
DECIMAL128: get_decimal128,
UUID: get_uuid,
BINARY: get_binary,
STRING: get_string,
SYMBOL: get_symbol,
DESCRIBED: get_py_described,
ARRAY: get_py_array,
LIST: get_sequence,
MAP: get_dict
}
def put_object(self, obj):
putter = self.put_mappings[obj.__class__]
putter(self, obj)
def get_object(self):
type = self.type()
if type is None: return None
getter = self.get_mappings.get(type)
if getter:
return getter(self)
else:
return UnmappedType(str(type))
class ConnectionException(ProtonException):
pass
class Endpoint(object):
LOCAL_UNINIT = PN_LOCAL_UNINIT
REMOTE_UNINIT = PN_REMOTE_UNINIT
LOCAL_ACTIVE = PN_LOCAL_ACTIVE
REMOTE_ACTIVE = PN_REMOTE_ACTIVE
LOCAL_CLOSED = PN_LOCAL_CLOSED
REMOTE_CLOSED = PN_REMOTE_CLOSED
def _init(self):
self.condition = None
def _update_cond(self):
obj2cond(self.condition, self._get_cond_impl())
@property
def remote_condition(self):
return cond2obj(self._get_remote_cond_impl())
# the following must be provided by subclasses
def _get_cond_impl(self):
assert False, "Subclass must override this!"
def _get_remote_cond_impl(self):
assert False, "Subclass must override this!"
def _get_handler(self):
from . import reactor
ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
if ractor:
on_error = ractor.on_error
else:
on_error = None
record = self._get_attachments()
return WrappedHandler.wrap(pn_record_get_handler(record), on_error)
def _set_handler(self, handler):
from . import reactor
ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
if ractor:
on_error = ractor.on_error
else:
on_error = None
impl = _chandler(handler, on_error)
record = self._get_attachments()
pn_record_set_handler(record, impl)
pn_decref(impl)
handler = property(_get_handler, _set_handler)
@property
def transport(self):
return self.connection.transport
class Condition:
def __init__(self, name, description=None, info=None):
self.name = name
self.description = description
self.info = info
def __repr__(self):
return "Condition(%s)" % ", ".join([repr(x) for x in
(self.name, self.description, self.info)
if x])
def __eq__(self, o):
if not isinstance(o, Condition): return False
return self.name == o.name and \
self.description == o.description and \
self.info == o.info
def obj2cond(obj, cond):
pn_condition_clear(cond)
if obj:
pn_condition_set_name(cond, str(obj.name))
pn_condition_set_description(cond, obj.description)
info = Data(pn_condition_info(cond))
if obj.info:
info.put_object(obj.info)
def cond2obj(cond):
if pn_condition_is_set(cond):
return Condition(pn_condition_get_name(cond),
pn_condition_get_description(cond),
dat2obj(pn_condition_info(cond)))
else:
return None
def dat2obj(dimpl):
if dimpl:
d = Data(dimpl)
d.rewind()
d.next()
obj = d.get_object()
d.rewind()
return obj
def obj2dat(obj, dimpl):
if obj is not None:
d = Data(dimpl)
d.put_object(obj)
def secs2millis(secs):
return long(secs*1000)
def millis2secs(millis):
return float(millis)/1000.0
def timeout2millis(secs):
if secs is None: return PN_MILLIS_MAX
return secs2millis(secs)
def millis2timeout(millis):
if millis == PN_MILLIS_MAX: return None
return millis2secs(millis)
def unicode2utf8(string):
"""Some Proton APIs expect a null terminated string. Convert python text
types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
This method will throw if the string cannot be converted.
"""
if string is None:
return None
if _compat.IS_PY2:
if isinstance(string, unicode):
return string.encode('utf-8')
elif isinstance(string, str):
return string
else:
# decoding a string results in bytes
if isinstance(string, str):
string = string.encode('utf-8')
# fall through
if isinstance(string, bytes):
return string.decode('utf-8')
raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
def utf82unicode(string):
"""Covert C strings returned from proton-c into python unicode"""
if string is None:
return None
if isinstance(string, _compat.TEXT_TYPES):
# already unicode
return string
elif isinstance(string, _compat.BINARY_TYPES):
return string.decode('utf8')
else:
raise TypeError("Unrecognized string type")
class Connection(Wrapper, Endpoint):
"""
A representation of an AMQP connection
"""
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Connection(impl)
def __init__(self, impl = pn_connection):
Wrapper.__init__(self, impl, pn_connection_attachments)
def _init(self):
Endpoint._init(self)
self.offered_capabilities = None
self.desired_capabilities = None
self.properties = None
def _get_attachments(self):
return pn_connection_attachments(self._impl)
@property
def connection(self):
return self
@property
def transport(self):
return Transport.wrap(pn_connection_transport(self._impl))
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, ConnectionException)
raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
else:
return err
def _get_cond_impl(self):
return pn_connection_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_connection_remote_condition(self._impl)
def collect(self, collector):
if collector is None:
pn_connection_collect(self._impl, None)
else:
pn_connection_collect(self._impl, collector._impl)
self._collector = weakref.ref(collector)
def _get_container(self):
return utf82unicode(pn_connection_get_container(self._impl))
def _set_container(self, name):
return pn_connection_set_container(self._impl, unicode2utf8(name))
container = property(_get_container, _set_container)
def _get_hostname(self):
return utf82unicode(pn_connection_get_hostname(self._impl))
def _set_hostname(self, name):
return pn_connection_set_hostname(self._impl, unicode2utf8(name))
hostname = property(_get_hostname, _set_hostname,
doc="""
Set the name of the host (either fully qualified or relative) to which this
connection is connecting to. This information may be used by the remote
peer to determine the correct back-end service to connect the client to.
This value will be sent in the Open performative, and will be used by SSL
and SASL layers to identify the peer.
""")
def _get_user(self):
return utf82unicode(pn_connection_get_user(self._impl))
def _set_user(self, name):
return pn_connection_set_user(self._impl, unicode2utf8(name))
user = property(_get_user, _set_user)
def _get_password(self):
return None
def _set_password(self, name):
return pn_connection_set_password(self._impl, unicode2utf8(name))
password = property(_get_password, _set_password)
@property
def remote_container(self):
"""The container identifier specified by the remote peer for this connection."""
return pn_connection_remote_container(self._impl)
@property
def remote_hostname(self):
"""The hostname specified by the remote peer for this connection."""
return pn_connection_remote_hostname(self._impl)
@property
def remote_offered_capabilities(self):
"""The capabilities offered by the remote peer for this connection."""
return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
@property
def remote_desired_capabilities(self):
"""The capabilities desired by the remote peer for this connection."""
return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
@property
def remote_properties(self):
"""The properties specified by the remote peer for this connection."""
return dat2obj(pn_connection_remote_properties(self._impl))
def open(self):
"""
Opens the connection.
In more detail, this moves the local state of the connection to
the ACTIVE state and triggers an open frame to be sent to the
peer. A connection is fully active once both peers have opened it.
"""
obj2dat(self.offered_capabilities,
pn_connection_offered_capabilities(self._impl))
obj2dat(self.desired_capabilities,
pn_connection_desired_capabilities(self._impl))
obj2dat(self.properties, pn_connection_properties(self._impl))
pn_connection_open(self._impl)
def close(self):
"""
Closes the connection.
In more detail, this moves the local state of the connection to
the CLOSED state and triggers a close frame to be sent to the
peer. A connection is fully closed once both peers have closed it.
"""
self._update_cond()
pn_connection_close(self._impl)
@property
def state(self):
"""
The state of the connection as a bit field. The state has a local
and a remote component. Each of these can be in one of three
states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
REMOTE_ACTIVE and REMOTE_CLOSED.
"""
return pn_connection_state(self._impl)
def session(self):
"""
Returns a new session on this connection.
"""
ssn = pn_session(self._impl)
if ssn is None:
raise(SessionException("Session allocation failed."))
else:
return Session(ssn)
def session_head(self, mask):
return Session.wrap(pn_session_head(self._impl, mask))
def link_head(self, mask):
return Link.wrap(pn_link_head(self._impl, mask))
@property
def work_head(self):
return Delivery.wrap(pn_work_head(self._impl))
@property
def error(self):
return pn_error_code(pn_connection_error(self._impl))
def free(self):
pn_connection_release(self._impl)
class SessionException(ProtonException):
pass
class Session(Wrapper, Endpoint):
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Session(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_session_attachments)
def _get_attachments(self):
return pn_session_attachments(self._impl)
def _get_cond_impl(self):
return pn_session_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_session_remote_condition(self._impl)
def _get_incoming_capacity(self):
return pn_session_get_incoming_capacity(self._impl)
def _set_incoming_capacity(self, capacity):
pn_session_set_incoming_capacity(self._impl, capacity)
incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
def _get_outgoing_window(self):
return pn_session_get_outgoing_window(self._impl)
def _set_outgoing_window(self, window):
pn_session_set_outgoing_window(self._impl, window)
outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
@property
def outgoing_bytes(self):
return pn_session_outgoing_bytes(self._impl)
@property
def incoming_bytes(self):
return pn_session_incoming_bytes(self._impl)
def open(self):
pn_session_open(self._impl)
def close(self):
self._update_cond()
pn_session_close(self._impl)
def next(self, mask):
return Session.wrap(pn_session_next(self._impl, mask))
@property
def state(self):
return pn_session_state(self._impl)
@property
def connection(self):
return Connection.wrap(pn_session_connection(self._impl))
def sender(self, name):
return Sender(pn_sender(self._impl, unicode2utf8(name)))
def receiver(self, name):
return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
def free(self):
pn_session_free(self._impl)
class LinkException(ProtonException):
pass
class Link(Wrapper, Endpoint):
"""
A representation of an AMQP link, of which there are two concrete
implementations, Sender and Receiver.
"""
SND_UNSETTLED = PN_SND_UNSETTLED
SND_SETTLED = PN_SND_SETTLED
SND_MIXED = PN_SND_MIXED
RCV_FIRST = PN_RCV_FIRST
RCV_SECOND = PN_RCV_SECOND
@staticmethod
def wrap(impl):
if impl is None: return None
if pn_link_is_sender(impl):
return Sender(impl)
else:
return Receiver(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_link_attachments)
def _get_attachments(self):
return pn_link_attachments(self._impl)
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
else:
return err
def _get_cond_impl(self):
return pn_link_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_link_remote_condition(self._impl)
def open(self):
"""
Opens the link.
In more detail, this moves the local state of the link to the
ACTIVE state and triggers an attach frame to be sent to the
peer. A link is fully active once both peers have attached it.
"""
pn_link_open(self._impl)
def close(self):
"""
Closes the link.
In more detail, this moves the local state of the link to the
CLOSED state and triggers an detach frame (with the closed flag
set) to be sent to the peer. A link is fully closed once both
peers have detached it.
"""
self._update_cond()
pn_link_close(self._impl)
@property
def state(self):
"""
The state of the link as a bit field. The state has a local
and a remote component. Each of these can be in one of three
states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
REMOTE_ACTIVE and REMOTE_CLOSED.
"""
return pn_link_state(self._impl)
@property
def source(self):
"""The source of the link as described by the local peer."""
return Terminus(pn_link_source(self._impl))
@property
def target(self):
"""The target of the link as described by the local peer."""
return Terminus(pn_link_target(self._impl))
@property
def remote_source(self):
"""The source of the link as described by the remote peer."""
return Terminus(pn_link_remote_source(self._impl))
@property
def remote_target(self):
"""The target of the link as described by the remote peer."""
return Terminus(pn_link_remote_target(self._impl))
@property
def session(self):
return Session.wrap(pn_link_session(self._impl))
@property
def connection(self):
"""The connection on which this link was attached."""
return self.session.connection
def delivery(self, tag):
return Delivery(pn_delivery(self._impl, tag))
@property
def current(self):
return Delivery.wrap(pn_link_current(self._impl))
def advance(self):
return pn_link_advance(self._impl)
@property
def unsettled(self):
return pn_link_unsettled(self._impl)
@property
def credit(self):
"""The amount of oustanding credit on this link."""
return pn_link_credit(self._impl)
@property
def available(self):
return pn_link_available(self._impl)
@property
def queued(self):
return pn_link_queued(self._impl)
def next(self, mask):
return Link.wrap(pn_link_next(self._impl, mask))
@property
def name(self):
"""Returns the name of the link"""
return utf82unicode(pn_link_name(self._impl))
@property
def is_sender(self):
"""Returns true if this link is a sender."""
return pn_link_is_sender(self._impl)
@property
def is_receiver(self):
"""Returns true if this link is a receiver."""
return pn_link_is_receiver(self._impl)
@property
def remote_snd_settle_mode(self):
return pn_link_remote_snd_settle_mode(self._impl)
@property
def remote_rcv_settle_mode(self):
return pn_link_remote_rcv_settle_mode(self._impl)
def _get_snd_settle_mode(self):
return pn_link_snd_settle_mode(self._impl)
def _set_snd_settle_mode(self, mode):
pn_link_set_snd_settle_mode(self._impl, mode)
snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
def _get_rcv_settle_mode(self):
return pn_link_rcv_settle_mode(self._impl)
def _set_rcv_settle_mode(self, mode):
pn_link_set_rcv_settle_mode(self._impl, mode)
rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
def _get_drain(self):
return pn_link_get_drain(self._impl)
def _set_drain(self, b):
pn_link_set_drain(self._impl, bool(b))
drain_mode = property(_get_drain, _set_drain)
def drained(self):
return pn_link_drained(self._impl)
@property
def remote_max_message_size(self):
return pn_link_remote_max_message_size(self._impl)
def _get_max_message_size(self):
return pn_link_max_message_size(self._impl)
def _set_max_message_size(self, mode):
pn_link_set_max_message_size(self._impl, mode)
max_message_size = property(_get_max_message_size, _set_max_message_size)
def detach(self):
return pn_link_detach(self._impl)
def free(self):
pn_link_free(self._impl)
class Terminus(object):
UNSPECIFIED = PN_UNSPECIFIED
SOURCE = PN_SOURCE
TARGET = PN_TARGET
COORDINATOR = PN_COORDINATOR
NONDURABLE = PN_NONDURABLE
CONFIGURATION = PN_CONFIGURATION
DELIVERIES = PN_DELIVERIES
DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
DIST_MODE_COPY = PN_DIST_MODE_COPY
DIST_MODE_MOVE = PN_DIST_MODE_MOVE
EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
EXPIRE_NEVER = PN_EXPIRE_NEVER
def __init__(self, impl):
self._impl = impl
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]" % err)
else:
return err
def _get_type(self):
return pn_terminus_get_type(self._impl)
def _set_type(self, type):
self._check(pn_terminus_set_type(self._impl, type))
type = property(_get_type, _set_type)
def _get_address(self):
"""The address that identifies the source or target node"""
return utf82unicode(pn_terminus_get_address(self._impl))
def _set_address(self, address):
self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
address = property(_get_address, _set_address)
def _get_durability(self):
return pn_terminus_get_durability(self._impl)
def _set_durability(self, seconds):
self._check(pn_terminus_set_durability(self._impl, seconds))
durability = property(_get_durability, _set_durability)
def _get_expiry_policy(self):
return pn_terminus_get_expiry_policy(self._impl)
def _set_expiry_policy(self, seconds):
self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
def _get_timeout(self):
return pn_terminus_get_timeout(self._impl)
def _set_timeout(self, seconds):
self._check(pn_terminus_set_timeout(self._impl, seconds))
timeout = property(_get_timeout, _set_timeout)
def _is_dynamic(self):
"""Indicates whether the source or target node was dynamically
created"""
return pn_terminus_is_dynamic(self._impl)
def _set_dynamic(self, dynamic):
self._check(pn_terminus_set_dynamic(self._impl, dynamic))
dynamic = property(_is_dynamic, _set_dynamic)
def _get_distribution_mode(self):
return pn_terminus_get_distribution_mode(self._impl)
def _set_distribution_mode(self, mode):
self._check(pn_terminus_set_distribution_mode(self._impl, mode))
distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
@property
def properties(self):
"""Properties of a dynamic source or target."""
return Data(pn_terminus_properties(self._impl))
@property
def capabilities(self):
"""Capabilities of the source or target."""
return Data(pn_terminus_capabilities(self._impl))
@property
def outcomes(self):
return Data(pn_terminus_outcomes(self._impl))
@property
def filter(self):
"""A filter on a source allows the set of messages transfered over
the link to be restricted"""
return Data(pn_terminus_filter(self._impl))
def copy(self, src):
self._check(pn_terminus_copy(self._impl, src._impl))
class Sender(Link):
"""
A link over which messages are sent.
"""
def offered(self, n):
pn_link_offered(self._impl, n)
def stream(self, data):
"""
Send specified data as part of the current delivery
@type data: binary
@param data: data to send
"""
return self._check(pn_link_send(self._impl, data))
def send(self, obj, tag=None):
"""
Send specified object over this sender; the object is expected to
have a send() method on it that takes the sender and an optional
tag as arguments.
Where the object is a Message, this will send the message over
this link, creating a new delivery for the purpose.
"""
if hasattr(obj, 'send'):
return obj.send(self, tag=tag)
else:
# treat object as bytes
return self.stream(obj)
def delivery_tag(self):
if not hasattr(self, 'tag_generator'):
def simple_tags():
count = 1
while True:
yield str(count)
count += 1
self.tag_generator = simple_tags()
return next(self.tag_generator)
class Receiver(Link):
"""
A link over which messages are received.
"""
def flow(self, n):
"""Increases the credit issued to the remote sender by the specified number of messages."""
pn_link_flow(self._impl, n)
def recv(self, limit):
n, binary = pn_link_recv(self._impl, limit)
if n == PN_EOS:
return None
else:
self._check(n)
return binary
def drain(self, n):
pn_link_drain(self._impl, n)
def draining(self):
return pn_link_draining(self._impl)
class NamedInt(int):
values = {}
def __new__(cls, i, name):
ni = super(NamedInt, cls).__new__(cls, i)
cls.values[i] = ni
return ni
def __init__(self, i, name):
self.name = name
def __repr__(self):
return self.name
def __str__(self):
return self.name
@classmethod
def get(cls, i):
return cls.values.get(i, i)
class DispositionType(NamedInt):
values = {}
class Disposition(object):
RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
REJECTED = DispositionType(PN_REJECTED, "REJECTED")
RELEASED = DispositionType(PN_RELEASED, "RELEASED")
MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
def __init__(self, impl, local):
self._impl = impl
self.local = local
self._data = None
self._condition = None
self._annotations = None
@property
def type(self):
return DispositionType.get(pn_disposition_type(self._impl))
def _get_section_number(self):
return pn_disposition_get_section_number(self._impl)
def _set_section_number(self, n):
pn_disposition_set_section_number(self._impl, n)
section_number = property(_get_section_number, _set_section_number)
def _get_section_offset(self):
return pn_disposition_get_section_offset(self._impl)
def _set_section_offset(self, n):
pn_disposition_set_section_offset(self._impl, n)
section_offset = property(_get_section_offset, _set_section_offset)
def _get_failed(self):
return pn_disposition_is_failed(self._impl)
def _set_failed(self, b):
pn_disposition_set_failed(self._impl, b)
failed = property(_get_failed, _set_failed)
def _get_undeliverable(self):
return pn_disposition_is_undeliverable(self._impl)
def _set_undeliverable(self, b):
pn_disposition_set_undeliverable(self._impl, b)
undeliverable = property(_get_undeliverable, _set_undeliverable)
def _get_data(self):
if self.local:
return self._data
else:
return dat2obj(pn_disposition_data(self._impl))
def _set_data(self, obj):
if self.local:
self._data = obj
else:
raise AttributeError("data attribute is read-only")
data = property(_get_data, _set_data)
def _get_annotations(self):
if self.local:
return self._annotations
else:
return dat2obj(pn_disposition_annotations(self._impl))
def _set_annotations(self, obj):
if self.local:
self._annotations = obj
else:
raise AttributeError("annotations attribute is read-only")
annotations = property(_get_annotations, _set_annotations)
def _get_condition(self):
if self.local:
return self._condition
else:
return cond2obj(pn_disposition_condition(self._impl))
def _set_condition(self, obj):
if self.local:
self._condition = obj
else:
raise AttributeError("condition attribute is read-only")
condition = property(_get_condition, _set_condition)
class Delivery(Wrapper):
"""
Tracks and/or records the delivery of a message over a link.
"""
RECEIVED = Disposition.RECEIVED
ACCEPTED = Disposition.ACCEPTED
REJECTED = Disposition.REJECTED
RELEASED = Disposition.RELEASED
MODIFIED = Disposition.MODIFIED
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Delivery(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_delivery_attachments)
def _init(self):
self.local = Disposition(pn_delivery_local(self._impl), True)
self.remote = Disposition(pn_delivery_remote(self._impl), False)
@property
def tag(self):
"""The identifier for the delivery."""
return pn_delivery_tag(self._impl)
@property
def writable(self):
"""Returns true for an outgoing delivery to which data can now be written."""
return pn_delivery_writable(self._impl)
@property
def readable(self):
"""Returns true for an incoming delivery that has data to read."""
return pn_delivery_readable(self._impl)
@property
def updated(self):
"""Returns true if the state of the delivery has been updated
(e.g. it has been settled and/or accepted, rejected etc)."""
return pn_delivery_updated(self._impl)
def update(self, state):
"""
Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
"""
obj2dat(self.local._data, pn_disposition_data(self.local._impl))
obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
pn_delivery_update(self._impl, state)
@property
def pending(self):
return pn_delivery_pending(self._impl)
@property
def partial(self):
"""
Returns true for an incoming delivery if not all the data is
yet available.
"""
return pn_delivery_partial(self._impl)
@property
def local_state(self):
"""Returns the local state of the delivery."""
return DispositionType.get(pn_delivery_local_state(self._impl))
@property
def remote_state(self):
"""
Returns the state of the delivery as indicated by the remote
peer.
"""
return DispositionType.get(pn_delivery_remote_state(self._impl))
@property
def settled(self):
"""
Returns true if the delivery has been settled by the remote peer.
"""
return pn_delivery_settled(self._impl)
def settle(self):
"""
Settles the delivery locally. This indicates the aplication
considers the delivery complete and does not wish to receive any
further events about it. Every delivery should be settled locally.
"""
pn_delivery_settle(self._impl)
@property
def work_next(self):
return Delivery.wrap(pn_work_next(self._impl))
@property
def link(self):
"""
Returns the link on which the delivery was sent or received.
"""
return Link.wrap(pn_delivery_link(self._impl))
@property
def session(self):
"""
Returns the session over which the delivery was sent or received.
"""
return self.link.session
@property
def connection(self):
"""
Returns the connection over which the delivery was sent or received.
"""
return self.session.connection
@property
def transport(self):
return self.connection.transport
class TransportException(ProtonException):
pass
class TraceAdapter:
def __init__(self, tracer):
self.tracer = tracer
def __call__(self, trans_impl, message):
self.tracer(Transport.wrap(trans_impl), message)
class Transport(Wrapper):
TRACE_OFF = PN_TRACE_OFF
TRACE_DRV = PN_TRACE_DRV
TRACE_FRM = PN_TRACE_FRM
TRACE_RAW = PN_TRACE_RAW
CLIENT = 1
SERVER = 2
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Transport(_impl=impl)
def __init__(self, mode=None, _impl = pn_transport):
Wrapper.__init__(self, _impl, pn_transport_attachments)
if mode == Transport.SERVER:
pn_transport_set_server(self._impl)
elif mode is None or mode==Transport.CLIENT:
pass
else:
raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
def _init(self):
self._sasl = None
self._ssl = None
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, TransportException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
else:
return err
def _set_tracer(self, tracer):
pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
def _get_tracer(self):
adapter = pn_transport_get_pytracer(self._impl)
if adapter:
return adapter.tracer
else:
return None
tracer = property(_get_tracer, _set_tracer,
doc="""
A callback for trace logging. The callback is passed the transport and log message.
""")
def log(self, message):
pn_transport_log(self._impl, message)
def require_auth(self, bool):
pn_transport_require_auth(self._impl, bool)
def bind(self, connection):
"""Assign a connection to the transport"""
self._check(pn_transport_bind(self._impl, connection._impl))
def unbind(self):
"""Release the connection"""
self._check(pn_transport_unbind(self._impl))
def trace(self, n):
pn_transport_trace(self._impl, n)
def tick(self, now):
"""Process any timed events (like heartbeat generation).
now = seconds since epoch (float).
"""
return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
def capacity(self):
c = pn_transport_capacity(self._impl)
if c >= PN_EOS:
return c
else:
return self._check(c)
def push(self, binary):
n = self._check(pn_transport_push(self._impl, binary))
if n != len(binary):
raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
def close_tail(self):
self._check(pn_transport_close_tail(self._impl))
def pending(self):
p = pn_transport_pending(self._impl)
if p >= PN_EOS:
return p
else:
return self._check(p)
def peek(self, size):
cd, out = pn_transport_peek(self._impl, size)
if cd == PN_EOS:
return None
else:
self._check(cd)
return out
def pop(self, size):
pn_transport_pop(self._impl, size)
def close_head(self):
self._check(pn_transport_close_head(self._impl))
@property
def closed(self):
return pn_transport_closed(self._impl)
# AMQP 1.0 max-frame-size
def _get_max_frame_size(self):
return pn_transport_get_max_frame(self._impl)
def _set_max_frame_size(self, value):
pn_transport_set_max_frame(self._impl, value)
max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
doc="""
Sets the maximum size for received frames (in bytes).
""")
@property
def remote_max_frame_size(self):
return pn_transport_get_remote_max_frame(self._impl)
def _get_channel_max(self):
return pn_transport_get_channel_max(self._impl)
def _set_channel_max(self, value):
if pn_transport_set_channel_max(self._impl, value):
raise SessionException("Too late to change channel max.")
channel_max = property(_get_channel_max, _set_channel_max,
doc="""
Sets the maximum channel that may be used on the transport.
""")
@property
def remote_channel_max(self):
return pn_transport_remote_channel_max(self._impl)
# AMQP 1.0 idle-time-out
def _get_idle_timeout(self):
return millis2secs(pn_transport_get_idle_timeout(self._impl))
def _set_idle_timeout(self, sec):
pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
doc="""
The idle timeout of the connection (float, in seconds).
""")
@property
def remote_idle_timeout(self):
return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
@property
def frames_output(self):
return pn_transport_get_frames_output(self._impl)
@property
def frames_input(self):
return pn_transport_get_frames_input(self._impl)
def sasl(self):
return SASL(self)
def ssl(self, domain=None, session_details=None):
# SSL factory (singleton for this transport)
if not self._ssl:
self._ssl = SSL(self, domain, session_details)
return self._ssl
@property
def condition(self):
return cond2obj(pn_transport_condition(self._impl))
@property
def connection(self):
return Connection.wrap(pn_transport_connection(self._impl))
class SASLException(TransportException):
pass
class SASL(Wrapper):
OK = PN_SASL_OK
AUTH = PN_SASL_AUTH
SYS = PN_SASL_SYS
PERM = PN_SASL_PERM
TEMP = PN_SASL_TEMP
@staticmethod
def extended():
return pn_sasl_extended()
def __init__(self, transport):
Wrapper.__init__(self, transport._impl, pn_transport_attachments)
self._sasl = pn_sasl(transport._impl)
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, SASLException)
raise exc("[%s]" % (err))
else:
return err
@property
def user(self):
return pn_sasl_get_user(self._sasl)
@property
def mech(self):
return pn_sasl_get_mech(self._sasl)
@property
def outcome(self):
outcome = pn_sasl_outcome(self._sasl)
if outcome == PN_SASL_NONE:
return None