| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| The proton module defines a suite of APIs that implement the AMQP 1.0 |
| protocol. |
| |
| The proton APIs consist of the following classes: |
| |
| - L{Message} -- A class for creating and/or accessing AMQP message content. |
| - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded |
| data. |
| |
| """ |
| from __future__ import absolute_import |
| |
| from cproton import * |
| from .wrapper import Wrapper |
| from proton import _compat |
| |
| import weakref, socket, sys, threading |
| |
try:
    import uuid

    def generate_uuid():
        """Return a freshly generated random UUID from the standard library."""
        return uuid.uuid4()

except ImportError:
    # No 'native' UUID support.  Provide a very basic UUID type that is a
    # compatible subset of the uuid type provided by more modern python
    # releases.
    import struct

    class uuid:
        class UUID:
            def __init__(self, hex=None, bytes=None):
                # Exactly one of the two arguments has to be supplied.
                supplied = [hex, bytes]
                if supplied.count(None) != 1:
                    raise TypeError("need one of hex or bytes")
                if bytes is not None:
                    self.bytes = bytes
                elif hex is not None:
                    parts = hex.split("-")
                    # Split the trailing 12 hex digits into 4 + 8 so that the
                    # pieces line up with the "!LHHHHL" struct layout.
                    tail = parts[4]
                    parts[4:5] = [tail[:4], tail[4:]]
                    self.bytes = struct.pack("!LHHHHL", *[int(p, 16) for p in parts])

            def __cmp__(self, other):
                if not isinstance(other, uuid.UUID):
                    return -1
                return cmp(self.bytes, other.bytes)

            def __str__(self):
                pieces = struct.unpack("!LHHHHL", self.bytes)
                return "%08x-%04x-%04x-%04x-%04x%08x" % pieces

            def __repr__(self):
                text = str(self)
                return "UUID(%r)" % text

            def __hash__(self):
                return self.bytes.__hash__()

    import os, random, time
    rand = random.Random()
    rand.seed((os.getpid(), time.time(), socket.gethostname()))

    def random_uuid():
        """Return 16 random octets, stamped as a version 4 UUID, as a string."""
        octets = [rand.randint(0, 255) for _ in xrange(16)]

        # From RFC4122, the version bits (high nibble of octet 6) are 0100
        octets[6] = (octets[6] & 0x0F) | 0x40

        # From RFC4122, the top two bits of octet 8 are set to 10
        octets[8] = (octets[8] & 0x3F) | 0x80
        return "".join(map(chr, octets))

    def uuid4():
        """Build a fallback L{uuid.UUID} from L{random_uuid} octets."""
        return uuid.UUID(bytes=random_uuid())

    def generate_uuid():
        """Return a freshly generated random UUID using the fallback type."""
        return uuid4()
| |
#
# Hacks to provide Python2 <---> Python3 compatibility
#
# Each probe touches a builtin name; where the interpreter does not provide
# it, the closest available type is bound to that name at module level.
try:
    bytes
except NameError:
    bytes = str

try:
    long
except NameError:
    long = int

try:
    unicode
except NameError:
    unicode = str
| |
| |
# Library version, taken from the PN_VERSION_* names brought in by the
# star import at the top of the file.
VERSION = (PN_VERSION_MAJOR, PN_VERSION_MINOR, PN_VERSION_POINT)
VERSION_MAJOR, VERSION_MINOR, VERSION_POINT = VERSION
API_LANGUAGE = IMPLEMENTATION_LANGUAGE = "C"
| |
class Constant(object):
    """A named marker object whose C{repr} is exactly its name."""

    def __init__(self, name):
        """
        @param name: the text returned by C{repr()} for this constant
        """
        self.name = name

    def __repr__(self):
        label = self.name
        return label
| |
class ProtonException(Exception):
    """
    Base class of every exception raised by proton; all other proton
    exception classes derive from it.
    """
| |
class Timeout(ProtonException):
    """
    Raised to indicate that a blocking operation has timed out.
    """
| |
class Interrupt(ProtonException):
    """
    Raised to indicate that a blocking operation was interrupted.
    """
| |
class MessageException(ProtonException):
    """
    Root of the message exception hierarchy. Every exception generated
    by the L{Message} class derives from this exception.
    """
| |
# Error codes that map to a dedicated exception class.  The _check helpers
# look a failing code up here and fall back to their own default exception
# for codes that are not listed.
EXCEPTIONS = dict([
    (PN_TIMEOUT, Timeout),
    (PN_INTR, Interrupt),
])


AUTOMATIC = Constant(name="AUTOMATIC")
MANUAL = Constant(name="MANUAL")
| |
| |
class Message(object):
    """The L{Message} class is a mutable holder of message content.

    @ivar instructions: delivery instructions for the message
    @type instructions: dict
    @ivar annotations: infrastructure defined message annotations
    @type annotations: dict
    @ivar properties: application defined message properties
    @type properties: dict
    @ivar body: message body
    @type body: bytes | unicode | dict | list | int | long | float | UUID
    """

    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

    def __init__(self, body=None, **kwargs):
        """
        Allocates the underlying message and applies the initial content.

        @param body: the initial message body
        @param kwargs: Message property name/value pairs to initialise the Message
        """
        self._msg = pn_message()
        self._id = Data(pn_message_id(self._msg))
        self._correlation_id = Data(pn_message_correlation_id(self._msg))
        self.instructions = self.annotations = self.properties = None
        self.body = body
        for name, value in _compat.iteritems(kwargs):
            # Reading the attribute first makes an unknown name raise
            # instead of silently creating a new attribute.
            getattr(self, name)
            setattr(self, name, value)

    def __del__(self):
        """Frees the underlying message, if it was ever allocated."""
        if not hasattr(self, "_msg"):
            return
        pn_message_free(self._msg)
        del self._msg

    def _check(self, err):
        """
        Returns C{err} unchanged unless it is negative, in which case the
        exception mapped in L{EXCEPTIONS} (default L{MessageException}) is
        raised with the message's error text.
        """
        if not err < 0:
            return err
        exc = EXCEPTIONS.get(err, MessageException)
        raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))

    def _pre_encode(self):
        """
        Copies the python-side instructions, annotations, properties and
        body into the corresponding sections of the underlying message.
        """
        msg = self._msg
        sections = (
            (Data(pn_message_instructions(msg)), "instructions"),
            (Data(pn_message_annotations(msg)), "annotations"),
            (Data(pn_message_properties(msg)), "properties"),
            (Data(pn_message_body(msg)), "body"),
        )
        for section, attr in sections:
            section.clear()
            value = getattr(self, attr)
            if value is not None:
                section.put_object(value)

    def _post_decode(self):
        """
        Reads the sections of the underlying message back into the
        python-side attributes; an empty section becomes C{None}.
        """
        msg = self._msg
        inst = Data(pn_message_instructions(msg))
        ann = Data(pn_message_annotations(msg))
        props = Data(pn_message_properties(msg))
        body = Data(pn_message_body(msg))

        self.instructions = inst.get_object() if inst.next() else None
        self.annotations = ann.get_object() if ann.next() else None
        self.properties = props.get_object() if props.next() else None
        self.body = body.get_object() if body.next() else None

    def clear(self):
        """
        Clears the contents of the L{Message}. All fields will be reset to
        their default values.
        """
        pn_message_clear(self._msg)
        self.instructions = self.annotations = self.properties = None
        self.body = None

    def _is_inferred(self):
        msg = self._msg
        return pn_message_is_inferred(msg)

    def _set_inferred(self, value):
        flag = bool(value)
        self._check(pn_message_set_inferred(self._msg, flag))

    inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")

    def _is_durable(self):
        msg = self._msg
        return pn_message_is_durable(msg)

    def _set_durable(self, value):
        flag = bool(value)
        self._check(pn_message_set_durable(self._msg, flag))

    durable = property(_is_durable, _set_durable,
                       doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")

    def _get_priority(self):
        msg = self._msg
        return pn_message_get_priority(msg)

    def _set_priority(self, value):
        rc = pn_message_set_priority(self._msg, value)
        self._check(rc)

    priority = property(_get_priority, _set_priority,
                        doc="""
The priority of the message.
""")

    def _get_ttl(self):
        millis = pn_message_get_ttl(self._msg)
        return millis2secs(millis)

    def _set_ttl(self, value):
        millis = secs2millis(value)
        self._check(pn_message_set_ttl(self._msg, millis))

    ttl = property(_get_ttl, _set_ttl,
                   doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")

    def _is_first_acquirer(self):
        msg = self._msg
        return pn_message_is_first_acquirer(msg)

    def _set_first_acquirer(self, value):
        flag = bool(value)
        self._check(pn_message_set_first_acquirer(self._msg, flag))

    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                              doc="""
True iff the recipient is the first to acquire the message.
""")

    def _get_delivery_count(self):
        msg = self._msg
        return pn_message_get_delivery_count(msg)

    def _set_delivery_count(self, value):
        rc = pn_message_set_delivery_count(self._msg, value)
        self._check(rc)

    delivery_count = property(_get_delivery_count, _set_delivery_count,
                              doc="""
The number of delivery attempts made for this message.
""")

    def _get_id(self):
        holder = self._id
        return holder.get_object()

    def _set_id(self, value):
        # Plain python integers are wrapped as ulong before being stored.
        if type(value) in _compat.INT_TYPES:
            value = ulong(value)
        holder = self._id
        holder.rewind()
        holder.put_object(value)

    id = property(_get_id, _set_id,
                  doc="""
The id of the message.
""")

    def _get_user_id(self):
        msg = self._msg
        return pn_message_get_user_id(msg)

    def _set_user_id(self, value):
        rc = pn_message_set_user_id(self._msg, value)
        self._check(rc)

    user_id = property(_get_user_id, _set_user_id,
                       doc="""
The user id of the message creator.
""")

    def _get_address(self):
        raw = pn_message_get_address(self._msg)
        return utf82unicode(raw)

    def _set_address(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_address(self._msg, raw))

    address = property(_get_address, _set_address,
                       doc="""
The address of the message.
""")

    def _get_subject(self):
        raw = pn_message_get_subject(self._msg)
        return utf82unicode(raw)

    def _set_subject(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_subject(self._msg, raw))

    subject = property(_get_subject, _set_subject,
                       doc="""
The subject of the message.
""")

    def _get_reply_to(self):
        raw = pn_message_get_reply_to(self._msg)
        return utf82unicode(raw)

    def _set_reply_to(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_reply_to(self._msg, raw))

    reply_to = property(_get_reply_to, _set_reply_to,
                        doc="""
The reply-to address for the message.
""")

    def _get_correlation_id(self):
        holder = self._correlation_id
        return holder.get_object()

    def _set_correlation_id(self, value):
        # Plain python integers are wrapped as ulong before being stored.
        if type(value) in _compat.INT_TYPES:
            value = ulong(value)
        holder = self._correlation_id
        holder.rewind()
        holder.put_object(value)

    correlation_id = property(_get_correlation_id, _set_correlation_id,
                              doc="""
The correlation-id for the message.
""")

    def _get_content_type(self):
        raw = pn_message_get_content_type(self._msg)
        return symbol(utf82unicode(raw))

    def _set_content_type(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_content_type(self._msg, raw))

    content_type = property(_get_content_type, _set_content_type,
                            doc="""
The content-type of the message.
""")

    def _get_content_encoding(self):
        raw = pn_message_get_content_encoding(self._msg)
        return symbol(utf82unicode(raw))

    def _set_content_encoding(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_content_encoding(self._msg, raw))

    content_encoding = property(_get_content_encoding, _set_content_encoding,
                                doc="""
The content-encoding of the message.
""")

    def _get_expiry_time(self):
        millis = pn_message_get_expiry_time(self._msg)
        return millis2secs(millis)

    def _set_expiry_time(self, value):
        millis = secs2millis(value)
        self._check(pn_message_set_expiry_time(self._msg, millis))

    expiry_time = property(_get_expiry_time, _set_expiry_time,
                           doc="""
The expiry time of the message.
""")

    def _get_creation_time(self):
        millis = pn_message_get_creation_time(self._msg)
        return millis2secs(millis)

    def _set_creation_time(self, value):
        millis = secs2millis(value)
        self._check(pn_message_set_creation_time(self._msg, millis))

    creation_time = property(_get_creation_time, _set_creation_time,
                             doc="""
The creation time of the message.
""")

    def _get_group_id(self):
        raw = pn_message_get_group_id(self._msg)
        return utf82unicode(raw)

    def _set_group_id(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_group_id(self._msg, raw))

    group_id = property(_get_group_id, _set_group_id,
                        doc="""
The group id of the message.
""")

    def _get_group_sequence(self):
        msg = self._msg
        return pn_message_get_group_sequence(msg)

    def _set_group_sequence(self, value):
        rc = pn_message_set_group_sequence(self._msg, value)
        self._check(rc)

    group_sequence = property(_get_group_sequence, _set_group_sequence,
                              doc="""
The sequence of the message within its group.
""")

    def _get_reply_to_group_id(self):
        raw = pn_message_get_reply_to_group_id(self._msg)
        return utf82unicode(raw)

    def _set_reply_to_group_id(self, value):
        raw = unicode2utf8(value)
        self._check(pn_message_set_reply_to_group_id(self._msg, raw))

    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                                 doc="""
The group-id for any replies.
""")

    def encode(self):
        """
        Encodes the message and returns the encoded data.

        The output buffer starts at 16 bytes and is doubled for as long as
        the encoder reports C{PN_OVERFLOW}; any other negative code raises
        via L{_check}.
        """
        self._pre_encode()
        size = 16
        while True:
            err, data = pn_message_encode(self._msg, size)
            if err != PN_OVERFLOW:
                self._check(err)
                return data
            size *= 2

    def decode(self, data):
        """
        Decodes C{data} into this message and refreshes the python-side
        instructions, annotations, properties and body.

        @param data: the encoded message content
        """
        rc = pn_message_decode(self._msg, data)
        self._check(rc)
        self._post_decode()

    def send(self, sender, tag=None):
        """
        Encodes the message and sends it as a new delivery on C{sender}.

        @param sender: the sending link
        @param tag: the delivery tag; when falsy, C{sender.delivery_tag()}
          supplies one
        @return: the delivery created for the message; it is settled
          before returning if the sender's settle mode is
          C{Link.SND_SETTLED}
        """
        delivery = sender.delivery(tag or sender.delivery_tag())
        sender.stream(self.encode())
        sender.advance()
        if sender.snd_settle_mode == Link.SND_SETTLED:
            delivery.settle()
        return delivery

    def recv(self, link):
        """
        Receives and decodes the message content for the current delivery
        from the link. Upon success it will return the current delivery
        for the link. If there is no current delivery, or if the current
        delivery is incomplete, or if the link is not a receiver, it will
        return None.

        @type link: Link
        @param link: the link to receive a message from
        @return: the delivery associated with the decoded message (or None)
        """
        if link.is_sender:
            return None
        delivery = link.current
        if not delivery or delivery.partial:
            return None
        delivery.encoded = link.recv(delivery.pending)
        link.advance()
        # the sender has already forgotten about the delivery, so we might
        # as well too
        if link.remote_snd_settle_mode == Link.SND_SETTLED:
            delivery.settle()
        self.decode(delivery.encoded)
        return delivery

    def __repr2__(self):
        """
        Alternative representation listing every truthy message attribute
        as C{name=value}.
        """
        names = ("inferred", "address", "reply_to", "durable", "ttl",
                 "priority", "first_acquirer", "delivery_count", "id",
                 "correlation_id", "user_id", "group_id", "group_sequence",
                 "reply_to_group_id", "instructions", "annotations",
                 "properties", "body")
        shown = []
        for name in names:
            value = getattr(self, name)
            if not value:
                continue
            shown.append("%s=%r" % (name, value))
        return "Message(%s)" % ", ".join(shown)

    def __repr__(self):
        """Returns the text produced by C{pn_inspect} for the message."""
        scratch = pn_string(None)
        err = pn_inspect(self._msg, scratch)
        text = pn_string_get(scratch)
        # Release the scratch string before any error is raised.
        pn_free(scratch)
        self._check(err)
        return text
| |
class Subscription(object):
    """Thin wrapper around an underlying subscription handle."""

    def __init__(self, impl):
        """
        @param impl: the underlying subscription handle
        """
        self._impl = impl

    @property
    def address(self):
        """The value C{pn_subscription_address} reports for the handle."""
        impl = self._impl
        return pn_subscription_address(impl)
| |
# Sentinel that lets Selectable.fileno() tell "no argument" apart from None.
_DEFAULT = object()


class Selectable(Wrapper):
    """Wrapper around an underlying selectable handle."""

    @staticmethod
    def wrap(impl):
        """
        Returns a L{Selectable} for C{impl}, or C{None} if C{impl} is C{None}.
        """
        return None if impl is None else Selectable(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_selectable_attachments)

    def _init(self):
        pass

    def fileno(self, fd=_DEFAULT):
        """
        Gets or sets the file descriptor of the selectable.

        Called without an argument it returns the current descriptor.
        Called with C{None} the descriptor is set to C{PN_INVALID_SOCKET};
        any other value is stored as given. The setting forms return
        C{None}.
        """
        if fd is _DEFAULT:
            return pn_selectable_get_fd(self._impl)
        if fd is None:
            fd = PN_INVALID_SOCKET
        pn_selectable_set_fd(self._impl, fd)

    def _is_reading(self):
        impl = self._impl
        return pn_selectable_is_reading(impl)

    def _set_reading(self, val):
        flag = bool(val)
        pn_selectable_set_reading(self._impl, flag)

    reading = property(_is_reading, _set_reading)

    def _is_writing(self):
        impl = self._impl
        return pn_selectable_is_writing(impl)

    def _set_writing(self, val):
        flag = bool(val)
        pn_selectable_set_writing(self._impl, flag)

    writing = property(_is_writing, _set_writing)

    def _get_deadline(self):
        # A falsy timestamp is reported as "no deadline".
        tstamp = pn_selectable_get_deadline(self._impl)
        return millis2secs(tstamp) if tstamp else None

    def _set_deadline(self, deadline):
        millis = secs2millis(deadline)
        pn_selectable_set_deadline(self._impl, millis)

    deadline = property(_get_deadline, _set_deadline)

    def readable(self):
        """Invokes C{pn_selectable_readable} on the underlying handle."""
        impl = self._impl
        pn_selectable_readable(impl)

    def writable(self):
        """Invokes C{pn_selectable_writable} on the underlying handle."""
        impl = self._impl
        pn_selectable_writable(impl)

    def expired(self):
        """Invokes C{pn_selectable_expired} on the underlying handle."""
        impl = self._impl
        pn_selectable_expired(impl)

    def _is_registered(self):
        impl = self._impl
        return pn_selectable_is_registered(impl)

    def _set_registered(self, registered):
        impl = self._impl
        pn_selectable_set_registered(impl, registered)

    registered = property(_is_registered, _set_registered,
                          doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")

    @property
    def is_terminal(self):
        impl = self._impl
        return pn_selectable_is_terminal(impl)

    def terminate(self):
        """Invokes C{pn_selectable_terminate} on the underlying handle."""
        impl = self._impl
        pn_selectable_terminate(impl)

    def release(self):
        """Invokes C{pn_selectable_release} on the underlying handle."""
        impl = self._impl
        pn_selectable_release(impl)
| |
class DataException(ProtonException):
    """
    Root of the Data exception hierarchy. Every exception raised by the
    L{Data} class extends this exception.
    """
| |
class UnmappedType:
    """Carries a message and shows it as C{UnmappedType(<msg>)}."""

    def __init__(self, msg):
        """
        @param msg: the message shown in the C{repr}
        """
        self.msg = msg

    def __repr__(self):
        detail = self.msg
        return "UnmappedType(%s)" % detail
| |
class ulong(long):
    """
    C{long} subclass whose C{repr} is wrapped in C{ulong(...)}; presumably
    tags a value as the AMQP ulong type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = long.__repr__(self)
        return "ulong(%s)" % inner
| |
class timestamp(long):
    """
    C{long} subclass whose C{repr} is wrapped in C{timestamp(...)};
    presumably tags a value as the AMQP timestamp type -- confirm against
    the encoder.
    """

    def __repr__(self):
        inner = long.__repr__(self)
        return "timestamp(%s)" % inner
| |
class symbol(unicode):
    """
    C{unicode} subclass whose C{repr} is wrapped in C{symbol(...)};
    presumably tags a value as the AMQP symbol type -- confirm against the
    encoder.
    """

    def __repr__(self):
        inner = unicode.__repr__(self)
        return "symbol(%s)" % inner
| |
class char(unicode):
    """
    C{unicode} subclass whose C{repr} is wrapped in C{char(...)};
    presumably tags a value as the AMQP char type -- confirm against the
    encoder.
    """

    def __repr__(self):
        inner = unicode.__repr__(self)
        return "char(%s)" % inner
| |
class byte(int):
    """
    C{int} subclass whose C{repr} is wrapped in C{byte(...)}; presumably
    tags a value as the AMQP byte type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = int.__repr__(self)
        return "byte(%s)" % inner
| |
class short(int):
    """
    C{int} subclass whose C{repr} is wrapped in C{short(...)}; presumably
    tags a value as the AMQP short type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = int.__repr__(self)
        return "short(%s)" % inner
| |
class int32(int):
    """
    C{int} subclass whose C{repr} is wrapped in C{int32(...)}; presumably
    tags a value as the AMQP int type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = int.__repr__(self)
        return "int32(%s)" % inner
| |
class ubyte(int):
    """
    C{int} subclass whose C{repr} is wrapped in C{ubyte(...)}; presumably
    tags a value as the AMQP ubyte type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = int.__repr__(self)
        return "ubyte(%s)" % inner
| |
class ushort(int):
    """
    C{int} subclass whose C{repr} is wrapped in C{ushort(...)}; presumably
    tags a value as the AMQP ushort type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = int.__repr__(self)
        return "ushort(%s)" % inner
| |
class uint(long):
    """
    C{long} subclass whose C{repr} is wrapped in C{uint(...)}; presumably
    tags a value as the AMQP uint type -- confirm against the encoder.
    """

    def __repr__(self):
        inner = long.__repr__(self)
        return "uint(%s)" % inner
| |
class float32(float):
    """
    C{float} subclass whose C{repr} is wrapped in C{float32(...)};
    presumably tags a value as the AMQP float type -- confirm against the
    encoder.
    """

    def __repr__(self):
        inner = float.__repr__(self)
        return "float32(%s)" % inner
| |
class decimal32(int):
    """
    C{int} subclass whose C{repr} is wrapped in C{decimal32(...)};
    presumably tags a value as the AMQP decimal32 type -- confirm against
    the encoder.
    """

    def __repr__(self):
        inner = int.__repr__(self)
        return "decimal32(%s)" % inner
| |
class decimal64(long):
    """
    C{long} subclass whose C{repr} is wrapped in C{decimal64(...)};
    presumably tags a value as the AMQP decimal64 type -- confirm against
    the encoder.
    """

    def __repr__(self):
        inner = long.__repr__(self)
        return "decimal64(%s)" % inner
| |
class decimal128(bytes):
    """
    C{bytes} subclass whose C{repr} is wrapped in C{decimal128(...)};
    presumably tags a value as the AMQP decimal128 type -- confirm against
    the encoder.
    """

    def __repr__(self):
        inner = bytes.__repr__(self)
        return "decimal128(%s)" % inner
| |
class Described(object):
    """
    A value paired with a descriptor.

    Two instances compare equal when both their descriptors and their
    values compare equal; comparison with any other type is unequal.
    """

    def __init__(self, descriptor, value):
        """
        @param descriptor: the descriptor of the value
        @param value: the described value
        """
        self.descriptor = descriptor
        self.value = value

    def __repr__(self):
        return "Described(%r, %r)" % (self.descriptor, self.value)

    def __eq__(self, o):
        if isinstance(o, Described):
            return self.descriptor == o.descriptor and self.value == o.value
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this method two
        # equal instances would also compare as "not equal".
        return not self == o
| |
# NOTE(review): presumably passed as an Array descriptor to mean "no
# descriptor" -- confirm against the callers.
UNDESCRIBED = Constant(name="UNDESCRIBED")
| |
class Array(object):
    """
    A sequence of elements together with a descriptor and an element type.

    Iterating an instance yields its elements. Two instances compare equal
    when descriptor, type and elements all compare equal; comparison with
    any other type is unequal.
    """

    def __init__(self, descriptor, type, *elements):
        """
        @param descriptor: the descriptor of the array
        @param type: the element type of the array
        @param elements: the array elements, kept as a tuple
        """
        self.descriptor = descriptor
        self.type = type
        self.elements = elements

    def __iter__(self):
        return iter(self.elements)

    def __repr__(self):
        if self.elements:
            els = ", %s" % (", ".join(map(repr, self.elements)))
        else:
            els = ""
        return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

    def __eq__(self, o):
        if isinstance(o, Array):
            return self.descriptor == o.descriptor and \
                self.type == o.type and self.elements == o.elements
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this method two
        # equal instances would also compare as "not equal".
        return not self == o
| |
| class Data: |
| """ |
| The L{Data} class provides an interface for decoding, extracting, |
| creating, and encoding arbitrary AMQP data. A L{Data} object |
| contains a tree of AMQP values. Leaf nodes in this tree correspond |
| to scalars in the AMQP type system such as L{ints<INT>} or |
| L{strings<STRING>}. Non-leaf nodes in this tree correspond to |
| compound values in the AMQP type system such as L{lists<LIST>}, |
| L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. |
| The root node of the tree is the L{Data} object itself and can have |
| an arbitrary number of children. |
| |
| A L{Data} object maintains the notion of the current sibling node |
| and a current parent node. Siblings are ordered within their parent. |
| Values are accessed and/or added by using the L{next}, L{prev}, |
| L{enter}, and L{exit} methods to navigate to the desired location in |
| the tree and using the supplied variety of put_*/get_* methods to |
| access or add a value of the desired type. |
| |
| The put_* methods will always add a value I{after} the current node |
| in the tree. If the current node has a next sibling the put_* method |
| will overwrite the value on this node. If there is no current node |
| or the current node has no next sibling then one will be added. The |
| put_* methods always set the added/modified node to the current |
| node. The get_* methods read the value of the current node and do |
| not change which node is current. |
| |
| The following types of scalar values are supported: |
| |
| - L{NULL} |
| - L{BOOL} |
| - L{UBYTE} |
| - L{USHORT} |
| - L{SHORT} |
| - L{UINT} |
| - L{INT} |
| - L{ULONG} |
| - L{LONG} |
| - L{FLOAT} |
| - L{DOUBLE} |
| - L{BINARY} |
| - L{STRING} |
| - L{SYMBOL} |
| |
| The following types of compound values are supported: |
| |
| - L{DESCRIBED} |
| - L{ARRAY} |
| - L{LIST} |
| - L{MAP} |
| """ |
| |
# Type codes, each bound to the PN_* name it mirrors.
NULL = PN_NULL              # A null value.
BOOL = PN_BOOL              # A boolean value.
UBYTE = PN_UBYTE            # An unsigned byte value.
BYTE = PN_BYTE              # A signed byte value.
USHORT = PN_USHORT          # An unsigned short value.
SHORT = PN_SHORT            # A short value.
UINT = PN_UINT              # An unsigned int value.
INT = PN_INT                # A signed int value.
CHAR = PN_CHAR              # A character value.
ULONG = PN_ULONG            # An unsigned long value.
LONG = PN_LONG              # A signed long value.
TIMESTAMP = PN_TIMESTAMP    # A timestamp value.
FLOAT = PN_FLOAT            # A float value.
DOUBLE = PN_DOUBLE          # A double value.
DECIMAL32 = PN_DECIMAL32    # A DECIMAL32 value.
DECIMAL64 = PN_DECIMAL64    # A DECIMAL64 value.
DECIMAL128 = PN_DECIMAL128  # A DECIMAL128 value.
UUID = PN_UUID              # A UUID value.
BINARY = PN_BINARY          # A binary string.
STRING = PN_STRING          # A unicode string.
SYMBOL = PN_SYMBOL          # A symbolic string.
DESCRIBED = PN_DESCRIBED    # A described value.
ARRAY = PN_ARRAY            # An array value.
LIST = PN_LIST              # A list value.
MAP = PN_MAP                # A map value.

# Lower-case display name for every type code above.
type_names = {
    NULL: "null", BOOL: "bool",
    BYTE: "byte", UBYTE: "ubyte",
    SHORT: "short", USHORT: "ushort",
    INT: "int", UINT: "uint",
    CHAR: "char",
    LONG: "long", ULONG: "ulong",
    TIMESTAMP: "timestamp",
    FLOAT: "float", DOUBLE: "double",
    DECIMAL32: "decimal32", DECIMAL64: "decimal64", DECIMAL128: "decimal128",
    UUID: "uuid",
    BINARY: "binary", STRING: "string", SYMBOL: "symbol",
    DESCRIBED: "described", ARRAY: "array", LIST: "list", MAP: "map",
}
| |
@classmethod
def type_name(cls, type):
    """
    Returns the display name registered for a type code.

    @param type: one of the type codes defined on the class, e.g. C{Data.INT}
    @return: the matching entry of C{type_names}
    @raise KeyError: if C{type_names} has no entry for the code
    """
    # A classmethod always receives the class first.  The original signature
    # had only one parameter, so the class was bound to it and every call
    # that passed a type code raised TypeError.
    return cls.type_names[type]
| |
def __init__(self, capacity=16):
    """
    Creates a data object, or wraps an existing one.

    @param capacity: an integer is passed to C{pn_data} to allocate a new
      underlying object, which is freed when this wrapper is deleted; any
      other value is taken to be an existing underlying object and is
      wrapped without being freed by this wrapper
    """
    owns_data = type(capacity) in _compat.INT_TYPES
    if owns_data:
        self._data = pn_data(capacity)
    else:
        self._data = capacity
    self._free = owns_data
| |
def __del__(self):
    """
    Frees the underlying data object if this wrapper allocated it.
    """
    # __init__ may have raised before either attribute was assigned, so
    # neither one can be assumed to exist here.
    if getattr(self, "_free", False) and hasattr(self, "_data"):
        pn_data_free(self._data)
        del self._data
| |
def _check(self, err):
    """
    Returns C{err} unchanged unless it is negative, in which case the
    exception mapped in L{EXCEPTIONS} (default L{DataException}) is raised
    with the data object's error text.
    """
    if not err < 0:
        return err
    exc = EXCEPTIONS.get(err, DataException)
    raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
| |
def clear(self):
    """
    Clears the data object.
    """
    handle = self._data
    pn_data_clear(handle)
| |
def rewind(self):
    """
    Clears the current node and makes the root node the parent. With the
    current node cleared the position is _before_ the first node, so a
    call to next() advances to the first node.
    """
    handle = self._data
    assert handle is not None
    pn_data_rewind(handle)
| |
def next(self):
    """
    Advances the current node to its next sibling and returns its
    type. If there is no next sibling the current node remains
    unchanged and None is returned.
    """
    moved = pn_data_next(self._data)
    return self.type() if moved else None
| |
def prev(self):
    """
    Moves the current node to its previous sibling and returns its
    type. If there is no previous sibling the current node remains
    unchanged and None is returned.
    """
    moved = pn_data_prev(self._data)
    return self.type() if moved else None
| |
def enter(self):
    """
    Sets the parent node to the current node and clears the current node.
    With the current node cleared the position is _before_ the first
    child, so a call to next() advances to the first child.

    @return: the result of C{pn_data_enter}
    """
    entered = pn_data_enter(self._data)
    return entered
| |
def exit(self):
    """
    Sets the current node to the parent node and the parent node to
    its own parent.

    @return: the result of C{pn_data_exit}
    """
    exited = pn_data_exit(self._data)
    return exited
| |
def lookup(self, name):
    """
    Returns the result of C{pn_data_lookup} for C{name}.

    @param name: the name to look up
    """
    handle = self._data
    return pn_data_lookup(handle, name)
| |
def narrow(self):
    """Invokes C{pn_data_narrow} on the underlying data object."""
    handle = self._data
    pn_data_narrow(handle)
| |
def widen(self):
    """Invokes C{pn_data_widen} on the underlying data object."""
    handle = self._data
    pn_data_widen(handle)
| |
def type(self):
    """
    Returns the type of the current node, or None when the underlying
    library reports -1 for it.
    """
    code = pn_data_type(self._data)
    return None if code == -1 else code
| |
def encoded_size(self):
    """
    Returns the size in bytes needed to encode the data in AMQP format.
    """
    handle = self._data
    return pn_data_encoded_size(handle)
| |
def encode(self):
    """
    Returns a representation of the data encoded in AMQP format.

    The output buffer starts at 1024 bytes and is doubled for as long as
    the encoder reports C{PN_OVERFLOW}; any other negative code raises via
    L{_check}.
    """
    capacity = 1024
    while True:
        code, encoded = pn_data_encode(self._data, capacity)
        if code == PN_OVERFLOW:
            capacity *= 2
            continue
        if code >= 0:
            return encoded
        self._check(code)
| |
def decode(self, encoded):
    """
    Decodes the first value from supplied AMQP data and returns the
    number of bytes consumed.

    @type encoded: binary
    @param encoded: AMQP encoded binary data
    """
    consumed = pn_data_decode(self._data, encoded)
    return self._check(consumed)
| |
def put_list(self):
    """
    Puts a list value. The elements are filled in by entering the list
    node and putting the element values.

      >>> data = Data()
      >>> data.put_list()
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
    """
    rc = pn_data_put_list(self._data)
    self._check(rc)
| |
def put_map(self):
    """
    Puts a map value. The entries are filled in by entering the map node
    and putting alternating key value pairs.

      >>> data = Data()
      >>> data.put_map()
      >>> data.enter()
      >>> data.put_string("key")
      >>> data.put_string("value")
      >>> data.exit()
    """
    rc = pn_data_put_map(self._data)
    self._check(rc)
| |
def put_array(self, described, element_type):
    """
    Puts an array value. The elements are filled in by entering the array
    node and putting the element values, all of which must be of the
    specified array element type. If the array is described then its
    first child value is the descriptor, which may be of any type.

      >>> data = Data()
      >>>
      >>> data.put_array(False, Data.INT)
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
      >>>
      >>> data.put_array(True, Data.DOUBLE)
      >>> data.enter()
      >>> data.put_symbol("array-descriptor")
      >>> data.put_double(1.1)
      >>> data.put_double(1.2)
      >>> data.put_double(1.3)
      >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    rc = pn_data_put_array(self._data, described, element_type)
    self._check(rc)
| |
def put_described(self):
    """
    Puts a described value. A described node has two children, the
    descriptor and the value, which are supplied by entering the node
    and putting the desired values.

      >>> data = Data()
      >>> data.put_described()
      >>> data.enter()
      >>> data.put_symbol("value-descriptor")
      >>> data.put_string("the value")
      >>> data.exit()
    """
    rc = pn_data_put_described(self._data)
    self._check(rc)
| |
def put_null(self):
    """
    Puts a null value.
    """
    rc = pn_data_put_null(self._data)
    self._check(rc)
| |
def put_bool(self, b):
    """
    Puts a boolean value.

    @param b: a boolean value
    """
    rc = pn_data_put_bool(self._data, b)
    self._check(rc)
| |
def put_ubyte(self, ub):
    """
    Puts an unsigned byte value.

    @param ub: an integral value
    """
    rc = pn_data_put_ubyte(self._data, ub)
    self._check(rc)
| |
def put_byte(self, b):
    """
    Puts a signed byte value.

    @param b: an integral value
    """
    rc = pn_data_put_byte(self._data, b)
    self._check(rc)
| |
def put_ushort(self, us):
    """
    Puts an unsigned short value.

    @param us: an integral value.
    """
    rc = pn_data_put_ushort(self._data, us)
    self._check(rc)
| |
def put_short(self, s):
    """
    Puts a signed short value.

    @param s: an integral value
    """
    rc = pn_data_put_short(self._data, s)
    self._check(rc)
| |
def put_uint(self, ui):
    """
    Puts an unsigned int value.

    @param ui: an integral value
    """
    rc = pn_data_put_uint(self._data, ui)
    self._check(rc)
| |
def put_int(self, i):
    """
    Puts a signed int value.

    @param i: an integral value
    """
    rc = pn_data_put_int(self._data, i)
    self._check(rc)
| |
def put_char(self, c):
    """
    Puts a char value.

    @param c: a single character
    """
    # The underlying call receives the character's ordinal, not the string.
    code_point = ord(c)
    self._check(pn_data_put_char(self._data, code_point))
| |
def put_ulong(self, ul):
    """
    Puts an unsigned long value.

    @param ul: an integral value
    """
    rc = pn_data_put_ulong(self._data, ul)
    self._check(rc)
| |
def put_long(self, l):
    """
    Puts a signed long value.

    @param l: an integral value
    """
    rc = pn_data_put_long(self._data, l)
    self._check(rc)
| |
def put_timestamp(self, t):
    """
    Puts a timestamp value.

    @param t: an integral value
    """
    rc = pn_data_put_timestamp(self._data, t)
    self._check(rc)
| |
def put_float(self, f):
    """
    Put a float value.

    The status returned by the underlying C call is passed to
    self._check.

    @param f: a floating point value
    """
    status = pn_data_put_float(self._data, f)
    self._check(status)
| |
def put_double(self, d):
    """
    Put a double value.

    The status returned by the underlying C call is passed to
    self._check.

    @param d: a floating point value
    """
    status = pn_data_put_double(self._data, d)
    self._check(status)
| |
def put_decimal32(self, d):
    """
    Put a decimal32 value.

    The status returned by the underlying C call is passed to
    self._check.

    @param d: a decimal32 value
    """
    status = pn_data_put_decimal32(self._data, d)
    self._check(status)
| |
def put_decimal64(self, d):
    """
    Put a decimal64 value.

    The status returned by the underlying C call is passed to
    self._check.

    @param d: a decimal64 value
    """
    status = pn_data_put_decimal64(self._data, d)
    self._check(status)
| |
def put_decimal128(self, d):
    """
    Put a decimal128 value.

    The status returned by the underlying C call is passed to
    self._check.

    @param d: a decimal128 value
    """
    status = pn_data_put_decimal128(self._data, d)
    self._check(status)
| |
def put_uuid(self, u):
    """
    Put a UUID value.

    Only the C{bytes} attribute of C{u} is handed to the C call; the
    returned status is passed to self._check.

    @param u: a uuid value
    """
    raw = u.bytes
    status = pn_data_put_uuid(self._data, raw)
    self._check(status)
| |
def put_binary(self, b):
    """
    Put a binary value.

    The status returned by the underlying C call is passed to
    self._check.

    @type b: binary
    @param b: a binary value; objects offering C{tobytes()} (such as
      memoryview) are converted through it, anything else through
      C{bytes()}
    """
    # On Python 2 bytes() is str(), and str(memoryview) yields the repr
    # text instead of the payload, so prefer tobytes() when it exists.
    tobytes = getattr(b, "tobytes", None)
    payload = tobytes() if tobytes is not None else bytes(b)
    self._check(pn_data_put_binary(self._data, payload))
| |
def put_string(self, s):
    """
    Put a unicode value.

    The text is encoded as UTF-8 before it is handed to the C call; the
    returned status is passed to self._check.

    @type s: unicode
    @param s: a unicode value
    """
    encoded = s.encode("utf8")
    status = pn_data_put_string(self._data, encoded)
    self._check(status)
| |
def put_symbol(self, s):
    """
    Put a symbolic value.

    The name is encoded as ASCII before it is handed to the C call; the
    returned status is passed to self._check.

    @type s: string
    @param s: the symbol name
    """
    encoded = s.encode('ascii')
    status = pn_data_put_symbol(self._data, encoded)
    self._check(status)
| |
def get_list(self):
    """
    Return the number of elements if the current node is a list, zero
    otherwise. The elements themselves are reached by entering the
    list.

      >>> count = data.get_list()
      >>> data.enter()
      >>> for i in range(count):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print(data.get_string())
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    count = pn_data_get_list(self._data)
    return count
| |
def get_map(self):
    """
    Return the number of child elements if the current node is a map,
    zero otherwise. Keys and values are reached by entering the map.

      >>> count = data.get_map()
      >>> data.enter()
      >>> for i in range(count // 2):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print(data.get_string())
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    count = pn_data_get_map(self._data)
    return count
| |
def get_array(self):
    """
    Describe the array at the current node.

    Returns a tuple of (element count, whether the array is described,
    element type). When the current node is not an array the result is
    (0, False, None). The contents are reached by entering the array.

      >>> # read an array of strings with a symbolic descriptor
      >>> count, described, type = data.get_array()
      >>> data.enter()
      >>> data.next()
      >>> print("Descriptor: %s" % data.get_symbol())
      >>> for i in range(count):
      ...    data.next()
      ...    print("Element: %s" % data.get_string())
      >>> data.exit()
    """
    count = pn_data_get_array(self._data)
    described = pn_data_is_array_described(self._data)
    element_type = pn_data_get_array_type(self._data)
    # The C call reports -1 when there is no element type; expose None.
    return count, described, (None if element_type == -1 else element_type)
| |
def is_described(self):
    """
    Tell whether the current node is a described value. Its descriptor
    and value are reached by entering the described value.

      >>> # read a symbolically described string
      >>> assert data.is_described() # will error if the current node is not described
      >>> data.enter()
      >>> data.next()
      >>> print(data.get_symbol())
      >>> data.next()
      >>> print(data.get_string())
      >>> data.exit()
    """
    described = pn_data_is_described(self._data)
    return described
| |
def is_null(self):
    """
    Tell whether the current node is a null.
    """
    null = pn_data_is_null(self._data)
    return null
| |
def get_bool(self):
    """
    Return the value of the current node if it is a boolean, False
    otherwise.
    """
    value = pn_data_get_bool(self._data)
    return value
| |
def get_ubyte(self):
    """
    Return the value of the current node, wrapped as L{ubyte}, if it is
    an unsigned byte; 0 otherwise.
    """
    raw = pn_data_get_ubyte(self._data)
    return ubyte(raw)
| |
def get_byte(self):
    """
    Return the value of the current node, wrapped as L{byte}, if it is a
    signed byte; 0 otherwise.
    """
    raw = pn_data_get_byte(self._data)
    return byte(raw)
| |
def get_ushort(self):
    """
    Return the value of the current node, wrapped as L{ushort}, if it is
    an unsigned short; 0 otherwise.
    """
    raw = pn_data_get_ushort(self._data)
    return ushort(raw)
| |
def get_short(self):
    """
    Return the value of the current node, wrapped as L{short}, if it is
    a signed short; 0 otherwise.
    """
    raw = pn_data_get_short(self._data)
    return short(raw)
| |
def get_uint(self):
    """
    Return the value of the current node, wrapped as L{uint}, if it is
    an unsigned int; 0 otherwise.
    """
    raw = pn_data_get_uint(self._data)
    return uint(raw)
| |
def get_int(self):
    """
    Return the value of the current node, wrapped as L{int32}, if it is
    a signed int; 0 otherwise.
    """
    raw = pn_data_get_int(self._data)
    return int32(raw)
| |
def get_char(self):
    """
    Return the value of the current node, wrapped as L{char}, if it is a
    char; 0 otherwise.

    The code point reported by the C call is turned into a character
    with _compat.unichar before wrapping.
    """
    code_point = pn_data_get_char(self._data)
    return char(_compat.unichar(code_point))
| |
def get_ulong(self):
    """
    Return the value of the current node, wrapped as L{ulong}, if it is
    an unsigned long; 0 otherwise.
    """
    raw = pn_data_get_ulong(self._data)
    return ulong(raw)
| |
def get_long(self):
    """
    Return the value of the current node, converted with long(), if it
    is a signed long; 0 otherwise.
    """
    raw = pn_data_get_long(self._data)
    return long(raw)
| |
def get_timestamp(self):
    """
    Return the value of the current node, wrapped as L{timestamp}, if it
    is a timestamp; 0 otherwise.
    """
    raw = pn_data_get_timestamp(self._data)
    return timestamp(raw)
| |
def get_float(self):
    """
    Return the value of the current node, wrapped as L{float32}, if it
    is a float; returns 0 otherwise.
    """
    raw = pn_data_get_float(self._data)
    return float32(raw)
| |
def get_double(self):
    """
    Return the value of the current node if it is a double, 0
    otherwise.
    """
    value = pn_data_get_double(self._data)
    return value
| |
| # XXX: need to convert |
def get_decimal32(self):
    """
    Return the value of the current node, wrapped as L{decimal32}, if it
    is a decimal32; 0 otherwise.
    """
    raw = pn_data_get_decimal32(self._data)
    return decimal32(raw)
| |
| # XXX: need to convert |
def get_decimal64(self):
    """
    Return the value of the current node, wrapped as L{decimal64}, if it
    is a decimal64; 0 otherwise.
    """
    raw = pn_data_get_decimal64(self._data)
    return decimal64(raw)
| |
| # XXX: need to convert |
def get_decimal128(self):
    """
    Return the value of the current node, wrapped as L{decimal128}, if
    it is a decimal128; 0 otherwise.
    """
    raw = pn_data_get_decimal128(self._data)
    return decimal128(raw)
| |
def get_uuid(self):
    """
    Return the current node as a uuid.UUID if it is a UUID, None
    otherwise.
    """
    # The node type is checked first so a non-UUID node yields None
    # instead of a UUID built from whatever the C getter would return.
    if pn_data_type(self._data) != Data.UUID:
        return None
    return uuid.UUID(bytes=pn_data_get_uuid(self._data))
| |
def get_binary(self):
    """
    Return the value of the current node if it is binary, "" otherwise.
    """
    payload = pn_data_get_binary(self._data)
    return payload
| |
def get_string(self):
    """
    Return the value of the current node if it is a string, ""
    otherwise.

    The bytes reported by the C call are decoded as UTF-8.
    """
    encoded = pn_data_get_string(self._data)
    return encoded.decode("utf8")
| |
def get_symbol(self):
    """
    Return the value of the current node, wrapped as L{symbol}, if it is
    a symbol; "" otherwise.

    The bytes reported by the C call are decoded as ASCII.
    """
    encoded = pn_data_get_symbol(self._data)
    return symbol(encoded.decode('ascii'))
| |
def copy(self, src):
    """
    Copy the contents of another Data object into this one via
    pn_data_copy; the returned status is passed to self._check.

    @param src: the Data object to copy from
    """
    status = pn_data_copy(self._data, src._data)
    self._check(status)
| |
def format(self):
    """
    Return the text produced by pn_data_format for this data object.

    Formatting starts with a 16 byte buffer and the buffer size is
    doubled for as long as the C call reports PN_OVERFLOW. Any other
    status is passed to self._check before the text is returned.
    """
    size = 16
    err, text = pn_data_format(self._data, size)
    while err == PN_OVERFLOW:
        size *= 2
        err, text = pn_data_format(self._data, size)
    self._check(err)
    return text
| |
def dump(self):
    """
    Hand this data object to pn_data_dump. Nothing is returned.
    """
    impl = self._data
    pn_data_dump(impl)
| |
def put_dict(self, d):
    """
    Put a dictionary as a map: a map node is created and entered, then
    every key and value is written in turn with put_object. The map is
    exited even if writing an entry raises.

    @param d: the dictionary to encode
    """
    self.put_map()
    self.enter()
    try:
        for entry in d.items():
            # each entry is a (key, value) pair, written key first
            for part in entry:
                self.put_object(part)
    finally:
        self.exit()
| |
def get_dict(self):
    """
    Read the children of the current node as a dictionary.

    The node is entered and its children are consumed pairwise as key
    and value using get_object. A trailing key without a value is mapped
    to None. The node is exited even if decoding raises. Returns None
    when the node cannot be entered.
    """
    if not self.enter():
        return None
    try:
        result = {}
        while self.next():
            key = self.get_object()
            value = self.get_object() if self.next() else None
            result[key] = value
    finally:
        self.exit()
    return result
| |
def put_sequence(self, s):
    """
    Put an iterable as a list: a list node is created and entered, then
    each item is written with put_object. The list is exited even if
    writing an item raises.

    @param s: the iterable whose items are encoded
    """
    self.put_list()
    self.enter()
    try:
        for item in s:
            self.put_object(item)
    finally:
        self.exit()
| |
def get_sequence(self):
    """
    Read the children of the current node as a Python list.

    The node is entered, every child is decoded with get_object, and the
    node is exited even if decoding raises. Returns None when the node
    cannot be entered.
    """
    if not self.enter():
        return None
    try:
        items = []
        while self.next():
            items.append(self.get_object())
    finally:
        self.exit()
    return items
| |
def get_py_described(self):
    """
    Read the current node as a L{Described} object.

    The node is entered; its first child is decoded as the descriptor
    and its second as the value, both with get_object. The node is
    exited even if decoding raises. Returns None when the node cannot be
    entered.
    """
    if not self.enter():
        return None
    try:
        self.next()
        descriptor = self.get_object()
        self.next()
        value = self.get_object()
    finally:
        self.exit()
    return Described(descriptor, value)
| |
def put_py_described(self, d):
    """
    Put a described value: a described node is created and entered, the
    descriptor is written first and the value second, both with
    put_object. The node is exited even if writing raises.

    @param d: an object exposing C{descriptor} and C{value} attributes
    """
    self.put_described()
    self.enter()
    try:
        put = self.put_object
        put(d.descriptor)
        put(d.value)
    finally:
        self.exit()
| |
def get_py_array(self):
    """
    If the current node is an array, return an L{Array} object holding
    its descriptor, element type and elements; otherwise return None.
    This is a convenience wrapper around get_array, enter, next and
    get_object.
    """
    count, described, type = self.get_array()
    if type is None:
        return None
    if not self.enter():
        return None
    try:
        # A described array carries its descriptor as the first child.
        if described:
            self.next()
            descriptor = self.get_object()
        else:
            descriptor = UNDESCRIBED
        elements = []
        while self.next():
            elements.append(self.get_object())
    finally:
        self.exit()
    return Array(descriptor, type, *elements)
| |
def put_py_array(self, a):
    """
    Put an array object: an array node of type C{a.type} is created and
    entered, the descriptor is written first when the array is
    described, then every element is written with put_object. The node
    is exited even if writing raises.

    @param a: an object exposing C{descriptor}, C{type} and C{elements}
    """
    # An array counts as described unless its descriptor is UNDESCRIBED.
    described = a.descriptor != UNDESCRIBED
    self.put_array(described, a.type)
    self.enter()
    try:
        if described:
            self.put_object(a.descriptor)
        for element in a.elements:
            self.put_object(element)
    finally:
        self.exit()
| |
# Dispatch table consulted by put_object: maps the class of a Python value
# to the method that encodes it.
put_mappings = {
    None.__class__: lambda s, _: s.put_null(),
    bool: put_bool,
    ubyte: put_ubyte,
    ushort: put_ushort,
    uint: put_uint,
    ulong: put_ulong,
    byte: put_byte,
    short: put_short,
    int32: put_int,
    long: put_long,
    float32: put_float,
    float: put_double,
    decimal32: put_decimal32,
    decimal64: put_decimal64,
    decimal128: put_decimal128,
    char: put_char,
    timestamp: put_timestamp,
    uuid.UUID: put_uuid,
    bytes: put_binary,
    unicode: put_string,
    symbol: put_symbol,
    list: put_sequence,
    tuple: put_sequence,
    dict: put_dict,
    Described: put_py_described,
    Array: put_py_array
}
# for python 3.x, long is merely an alias for int, but for python 2.x
# we need to add an explicit int since it is a different type
if int not in put_mappings:
    put_mappings[int] = put_int
# For python 3.x use 'memoryview', for <=2.5 use 'buffer'. Python >=2.6 has
# both. The names are probed directly instead of through __builtins__:
# in an imported module __builtins__ is a plain dict (CPython detail), so
# getattr() on it never finds either name.
try:
    put_mappings[memoryview] = put_binary
except NameError:
    pass
try:
    put_mappings[buffer] = put_binary
except NameError:
    pass
| |
# Dispatch table consulted by get_object: maps a node type constant to the
# method that decodes a node of that type.
get_mappings = {
    # scalars
    NULL: lambda s: None,
    BOOL: get_bool,
    BYTE: get_byte,
    UBYTE: get_ubyte,
    SHORT: get_short,
    USHORT: get_ushort,
    INT: get_int,
    UINT: get_uint,
    CHAR: get_char,
    LONG: get_long,
    ULONG: get_ulong,
    TIMESTAMP: get_timestamp,
    FLOAT: get_float,
    DOUBLE: get_double,
    DECIMAL32: get_decimal32,
    DECIMAL64: get_decimal64,
    DECIMAL128: get_decimal128,
    UUID: get_uuid,
    # variable width values
    BINARY: get_binary,
    STRING: get_string,
    SYMBOL: get_symbol,
    # compound values
    DESCRIBED: get_py_described,
    ARRAY: get_py_array,
    LIST: get_sequence,
    MAP: get_dict,
}
| |
| |
def put_object(self, obj):
    """
    Put an arbitrary Python object using the encoder registered for its
    class in put_mappings.

    The exact class of C{obj} is tried first. If it has no entry, the
    remaining classes of its method resolution order are tried in
    order, so an instance of a subclass of a mapped type (for example a
    dict subclass) is encoded like its mapped base.

    @param obj: the object to encode
    @raise KeyError: if neither the class of C{obj} nor any of its bases
      has an entry in put_mappings
    """
    cls = obj.__class__
    mappings = self.put_mappings
    putter = mappings.get(cls)
    if putter is None:
        # Old-style classes have no __mro__; they simply get no fallback.
        for base in getattr(cls, "__mro__", ())[1:]:
            putter = mappings.get(base)
            if putter is not None:
                break
        else:
            raise KeyError(cls)
    putter(self, obj)
| |
def get_object(self):
    """
    Decode the current node into a Python object.

    Returns None when there is no current type. Otherwise the decoder
    registered for the node type in get_mappings is applied; a type
    without a decoder yields an L{UnmappedType} built from the type's
    string form.
    """
    node_type = self.type()
    if node_type is None:
        return None
    getter = self.get_mappings.get(node_type)
    if not getter:
        return UnmappedType(str(node_type))
    return getter(self)
| |
class ConnectionException(ProtonException):
    """
    Fallback exception raised by Connection._check for a negative error
    code that has no entry in EXCEPTIONS.
    """
| |
class Endpoint(object):
    """
    Behaviour shared by the endpoint classes (Connection, Session and
    Link): the state mask constants, local and remote condition
    handling, and the per-endpoint handler property.

    Subclasses must provide _get_cond_impl, _get_remote_cond_impl and
    _get_attachments, and expose C{_impl} and C{connection}.
    """

    # Masks for testing the bit field returned by an endpoint's state.
    LOCAL_UNINIT = PN_LOCAL_UNINIT
    REMOTE_UNINIT = PN_REMOTE_UNINIT
    LOCAL_ACTIVE = PN_LOCAL_ACTIVE
    REMOTE_ACTIVE = PN_REMOTE_ACTIVE
    LOCAL_CLOSED = PN_LOCAL_CLOSED
    REMOTE_CLOSED = PN_REMOTE_CLOSED

    def _init(self):
        """Start without a local condition."""
        self.condition = None

    def _update_cond(self):
        """Copy self.condition into the C condition of this endpoint."""
        target = self._get_cond_impl()
        obj2cond(self.condition, target)

    @property
    def remote_condition(self):
        """The condition reported by the remote peer, or None if unset."""
        remote = self._get_remote_cond_impl()
        return cond2obj(remote)

    # the following must be provided by subclasses
    def _get_cond_impl(self):
        assert False, "Subclass must override this!"

    def _get_remote_cond_impl(self):
        assert False, "Subclass must override this!"

    def _get_handler(self):
        """Return the handler stored in this endpoint's attachments."""
        from . import reactor
        owner = reactor.Reactor.wrap(pn_object_reactor(self._impl))
        # Errors are routed to the owning reactor when there is one.
        on_error = owner.on_error if owner else None
        record = self._get_attachments()
        return WrappedHandler.wrap(pn_record_get_handler(record), on_error)

    def _set_handler(self, handler):
        """Store C{handler} in this endpoint's attachments."""
        from . import reactor
        owner = reactor.Reactor.wrap(pn_object_reactor(self._impl))
        on_error = owner.on_error if owner else None
        impl = _chandler(handler, on_error)
        record = self._get_attachments()
        pn_record_set_handler(record, impl)
        # Drop the local reference once the handler is stored in the record.
        pn_decref(impl)

    handler = property(_get_handler, _set_handler)

    @property
    def transport(self):
        """The transport of the connection this endpoint belongs to."""
        return self.connection.transport
| |
class Condition:
    """
    A named condition with an optional description and optional extra
    information.

    @ivar name: the condition name
    @ivar description: free-form description, or None
    @ivar info: additional information, or None
    """

    def __init__(self, name, description=None, info=None):
        self.name = name
        self.description = description
        self.info = info

    def __repr__(self):
        # Only truthy fields are shown, so unset description/info vanish.
        return "Condition(%s)" % ", ".join([repr(x) for x in
                                            (self.name, self.description, self.info)
                                            if x])

    def __eq__(self, o):
        """Two conditions are equal when all three fields are equal."""
        if not isinstance(o, Condition): return False
        return self.name == o.name and \
            self.description == o.description and \
            self.info == o.info

    def __ne__(self, o):
        """
        Inverse of __eq__. Python 2 does not derive != from ==, so
        without this method equal conditions would still compare as
        different under !=.
        """
        return not self.__eq__(o)
| |
def obj2cond(obj, cond):
    """
    Copy a Condition-like object into a C condition.

    The C condition is always cleared first. A falsy C{obj} therefore
    leaves it cleared; otherwise name and description are set and, when
    C{obj.info} is truthy, the info is written into the condition's
    info data.

    @param obj: object exposing name, description and info, or None
    @param cond: the C condition to fill
    """
    pn_condition_clear(cond)
    if not obj:
        return
    pn_condition_set_name(cond, str(obj.name))
    pn_condition_set_description(cond, obj.description)
    info = Data(pn_condition_info(cond))
    if obj.info:
        info.put_object(obj.info)
| |
def cond2obj(cond):
    """
    Build a L{Condition} from a C condition, or return None when the C
    condition is not set.

    @param cond: the C condition to read
    """
    if not pn_condition_is_set(cond):
        return None
    name = pn_condition_get_name(cond)
    description = pn_condition_get_description(cond)
    info = dat2obj(pn_condition_info(cond))
    return Condition(name, description, info)
| |
def dat2obj(dimpl):
    """
    Decode the first value held by a C data object into a Python
    object. Returns None when C{dimpl} is falsy.

    The data object is rewound both before and after reading.

    @param dimpl: the C data object, or a falsy value
    """
    if not dimpl:
        return None
    data = Data(dimpl)
    data.rewind()
    data.next()
    decoded = data.get_object()
    data.rewind()
    return decoded
| |
def obj2dat(obj, dimpl):
    """
    Encode a Python object into a C data object. Nothing is written
    when C{obj} is None.

    @param obj: the object to encode, or None
    @param dimpl: the C data object to write into
    """
    if obj is None:
        return
    Data(dimpl).put_object(obj)
| |
def secs2millis(secs):
    """
    Convert seconds to milliseconds, converted to an integer with
    long().

    @param secs: a duration in seconds
    """
    millis = secs * 1000
    return long(millis)
| |
def millis2secs(millis):
    """
    Convert milliseconds to seconds as a float.

    @param millis: a duration in milliseconds
    """
    as_float = float(millis)
    return as_float / 1000.0
| |
def timeout2millis(secs):
    """
    Convert a timeout in seconds to milliseconds, mapping None to
    PN_MILLIS_MAX.

    @param secs: a timeout in seconds, or None
    """
    return PN_MILLIS_MAX if secs is None else secs2millis(secs)
| |
def millis2timeout(millis):
    """
    Convert a timeout in milliseconds to seconds, mapping PN_MILLIS_MAX
    to None.

    @param millis: a timeout in milliseconds
    """
    return None if millis == PN_MILLIS_MAX else millis2secs(millis)
| |
def unicode2utf8(string):
    """Some Proton APIs expect a null terminated string. Convert python text
    types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
    This method will throw if the string cannot be converted.

    None is passed through unchanged. Any value that is not a recognised
    string type raises TypeError.
    """
    if string is None:
        return None
    if _compat.IS_PY2:
        if isinstance(string, unicode):
            return string.encode('utf-8')
        if isinstance(string, str):
            return string
    else:
        # Encoding text yields bytes; the bytes are then decoded again
        # below, so invalid input fails here rather than later.
        if isinstance(string, str):
            string = string.encode('utf-8')
        if isinstance(string, bytes):
            return string.decode('utf-8')
    raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
| |
def utf82unicode(string):
    """Convert C strings returned from proton-c into python unicode.

    None and values that are already text are returned unchanged; binary
    values are decoded as UTF-8; anything else raises TypeError.
    """
    if string is None:
        return None
    if isinstance(string, _compat.TEXT_TYPES):
        # already unicode
        return string
    if isinstance(string, _compat.BINARY_TYPES):
        return string.decode('utf8')
    raise TypeError("Unrecognized string type")
| |
class Connection(Wrapper, Endpoint):
    """
    A representation of an AMQP connection
    """

    @staticmethod
    def wrap(impl):
        """Wrap a C connection, mapping None to None."""
        if impl is None:
            return None
        else:
            return Connection(impl)

    def __init__(self, impl = pn_connection):
        Wrapper.__init__(self, impl, pn_connection_attachments)

    def _init(self):
        Endpoint._init(self)
        # Values sent to the peer by open(); None means nothing is sent.
        self.offered_capabilities = None
        self.desired_capabilities = None
        self.properties = None

    def _get_attachments(self):
        return pn_connection_attachments(self._impl)

    @property
    def connection(self):
        """A connection is its own connection."""
        return self

    @property
    def transport(self):
        """The transport of this connection, wrapped by Transport.wrap."""
        return Transport.wrap(pn_connection_transport(self._impl))

    def _check(self, err):
        """
        Return C{err} unchanged when it is non-negative; otherwise raise
        the exception registered for it in EXCEPTIONS, or
        ConnectionException when there is none.
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, ConnectionException)
            # pn_connection_error yields an error object; report its text,
            # not the object itself (same as Link._check).
            raise exc("[%s]: %s" % (err, pn_error_text(pn_connection_error(self._impl))))
        else:
            return err

    def _get_cond_impl(self):
        return pn_connection_condition(self._impl)

    def _get_remote_cond_impl(self):
        return pn_connection_remote_condition(self._impl)

    def collect(self, collector):
        """
        Attach a collector to this connection, or detach the current one
        when C{collector} is None.

        @param collector: an object exposing C{_impl}, or None
        """
        if collector is None:
            # weakref.ref(None) raises TypeError, so nothing is recorded
            # when detaching.
            pn_connection_collect(self._impl, None)
        else:
            pn_connection_collect(self._impl, collector._impl)
            self._collector = weakref.ref(collector)

    def _get_container(self):
        return utf82unicode(pn_connection_get_container(self._impl))
    def _set_container(self, name):
        return pn_connection_set_container(self._impl, unicode2utf8(name))

    container = property(_get_container, _set_container)

    def _get_hostname(self):
        return utf82unicode(pn_connection_get_hostname(self._impl))
    def _set_hostname(self, name):
        return pn_connection_set_hostname(self._impl, unicode2utf8(name))

    hostname = property(_get_hostname, _set_hostname,
                        doc="""
Set the name of the host (either fully qualified or relative) to which this
connection is connecting to. This information may be used by the remote
peer to determine the correct back-end service to connect the client to.
This value will be sent in the Open performative, and will be used by SSL
and SASL layers to identify the peer.
""")

    def _get_user(self):
        return utf82unicode(pn_connection_get_user(self._impl))
    def _set_user(self, name):
        return pn_connection_set_user(self._impl, unicode2utf8(name))

    user = property(_get_user, _set_user)

    def _get_password(self):
        # The password can be set but is never read back.
        return None
    def _set_password(self, name):
        return pn_connection_set_password(self._impl, unicode2utf8(name))

    password = property(_get_password, _set_password)

    @property
    def remote_container(self):
        """The container identifier specified by the remote peer for this connection."""
        return pn_connection_remote_container(self._impl)

    @property
    def remote_hostname(self):
        """The hostname specified by the remote peer for this connection."""
        return pn_connection_remote_hostname(self._impl)

    @property
    def remote_offered_capabilities(self):
        """The capabilities offered by the remote peer for this connection."""
        return dat2obj(pn_connection_remote_offered_capabilities(self._impl))

    @property
    def remote_desired_capabilities(self):
        """The capabilities desired by the remote peer for this connection."""
        return dat2obj(pn_connection_remote_desired_capabilities(self._impl))

    @property
    def remote_properties(self):
        """The properties specified by the remote peer for this connection."""
        return dat2obj(pn_connection_remote_properties(self._impl))

    def open(self):
        """
        Opens the connection.

        In more detail, this moves the local state of the connection to
        the ACTIVE state and triggers an open frame to be sent to the
        peer. A connection is fully active once both peers have opened it.
        """
        # Publish the locally configured capabilities and properties
        # before the connection is opened.
        obj2dat(self.offered_capabilities,
                pn_connection_offered_capabilities(self._impl))
        obj2dat(self.desired_capabilities,
                pn_connection_desired_capabilities(self._impl))
        obj2dat(self.properties, pn_connection_properties(self._impl))
        pn_connection_open(self._impl)

    def close(self):
        """
        Closes the connection.

        In more detail, this moves the local state of the connection to
        the CLOSED state and triggers a close frame to be sent to the
        peer. A connection is fully closed once both peers have closed it.
        """
        self._update_cond()
        pn_connection_close(self._impl)

    @property
    def state(self):
        """
        The state of the connection as a bit field. The state has a local
        and a remote component. Each of these can be in one of three
        states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
        against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
        REMOTE_ACTIVE and REMOTE_CLOSED.
        """
        return pn_connection_state(self._impl)

    def session(self):
        """
        Returns a new session on this connection.

        @raise SessionException: if the session cannot be allocated
        """
        ssn = pn_session(self._impl)
        if ssn is None:
            raise(SessionException("Session allocation failed."))
        else:
            return Session(ssn)

    def session_head(self, mask):
        """Return the result of pn_session_head for C{mask}, wrapped as a Session."""
        return Session.wrap(pn_session_head(self._impl, mask))

    def link_head(self, mask):
        """Return the result of pn_link_head for C{mask}, wrapped as a Link."""
        return Link.wrap(pn_link_head(self._impl, mask))

    @property
    def work_head(self):
        """The result of pn_work_head, wrapped as a Delivery."""
        return Delivery.wrap(pn_work_head(self._impl))

    @property
    def error(self):
        """The error code of this connection's error object."""
        return pn_error_code(pn_connection_error(self._impl))

    def free(self):
        """Release the underlying C connection."""
        pn_connection_release(self._impl)
| |
class SessionException(ProtonException):
    """
    Raised by Connection.session when a new session cannot be
    allocated.
    """
| |
class Session(Wrapper, Endpoint):
    """
    A representation of an AMQP session. Sessions are obtained from
    Connection.session and are used to create senders and receivers.
    """

    @staticmethod
    def wrap(impl):
        """Wrap a C session, mapping None to None."""
        return None if impl is None else Session(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_session_attachments)

    def _get_attachments(self):
        return pn_session_attachments(self._impl)

    def _get_cond_impl(self):
        return pn_session_condition(self._impl)

    def _get_remote_cond_impl(self):
        return pn_session_remote_condition(self._impl)

    def _get_incoming_capacity(self):
        capacity = pn_session_get_incoming_capacity(self._impl)
        return capacity

    def _set_incoming_capacity(self, capacity):
        pn_session_set_incoming_capacity(self._impl, capacity)

    incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

    def _get_outgoing_window(self):
        window = pn_session_get_outgoing_window(self._impl)
        return window

    def _set_outgoing_window(self, window):
        pn_session_set_outgoing_window(self._impl, window)

    outgoing_window = property(_get_outgoing_window, _set_outgoing_window)

    @property
    def outgoing_bytes(self):
        """The value reported by pn_session_outgoing_bytes."""
        return pn_session_outgoing_bytes(self._impl)

    @property
    def incoming_bytes(self):
        """The value reported by pn_session_incoming_bytes."""
        return pn_session_incoming_bytes(self._impl)

    def open(self):
        """Open the session."""
        pn_session_open(self._impl)

    def close(self):
        """Publish the local condition, then close the session."""
        self._update_cond()
        pn_session_close(self._impl)

    def next(self, mask):
        """Return the result of pn_session_next for C{mask}, wrapped as a Session."""
        following = pn_session_next(self._impl, mask)
        return Session.wrap(following)

    @property
    def state(self):
        """The state of the session as reported by pn_session_state."""
        return pn_session_state(self._impl)

    @property
    def connection(self):
        """The connection this session belongs to."""
        return Connection.wrap(pn_session_connection(self._impl))

    def sender(self, name):
        """
        Create a L{Sender} on this session.

        @param name: the link name; converted with unicode2utf8
        """
        link = pn_sender(self._impl, unicode2utf8(name))
        return Sender(link)

    def receiver(self, name):
        """
        Create a L{Receiver} on this session.

        @param name: the link name; converted with unicode2utf8
        """
        link = pn_receiver(self._impl, unicode2utf8(name))
        return Receiver(link)

    def free(self):
        """Free the underlying C session."""
        pn_session_free(self._impl)
| |
class LinkException(ProtonException):
    """
    Fallback exception raised by Link._check and Terminus._check for a
    negative error code that has no entry in EXCEPTIONS.
    """
| |
class Link(Wrapper, Endpoint):
    """
    A representation of an AMQP link, of which there are two concrete
    implementations, Sender and Receiver.
    """

    # Sender settle modes.
    SND_UNSETTLED = PN_SND_UNSETTLED
    SND_SETTLED = PN_SND_SETTLED
    SND_MIXED = PN_SND_MIXED

    # Receiver settle modes.
    RCV_FIRST = PN_RCV_FIRST
    RCV_SECOND = PN_RCV_SECOND

    @staticmethod
    def wrap(impl):
        """
        Wrap a C link as a Sender or a Receiver depending on its
        direction, mapping None to None.
        """
        if impl is None:
            return None
        factory = Sender if pn_link_is_sender(impl) else Receiver
        return factory(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_link_attachments)

    def _get_attachments(self):
        return pn_link_attachments(self._impl)

    def _check(self, err):
        """
        Return C{err} unchanged when it is non-negative; otherwise raise
        the exception registered for it in EXCEPTIONS, or LinkException
        when there is none, carrying the link's error text.
        """
        if err >= 0:
            return err
        exc = EXCEPTIONS.get(err, LinkException)
        raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))

    def _get_cond_impl(self):
        return pn_link_condition(self._impl)

    def _get_remote_cond_impl(self):
        return pn_link_remote_condition(self._impl)

    def open(self):
        """
        Opens the link.

        In more detail, this moves the local state of the link to the
        ACTIVE state and triggers an attach frame to be sent to the
        peer. A link is fully active once both peers have attached it.
        """
        pn_link_open(self._impl)

    def close(self):
        """
        Closes the link.

        In more detail, this moves the local state of the link to the
        CLOSED state and triggers an detach frame (with the closed flag
        set) to be sent to the peer. A link is fully closed once both
        peers have detached it.
        """
        self._update_cond()
        pn_link_close(self._impl)

    @property
    def state(self):
        """
        The state of the link as a bit field. The state has a local
        and a remote component. Each of these can be in one of three
        states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
        against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
        REMOTE_ACTIVE and REMOTE_CLOSED.
        """
        return pn_link_state(self._impl)

    @property
    def source(self):
        """The source of the link as described by the local peer."""
        return Terminus(pn_link_source(self._impl))

    @property
    def target(self):
        """The target of the link as described by the local peer."""
        return Terminus(pn_link_target(self._impl))

    @property
    def remote_source(self):
        """The source of the link as described by the remote peer."""
        return Terminus(pn_link_remote_source(self._impl))

    @property
    def remote_target(self):
        """The target of the link as described by the remote peer."""
        return Terminus(pn_link_remote_target(self._impl))

    @property
    def session(self):
        """The session this link belongs to."""
        return Session.wrap(pn_link_session(self._impl))

    @property
    def connection(self):
        """The connection on which this link was attached."""
        return self.session.connection

    def delivery(self, tag):
        """
        Create a L{Delivery} on this link.

        @param tag: the delivery tag handed to pn_delivery
        """
        return Delivery(pn_delivery(self._impl, tag))

    @property
    def current(self):
        """The result of pn_link_current, wrapped as a Delivery."""
        return Delivery.wrap(pn_link_current(self._impl))

    def advance(self):
        """Return the result of pn_link_advance for this link."""
        return pn_link_advance(self._impl)

    @property
    def unsettled(self):
        """The value reported by pn_link_unsettled."""
        return pn_link_unsettled(self._impl)

    @property
    def credit(self):
        """The amount of outstanding credit on this link."""
        return pn_link_credit(self._impl)

    @property
    def available(self):
        """The value reported by pn_link_available."""
        return pn_link_available(self._impl)

    @property
    def queued(self):
        """The value reported by pn_link_queued."""
        return pn_link_queued(self._impl)

    def next(self, mask):
        """Return the result of pn_link_next for C{mask}, wrapped as a Link."""
        following = pn_link_next(self._impl, mask)
        return Link.wrap(following)

    @property
    def name(self):
        """Returns the name of the link"""
        return utf82unicode(pn_link_name(self._impl))

    @property
    def is_sender(self):
        """Returns true if this link is a sender."""
        return pn_link_is_sender(self._impl)

    @property
    def is_receiver(self):
        """Returns true if this link is a receiver."""
        return pn_link_is_receiver(self._impl)

    @property
    def remote_snd_settle_mode(self):
        """The sender settle mode reported for the remote peer."""
        return pn_link_remote_snd_settle_mode(self._impl)

    @property
    def remote_rcv_settle_mode(self):
        """The receiver settle mode reported for the remote peer."""
        return pn_link_remote_rcv_settle_mode(self._impl)

    def _get_snd_settle_mode(self):
        return pn_link_snd_settle_mode(self._impl)

    def _set_snd_settle_mode(self, mode):
        pn_link_set_snd_settle_mode(self._impl, mode)

    snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)

    def _get_rcv_settle_mode(self):
        return pn_link_rcv_settle_mode(self._impl)

    def _set_rcv_settle_mode(self, mode):
        pn_link_set_rcv_settle_mode(self._impl, mode)

    rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)

    def _get_drain(self):
        return pn_link_get_drain(self._impl)

    def _set_drain(self, b):
        # Coerced so any truthy/falsy value can be assigned.
        pn_link_set_drain(self._impl, bool(b))

    drain_mode = property(_get_drain, _set_drain)

    def drained(self):
        """Return the result of pn_link_drained for this link."""
        return pn_link_drained(self._impl)

    @property
    def remote_max_message_size(self):
        """The maximum message size reported for the remote peer."""
        return pn_link_remote_max_message_size(self._impl)

    def _get_max_message_size(self):
        return pn_link_max_message_size(self._impl)

    def _set_max_message_size(self, mode):
        # NOTE: despite its name, `mode` carries the size to set.
        pn_link_set_max_message_size(self._impl, mode)

    max_message_size = property(_get_max_message_size, _set_max_message_size)

    def detach(self):
        """Return the result of pn_link_detach for this link."""
        return pn_link_detach(self._impl)

    def free(self):
        """Free the underlying C link."""
        pn_link_free(self._impl)
| |
| class Terminus(object): |
| |
| UNSPECIFIED = PN_UNSPECIFIED |
| SOURCE = PN_SOURCE |
| TARGET = PN_TARGET |
| COORDINATOR = PN_COORDINATOR |
| |
| NONDURABLE = PN_NONDURABLE |
| CONFIGURATION = PN_CONFIGURATION |
| DELIVERIES = PN_DELIVERIES |
| |
| DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED |
| DIST_MODE_COPY = PN_DIST_MODE_COPY |
| DIST_MODE_MOVE = PN_DIST_MODE_MOVE |
| |
| EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK |
| EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION |
| EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION |
| EXPIRE_NEVER = PN_EXPIRE_NEVER |
| |
| def __init__(self, impl): |
| self._impl = impl |
| |
| def _check(self, err): |
| if err < 0: |
| exc = EXCEPTIONS.get(err, LinkException) |
| raise exc("[%s]" % err) |
| else: |
| return err |
| |
| def _get_type(self): |
| return pn_terminus_get_type(self._impl) |
| def _set_type(self, type): |
| self._check(pn_terminus_set_type(self._impl, type)) |
| type = property(_get_type, _set_type) |
| |
| def _get_address(self): |
| """The address that identifies the source or target node""" |
| return utf82unicode(pn_terminus_get_address(self._impl)) |
| def _set_address(self, address): |
| self._check(pn_terminus_set_address(self._impl, unicode2utf8(address))) |
| address = property(_get_address, _set_address) |
| |
| def _get_durability(self): |
| return pn_terminus_get_durability(self._impl) |
| def _set_durability(self, seconds): |
| self._check(pn_terminus_set_durability(self._impl, seconds)) |
| durability = property(_get_durability, _set_durability) |
| |
| def _get_expiry_policy(self): |
| return pn_terminus_get_expiry_policy(self._impl) |
| def _set_expiry_policy(self, seconds): |
| self._check(pn_terminus_set_expiry_policy(self._impl, seconds)) |
| expiry_policy = property(_get_expiry_policy, _set_expiry_policy) |
| |
| def _get_timeout(self): |
| return pn_terminus_get_timeout(self._impl) |
| def _set_timeout(self, seconds): |
| self._check(pn_terminus_set_timeout(self._impl, seconds)) |
| timeout = property(_get_timeout, _set_timeout) |
| |
def _is_dynamic(self):
  """Whether the source or target node was dynamically created."""
  impl = self._impl
  return pn_terminus_is_dynamic(impl)

def _set_dynamic(self, dynamic):
  # A negative result code from the C call is raised by _check.
  result = pn_terminus_set_dynamic(self._impl, dynamic)
  self._check(result)

dynamic = property(_is_dynamic, _set_dynamic)
| |
def _get_distribution_mode(self):
  """The distribution mode as reported by pn_terminus_get_distribution_mode."""
  impl = self._impl
  return pn_terminus_get_distribution_mode(impl)

def _set_distribution_mode(self, mode):
  # A negative result code from the C call is raised by _check.
  result = pn_terminus_set_distribution_mode(self._impl, mode)
  self._check(result)

distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
| |
@property
def properties(self):
  """Properties of a dynamic source or target, wrapped in a Data object."""
  raw = pn_terminus_properties(self._impl)
  return Data(raw)
| |
@property
def capabilities(self):
  """Capabilities of the source or target, wrapped in a Data object."""
  raw = pn_terminus_capabilities(self._impl)
  return Data(raw)
| |
@property
def outcomes(self):
  """The value of pn_terminus_outcomes, wrapped in a Data object."""
  raw = pn_terminus_outcomes(self._impl)
  return Data(raw)
| |
@property
def filter(self):
  """The filter of a source, wrapped in a Data object. A filter
  restricts the set of messages transferred over the link."""
  raw = pn_terminus_filter(self._impl)
  return Data(raw)
| |
def copy(self, src):
  """
  Copy the terminus wrapped by src into this one.

  @param src: another wrapper of the same kind; only its _impl is used.
  A negative result code from pn_terminus_copy is raised by _check.
  """
  result = pn_terminus_copy(self._impl, src._impl)
  self._check(result)
| |
class Sender(Link):
  """
  The sending flavour of a link: messages and raw delivery data are
  written to it.
  """

  def offered(self, n):
    """Forward n to pn_link_offered for this link."""
    pn_link_offered(self._impl, n)

  def stream(self, data):
    """
    Write data into the current delivery.

    @type data: binary
    @param data: the bytes to send
    @return: the non-negative result of pn_link_send; a negative result
        is raised as an exception by _check.
    """
    outcome = pn_link_send(self._impl, data)
    return self._check(outcome)

  def send(self, obj, tag=None):
    """
    Send obj over this sender.

    An object that has a send() method (a Message, for example) is asked
    to send itself: its send() is called with this sender and the
    optional tag, and for a Message that creates a new delivery. Any
    other object is treated as raw bytes and passed to stream().
    """
    if not hasattr(obj, 'send'):
      # no send() method: treat the object as bytes
      return self.stream(obj)
    return obj.send(self, tag=tag)

  def delivery_tag(self):
    """Return the next tag of this sender's sequence "1", "2", "3", ..."""
    if not hasattr(self, 'tag_generator'):
      # Created lazily on first use and then kept on the sender.
      def counting_tags():
        number = 0
        while True:
          number += 1
          yield str(number)
      self.tag_generator = counting_tags()
    return next(self.tag_generator)
| |
class Receiver(Link):
  """
  The receiving flavour of a link: messages arrive over it.
  """

  def flow(self, n):
    """Increase the credit issued to the remote sender by n messages."""
    pn_link_flow(self._impl, n)

  def recv(self, limit):
    """
    Read from the link via pn_link_recv, passing limit through.

    @return: the binary data read, or None when the C call reports
        PN_EOS. Any other negative status is raised by _check.
    """
    status, payload = pn_link_recv(self._impl, limit)
    if status == PN_EOS:
      return None
    self._check(status)
    return payload

  def drain(self, n):
    """Forward n to pn_link_drain for this link."""
    pn_link_drain(self._impl, n)

  def draining(self):
    """Return the value of pn_link_draining for this link."""
    impl = self._impl
    return pn_link_draining(impl)
| |
class NamedInt(int):
  """
  An int that prints as a symbolic name.

  Every instance is recorded in the ``values`` mapping of the class it
  was created through, keyed by its integer value, so get() can turn a
  plain integer back into the named instance.
  """

  # value -> instance registry; a subclass that rebinds ``values`` gets
  # a registry of its own.
  values = {}

  def __new__(cls, i, name):
    instance = super(NamedInt, cls).__new__(cls, i)
    cls.values[i] = instance
    return instance

  def __init__(self, i, name):
    # int is immutable, so the value was fixed in __new__; only the
    # name is stored here.
    self.name = name

  def __str__(self):
    return self.name

  def __repr__(self):
    return self.name

  @classmethod
  def get(cls, i):
    """Return the instance registered for i, or i itself when none is."""
    try:
      return cls.values[i]
    except KeyError:
      return i
| |
class DispositionType(NamedInt):
  """NamedInt subclass used for the disposition state constants."""

  # Separate registry, so these values are not recorded in (or looked up
  # from) NamedInt.values.
  values = {}
| |
class Disposition(object):
  """
  View of one disposition of a delivery.

  A disposition created with local=True keeps data, annotations and
  condition as plain Python attributes that can be assigned; one created
  with local=False reads those three from the underlying C object and
  rejects assignment with AttributeError.
  """

  RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
  ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
  REJECTED = DispositionType(PN_REJECTED, "REJECTED")
  RELEASED = DispositionType(PN_RELEASED, "RELEASED")
  MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")

  def __init__(self, impl, local):
    """
    @param impl: the underlying disposition object
    @param local: true when this is the locally controlled disposition
    """
    self._impl = impl
    self.local = local
    # Python-side storage, only consulted when ``local`` is true.
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """The disposition type; a DispositionType when the value is a
    registered one, otherwise the raw value."""
    raw = pn_disposition_type(self._impl)
    return DispositionType.get(raw)

  def _get_section_number(self):
    impl = self._impl
    return pn_disposition_get_section_number(impl)

  def _set_section_number(self, n):
    impl = self._impl
    pn_disposition_set_section_number(impl, n)

  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    impl = self._impl
    return pn_disposition_get_section_offset(impl)

  def _set_section_offset(self, n):
    impl = self._impl
    pn_disposition_set_section_offset(impl, n)

  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    impl = self._impl
    return pn_disposition_is_failed(impl)

  def _set_failed(self, b):
    impl = self._impl
    pn_disposition_set_failed(impl, b)

  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    impl = self._impl
    return pn_disposition_is_undeliverable(impl)

  def _set_undeliverable(self, b):
    impl = self._impl
    pn_disposition_set_undeliverable(impl, b)

  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data

  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj

  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations

  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj

  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition

  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj

  condition = property(_get_condition, _set_condition)
| |
class Delivery(Wrapper):
  """
  Tracks and/or records the delivery of a message over a link.
  """

  # The disposition states, re-exported for convenience.
  RECEIVED = Disposition.RECEIVED
  ACCEPTED = Disposition.ACCEPTED
  REJECTED = Disposition.REJECTED
  RELEASED = Disposition.RELEASED
  MODIFIED = Disposition.MODIFIED

  @staticmethod
  def wrap(impl):
    """Return a Delivery for impl, or None when impl is None."""
    if impl is None:
      return None
    return Delivery(impl)

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_delivery_attachments)

  def _init(self):
    # One Disposition per side: the local one is writable, the remote
    # one is read-only.
    impl = self._impl
    self.local = Disposition(pn_delivery_local(impl), True)
    self.remote = Disposition(pn_delivery_remote(impl), False)

  @property
  def tag(self):
    """The identifier for the delivery."""
    impl = self._impl
    return pn_delivery_tag(impl)

  @property
  def writable(self):
    """True for an outgoing delivery to which data can now be written."""
    impl = self._impl
    return pn_delivery_writable(impl)

  @property
  def readable(self):
    """True for an incoming delivery that has data to read."""
    impl = self._impl
    return pn_delivery_readable(impl)

  @property
  def updated(self):
    """True if the state of the delivery has been updated (e.g. it has
    been settled and/or accepted, rejected etc)."""
    impl = self._impl
    return pn_delivery_updated(impl)

  def update(self, state):
    """
    Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.

    The data, annotations and condition stored on the local disposition
    are written into the underlying disposition before the state is
    applied.
    """
    local = self.local
    obj2dat(local._data, pn_disposition_data(local._impl))
    obj2dat(local._annotations, pn_disposition_annotations(local._impl))
    obj2cond(local._condition, pn_disposition_condition(local._impl))
    pn_delivery_update(self._impl, state)

  @property
  def pending(self):
    """The value of pn_delivery_pending for this delivery."""
    impl = self._impl
    return pn_delivery_pending(impl)

  @property
  def partial(self):
    """
    True for an incoming delivery if not all the data is yet available.
    """
    impl = self._impl
    return pn_delivery_partial(impl)

  @property
  def local_state(self):
    """The local state of the delivery."""
    raw = pn_delivery_local_state(self._impl)
    return DispositionType.get(raw)

  @property
  def remote_state(self):
    """
    The state of the delivery as indicated by the remote peer.
    """
    raw = pn_delivery_remote_state(self._impl)
    return DispositionType.get(raw)

  @property
  def settled(self):
    """
    True if the delivery has been settled by the remote peer.
    """
    impl = self._impl
    return pn_delivery_settled(impl)

  def settle(self):
    """
    Settle the delivery locally. This indicates the application
    considers the delivery complete and does not wish to receive any
    further events about it. Every delivery should be settled locally.
    """
    pn_delivery_settle(self._impl)

  @property
  def work_next(self):
    """The Delivery wrapping pn_work_next, or None when there is none."""
    following = pn_work_next(self._impl)
    return Delivery.wrap(following)

  @property
  def link(self):
    """
    The link on which the delivery was sent or received.
    """
    owner = pn_delivery_link(self._impl)
    return Link.wrap(owner)

  @property
  def session(self):
    """
    The session over which the delivery was sent or received.
    """
    link = self.link
    return link.session

  @property
  def connection(self):
    """
    The connection over which the delivery was sent or received.
    """
    session = self.session
    return session.connection

  @property
  def transport(self):
    """The transport of the delivery's connection."""
    connection = self.connection
    return connection.transport
| |
class TransportException(ProtonException):
  """Default exception raised for a negative transport result code; the
  SASL and SSL exception classes derive from it."""
| |
class TraceAdapter:
  """
  Callable that adapts a user tracer to the raw trace callback.

  The raw callback receives a transport implementation object; the
  adapter wraps it in a Transport before handing it to the tracer
  together with the message.
  """

  def __init__(self, tracer):
    self.tracer = tracer

  def __call__(self, trans_impl, message):
    callback = self.tracer
    callback(Transport.wrap(trans_impl), message)
| |
class Transport(Wrapper):
  """
  Wrapper around a pn_transport object: byte-level input (capacity/push)
  and output (pending/peek/pop), tracing, frame/channel/idle-timeout
  settings, and access to the SASL and SSL layers.
  """

  TRACE_OFF = PN_TRACE_OFF
  TRACE_DRV = PN_TRACE_DRV
  TRACE_FRM = PN_TRACE_FRM
  TRACE_RAW = PN_TRACE_RAW

  # Values accepted for the ``mode`` constructor argument.
  CLIENT = 1
  SERVER = 2

  @staticmethod
  def wrap(impl):
    """Return a Transport for impl, or None when impl is None."""
    if impl is None:
      return None
    return Transport(_impl=impl)

  def __init__(self, mode=None, _impl = pn_transport):
    """
    @param mode: Transport.SERVER, Transport.CLIENT or None (None is
        treated like CLIENT). Anything else raises TransportException.
    @param _impl: an existing transport object or, by default, the
        pn_transport constructor.
    """
    Wrapper.__init__(self, _impl, pn_transport_attachments)
    if mode == Transport.SERVER:
      pn_transport_set_server(self._impl)
    elif not (mode is None or mode == Transport.CLIENT):
      raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))

  def _init(self):
    self._sasl = None
    self._ssl = None

  def _check(self, err):
    """Return a non-negative code unchanged; raise for a negative one,
    including the transport's error text in the message."""
    if not err < 0:
      return err
    error_class = EXCEPTIONS.get(err, TransportException)
    detail = pn_error_text(pn_transport_error(self._impl))
    raise error_class("[%s]: %s" % (err, detail))

  def _set_tracer(self, tracer):
    adapter = TraceAdapter(tracer)
    pn_transport_set_pytracer(self._impl, adapter)

  def _get_tracer(self):
    adapter = pn_transport_get_pytracer(self._impl)
    return adapter.tracer if adapter else None

  tracer = property(_get_tracer, _set_tracer,
                    doc="""
A callback for trace logging. The callback is passed the transport and log message.
""")

  def log(self, message):
    """Pass message to pn_transport_log."""
    pn_transport_log(self._impl, message)

  def require_auth(self, bool):
    """Pass the flag to pn_transport_require_auth."""
    pn_transport_require_auth(self._impl, bool)

  def bind(self, connection):
    """Assign a connection to the transport"""
    status = pn_transport_bind(self._impl, connection._impl)
    self._check(status)

  def unbind(self):
    """Release the connection"""
    status = pn_transport_unbind(self._impl)
    self._check(status)

  def trace(self, n):
    """Pass n (see the TRACE_* constants) to pn_transport_trace."""
    pn_transport_trace(self._impl, n)

  def tick(self, now):
    """Process any timed events (like heartbeat generation).
    now = seconds since epoch (float).
    """
    deadline = pn_transport_tick(self._impl, secs2millis(now))
    return millis2secs(deadline)

  def capacity(self):
    """Return pn_transport_capacity; values below PN_EOS are errors and
    are raised by _check."""
    room = pn_transport_capacity(self._impl)
    if room < PN_EOS:
      return self._check(room)
    return room

  def push(self, binary):
    """Push binary into the transport; raises OverflowError unless the
    transport reports having taken exactly len(binary) bytes."""
    taken = self._check(pn_transport_push(self._impl, binary))
    expected = len(binary)
    if taken != expected:
      raise OverflowError("unable to process all bytes: %s, %s" % (taken, expected))

  def close_tail(self):
    status = pn_transport_close_tail(self._impl)
    self._check(status)

  def pending(self):
    """Return pn_transport_pending; values below PN_EOS are errors and
    are raised by _check."""
    waiting = pn_transport_pending(self._impl)
    if waiting < PN_EOS:
      return self._check(waiting)
    return waiting

  def peek(self, size):
    """Return the output of pn_transport_peek for size, or None when the
    C call reports PN_EOS; other negative codes are raised by _check."""
    status, out = pn_transport_peek(self._impl, size)
    if status == PN_EOS:
      return None
    self._check(status)
    return out

  def pop(self, size):
    pn_transport_pop(self._impl, size)

  def close_head(self):
    status = pn_transport_close_head(self._impl)
    self._check(status)

  @property
  def closed(self):
    impl = self._impl
    return pn_transport_closed(impl)

  # AMQP 1.0 max-frame-size
  def _get_max_frame_size(self):
    impl = self._impl
    return pn_transport_get_max_frame(impl)

  def _set_max_frame_size(self, value):
    impl = self._impl
    pn_transport_set_max_frame(impl, value)

  max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                            doc="""
Sets the maximum size for received frames (in bytes).
""")

  @property
  def remote_max_frame_size(self):
    impl = self._impl
    return pn_transport_get_remote_max_frame(impl)

  def _get_channel_max(self):
    impl = self._impl
    return pn_transport_get_channel_max(impl)

  def _set_channel_max(self, value):
    # A truthy return from the C setter means the change was refused.
    refused = pn_transport_set_channel_max(self._impl, value)
    if refused:
      raise SessionException("Too late to change channel max.")

  channel_max = property(_get_channel_max, _set_channel_max,
                         doc="""
Sets the maximum channel that may be used on the transport.
""")

  @property
  def remote_channel_max(self):
    impl = self._impl
    return pn_transport_remote_channel_max(impl)

  # AMQP 1.0 idle-time-out
  def _get_idle_timeout(self):
    millis = pn_transport_get_idle_timeout(self._impl)
    return millis2secs(millis)

  def _set_idle_timeout(self, sec):
    millis = secs2millis(sec)
    pn_transport_set_idle_timeout(self._impl, millis)

  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
                          doc="""
The idle timeout of the connection (float, in seconds).
""")

  @property
  def remote_idle_timeout(self):
    millis = pn_transport_get_remote_idle_timeout(self._impl)
    return millis2secs(millis)

  @property
  def frames_output(self):
    impl = self._impl
    return pn_transport_get_frames_output(impl)

  @property
  def frames_input(self):
    impl = self._impl
    return pn_transport_get_frames_input(impl)

  def sasl(self):
    """Return a SASL object for this transport (a new wrapper per call)."""
    return SASL(self)

  def ssl(self, domain=None, session_details=None):
    """Return the SSL object of this transport, creating it from domain
    and session_details on first use."""
    # SSL factory (singleton for this transport)
    if not self._ssl:
      self._ssl = SSL(self, domain, session_details)
    return self._ssl

  @property
  def condition(self):
    raw = pn_transport_condition(self._impl)
    return cond2obj(raw)

  @property
  def connection(self):
    raw = pn_transport_connection(self._impl)
    return Connection.wrap(raw)
| |
class SASLException(TransportException):
  """Default exception raised for a negative SASL result code."""
| |
class SASL(Wrapper):
  """
  Access to the SASL layer of a transport.

  The wrapper is built on the transport's own implementation object and
  attachments; the SASL handle itself is kept in ``_sasl``.
  """

  OK = PN_SASL_OK
  AUTH = PN_SASL_AUTH
  SYS = PN_SASL_SYS
  PERM = PN_SASL_PERM
  TEMP = PN_SASL_TEMP

  @staticmethod
  def extended():
    """Return the value of pn_sasl_extended()."""
    return pn_sasl_extended()

  def __init__(self, transport):
    transport_impl = transport._impl
    Wrapper.__init__(self, transport_impl, pn_transport_attachments)
    self._sasl = pn_sasl(transport_impl)

  def _check(self, err):
    """Return a non-negative code unchanged; raise for a negative one."""
    if not err < 0:
      return err
    error_class = EXCEPTIONS.get(err, SASLException)
    raise error_class("[%s]" % err)

  @property
  def user(self):
    handle = self._sasl
    return pn_sasl_get_user(handle)

  @property
  def mech(self):
    handle = self._sasl
    return pn_sasl_get_mech(handle)

  @property
  def outcome(self):
    """The SASL outcome, or None while the C layer reports PN_SASL_NONE."""
    result = pn_sasl_outcome(self._sasl)
    return None if result == PN_SASL_NONE else result

  def allowed_mechs(self, mechs):
    """Pass mechs to pn_sasl_allowed_mechs."""
    pn_sasl_allowed_mechs(self._sasl, mechs)

  def _get_allow_insecure_mechs(self):
    handle = self._sasl
    return pn_sasl_get_allow_insecure_mechs(handle)

  def _set_allow_insecure_mechs(self, insecure):
    handle = self._sasl
    pn_sasl_set_allow_insecure_mechs(handle, insecure)

  allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
                                  doc="""
Allow unencrypted cleartext passwords (PLAIN mech)
""")

  def done(self, outcome):
    """Pass outcome to pn_sasl_done."""
    pn_sasl_done(self._sasl, outcome)

  def config_name(self, name):
    """Pass name to pn_sasl_config_name."""
    pn_sasl_config_name(self._sasl, name)

  def config_path(self, path):
    """Pass path to pn_sasl_config_path."""
    pn_sasl_config_path(self._sasl, path)
| |
class SSLException(TransportException):
  """Default exception raised for a negative SSL result code, and for an
  attempt to re-configure an existing SSL object."""
| |
class SSLUnavailable(SSLException):
  """Raised when the C layer returns no SSL domain or SSL object."""
| |
class SSLDomain(object):
  """
  Owner of an SSL domain object created by pn_ssl_domain.

  The domain is created in the constructor and released with
  pn_ssl_domain_free when the Python object is finalized. All setters
  return the non-negative result code of the underlying call and raise
  (SSLException unless EXCEPTIONS maps the code) on a negative one.
  """

  MODE_CLIENT = PN_SSL_MODE_CLIENT
  MODE_SERVER = PN_SSL_MODE_SERVER
  VERIFY_PEER = PN_SSL_VERIFY_PEER
  VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
  ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

  def __init__(self, mode):
    """
    @param mode: MODE_CLIENT or MODE_SERVER
    @raise SSLUnavailable: when pn_ssl_domain returns None
    """
    self._domain = pn_ssl_domain(mode)
    if self._domain is None:
      raise SSLUnavailable()

  def _check(self, err):
    """Return a non-negative code unchanged; raise for a negative one."""
    if err < 0:
      exc = EXCEPTIONS.get(err, SSLException)
      raise exc("SSL failure.")
    else:
      return err

  def set_credentials(self, cert_file, key_file, password):
    """Pass the certificate file, key file and password to
    pn_ssl_domain_set_credentials."""
    return self._check( pn_ssl_domain_set_credentials(self._domain,
                                                      cert_file, key_file,
                                                      password) )

  def set_trusted_ca_db(self, certificate_db):
    """Pass certificate_db to pn_ssl_domain_set_trusted_ca_db."""
    return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
                                                        certificate_db) )

  def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    """Pass verify_mode (VERIFY_PEER, VERIFY_PEER_NAME or ANONYMOUS_PEER)
    and trusted_CAs to pn_ssl_domain_set_peer_authentication."""
    return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
                                                              verify_mode,
                                                              trusted_CAs) )

  def allow_unsecured_client(self):
    """Call pn_ssl_domain_allow_unsecured_client on this domain."""
    return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )

  def __del__(self):
    # __del__ also runs for an instance whose __init__ raised. In that
    # case _domain is None (SSLUnavailable) or was never assigned
    # (pn_ssl_domain itself raised); there is nothing to free, and None
    # must not be handed to the C free function.
    domain = getattr(self, "_domain", None)
    if domain is not None:
      pn_ssl_domain_free(domain)
| |
class SSL(object):
  """
  The SSL layer of a transport.

  Construction goes through __new__, which hands out one SSL object per
  transport (stored on the transport as ``_ssl``).
  """

  @staticmethod
  def present():
    """Return the value of pn_ssl_present()."""
    return pn_ssl_present()

  def _check(self, err):
    """Return a non-negative code unchanged; raise for a negative one."""
    if not err < 0:
      return err
    error_class = EXCEPTIONS.get(err, SSLException)
    raise error_class("SSL failure.")

  def __new__(cls, transport, domain, session_details=None):
    """Enforce a singleton SSL object per Transport"""
    if transport._ssl:
      # unfortunately, we've combined the allocation and the configuration in a
      # single step.  So catch any attempt by the application to provide what
      # may be a different configuration than the original (hack)
      existing = transport._ssl
      reconfigured = (domain and (existing._domain is not domain) or
                      session_details and (existing._session_details is not session_details))
      if reconfigured:
        raise SSLException("Cannot re-configure existing SSL object!")
      return existing

    obj = super(SSL, cls).__new__(cls)
    obj._domain = domain
    obj._session_details = session_details
    session_id = session_details.get_session_id() if session_details else None
    obj._ssl = pn_ssl( transport._impl )
    if obj._ssl is None:
      raise SSLUnavailable()
    if domain:
      pn_ssl_init( obj._ssl, domain._domain, session_id )
    # Only a fully set-up object is recorded on the transport.
    transport._ssl = obj
    return obj

  def cipher_name(self):
    """Return the cipher name, or None when the C call reports failure."""
    ok, name = pn_ssl_get_cipher_name( self._ssl, 128 )
    return name if ok else None

  def protocol_name(self):
    """Return the protocol name, or None when the C call reports failure."""
    ok, name = pn_ssl_get_protocol_name( self._ssl, 128 )
    return name if ok else None

  # Digest selectors for get_cert_fingerprint.
  SHA1 = PN_SSL_SHA1
  SHA256 = PN_SSL_SHA256
  SHA512 = PN_SSL_SHA512
  MD5 = PN_SSL_MD5

  # Subfield selectors for get_cert_subject_subfield.
  CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
  CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
  CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
  CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
  CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
  CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME

  def get_cert_subject_subfield(self, subfield_name):
    """Return the requested subfield (one of the CERT_* constants) of the
    remote certificate's subject."""
    return pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)

  def get_cert_subject(self):
    """Return the subject of the remote certificate."""
    return pn_ssl_get_remote_subject(self._ssl)

  def _get_cert_subject_unknown_subfield(self):
    # Pass in an unhandled enum
    return self.get_cert_subject_subfield(10)

  # Convenience functions for obtaining the subfields of the subject field.
  def get_cert_common_name(self):
    selector = SSL.CERT_COMMON_NAME
    return self.get_cert_subject_subfield(selector)

  def get_cert_organization(self):
    selector = SSL.CERT_ORGANIZATION_NAME
    return self.get_cert_subject_subfield(selector)

  def get_cert_organization_unit(self):
    selector = SSL.CERT_ORGANIZATION_UNIT
    return self.get_cert_subject_subfield(selector)

  def get_cert_locality_or_city(self):
    selector = SSL.CERT_CITY_OR_LOCALITY
    return self.get_cert_subject_subfield(selector)

  def get_cert_country(self):
    selector = SSL.CERT_COUNTRY_NAME
    return self.get_cert_subject_subfield(selector)

  def get_cert_state_or_province(self):
    selector = SSL.CERT_STATE_OR_PROVINCE
    return self.get_cert_subject_subfield(selector)

  def get_cert_fingerprint(self, fingerprint_length, digest_name):
    """Return the certificate fingerprint for the given digest (SHA1,
    SHA256, SHA512 or MD5), or None unless the C call returns PN_OK.
    fingerprint_length is passed through to the C call."""
    status, fingerprint = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
    if status == PN_OK:
      return fingerprint
    return None

  # Convenience functions for obtaining fingerprint for specific hashing algorithms
  def _get_cert_fingerprint_unknown_hash_alg(self):
    # 10 is not one of the digest constants defined above.
    return self.get_cert_fingerprint(41, 10)

  def get_cert_fingerprint_sha1(self):
    digest = SSL.SHA1
    return self.get_cert_fingerprint(41, digest)

  def get_cert_fingerprint_sha256(self):
    # sha256 produces a fingerprint that is 64 characters long
    digest = SSL.SHA256
    return self.get_cert_fingerprint(65, digest)

  def get_cert_fingerprint_sha512(self):
    # sha512 produces a fingerprint that is 128 characters long
    digest = SSL.SHA512
    return self.get_cert_fingerprint(129, digest)

  def get_cert_fingerprint_md5(self):
    digest = SSL.MD5
    return self.get_cert_fingerprint(33, digest)

  @property
  def remote_subject(self):
    handle = self._ssl
    return pn_ssl_get_remote_subject( handle )

  # Possible results of resume_status().
  RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
  RESUME_NEW = PN_SSL_RESUME_NEW
  RESUME_REUSED = PN_SSL_RESUME_REUSED

  def resume_status(self):
    handle = self._ssl
    return pn_ssl_resume_status( handle )

  def _set_peer_hostname(self, hostname):
    encoded = unicode2utf8(hostname)
    self._check(pn_ssl_set_peer_hostname( self._ssl, encoded ))

  def _get_peer_hostname(self):
    status, raw_name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
    self._check(status)
    return utf82unicode(raw_name)

  peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
                           doc="""
Manage the expected name of the remote peer.  Used to authenticate the remote.
""")
| |
| |
class SSLSessionDetails(object):
  """
  Holder for the unique identifier of an SSL session, used to resume a
  previous session on a new SSL connection.
  """

  def __init__(self, session_id):
    """
    @param session_id: the identifier to report from get_session_id()
    """
    self._session_id = session_id

  def get_session_id(self):
    """Return the identifier given at construction."""
    identifier = self._session_id
    return identifier
| |
| |
# Maps a C class name to a callable that turns the raw event context
# into its Python object. The lambdas defer every name lookup to call
# time, so classes defined further down the module can be referenced.
wrappers = {
  "pn_void": lambda ctx: pn_void2py(ctx),
  "pn_pyref": lambda ctx: pn_void2py(ctx),
  "pn_connection": lambda ctx: Connection.wrap(pn_cast_pn_connection(ctx)),
  "pn_session": lambda ctx: Session.wrap(pn_cast_pn_session(ctx)),
  "pn_link": lambda ctx: Link.wrap(pn_cast_pn_link(ctx)),
  "pn_delivery": lambda ctx: Delivery.wrap(pn_cast_pn_delivery(ctx)),
  "pn_transport": lambda ctx: Transport.wrap(pn_cast_pn_transport(ctx)),
  "pn_selectable": lambda ctx: Selectable.wrap(pn_cast_pn_selectable(ctx)),
}
| |
class Collector:
  """
  Owner of a pn_collector object: events are added with put() and
  consumed with peek()/pop(). The collector is freed on finalization.
  """

  def __init__(self):
    self._impl = pn_collector()

  def put(self, obj, etype):
    """Add an event of type etype whose context is the Python object obj."""
    context = pn_py2void(obj)
    pn_collector_put(self._impl, PN_PYREF, context, etype.number)

  def peek(self):
    """Return the event at the head of the collector wrapped as an Event
    (Event.wrap returns None when there is none)."""
    head = pn_collector_peek(self._impl)
    return Event.wrap(head)

  def pop(self):
    """Remove the event at the head of the collector; returns nothing."""
    # The head is wrapped first, exactly as peek() does, and the result
    # is held until the pop has happened.
    head = self.peek()
    pn_collector_pop(self._impl)

  def __del__(self):
    pn_collector_free(self._impl)
    del self._impl
| |
# Defined only once per namespace, so an existing TypeExtender (and the
# counters handed out from it) is not replaced when this code runs again.
if "TypeExtender" not in globals():
  class TypeExtender:
    """Hands out consecutive numbers, starting from a given one."""

    def __init__(self, number):
      self.number = number

    def next(self):
      """Return the current number and advance the counter by one."""
      current = self.number
      self.number += 1
      return current
| |
class EventType(object):
  """
  Description of one kind of event: its name, its number and the name of
  the handler method it is dispatched to.

  Every instance registers itself in TYPES under its number. Types
  created without a number are given the next one from a shared counter
  that starts at 10000.
  """

  # Guards the shared counter and the TYPES registry.
  _lock = threading.Lock()
  _extended = TypeExtender(10000)
  TYPES = {}

  def __init__(self, name=None, number=None, method=None):
    """
    @param name: event name; looked up with pn_event_type_name(number)
        when omitted
    @param number: event number; taken from the shared counter when
        omitted
    @param method: handler method name; defaults to "on_<name>"
    @raise TypeError: when neither name nor number is given
    """
    if name is None and number is None:
      raise TypeError("extended events require a name")
    # The lock is taken by the with statement, outside of any try
    # block, so it is only ever released after it was really acquired.
    with self._lock:
      if name is None:
        name = pn_event_type_name(number)

      if number is None:
        number = self._extended.next()

      if method is None:
        method = "on_%s" % name

      self.name = name
      self.number = number
      self.method = method

      self.TYPES[number] = self

  def __repr__(self):
    return self.name
| |
def dispatch(handler, method, *args):
  """
  Call the named method on handler with args and return its result.

  When handler has no such attribute (or it is falsy) and handler
  defines on_unhandled, that is called with the method name followed by
  args instead. When neither exists, nothing is called and None is
  returned.
  """
  target = getattr(handler, method, None)
  if target:
    return target(*args)
  if hasattr(handler, "on_unhandled"):
    return handler.on_unhandled(method, *args)
  return None
| |
class EventBase(object):
  """
  Minimal event: a class name, a context object and an event type.
  """

  def __init__(self, clazz, context, type):
    self.clazz, self.context, self.type = clazz, context, type

  def dispatch(self, handler):
    """Invoke the handler method named by this event's type, passing the
    event itself, and return the result."""
    method_name = self.type.method
    return dispatch(handler, method_name, self)
| |
def _none(x):
  """Ignore the argument and return None."""
  return None
| |
# Marker value a handler method can return: Event.dispatch skips the
# handler's child handlers when the dispatch result equals DELEGATED.
DELEGATED = Constant("DELEGATED")
| |
def _core(number, method):
  """Create the EventType for a built-in event number; the name is left
  for EventType to look up from the number."""
  event_type = EventType(number=number, method=method)
  return event_type
| |
class Event(Wrapper, EventBase):
  """
  Wrapper around a pn_event: exposes the event type, its context and the
  objects associated with it, and dispatches the event to handlers.
  """

  REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
  REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
  REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")

  TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")

  CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
  CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
  CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
  CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
  CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
  CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
  CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
  CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")

  SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
  SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
  SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
  SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
  SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
  SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")

  LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
  LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
  LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
  LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
  LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
  LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
  LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
  LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
  LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")

  DELIVERY = _core(PN_DELIVERY, "on_delivery")

  TRANSPORT = _core(PN_TRANSPORT, "on_transport")
  TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
  TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
  TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
  TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")

  SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
  SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
  SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
  SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
  SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
  SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
  SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")

  @staticmethod
  def wrap(impl, number=None):
    """
    Return the Python event for impl, or None when impl is None.

    @param number: event type number; read from impl when omitted.
    """
    if impl is None:
      return None

    if number is None:
      number = pn_event_type(impl)

    event = Event(impl, number)

    # check for an application defined ApplicationEvent and return that.  This
    # avoids an expensive wrap operation invoked by event.context
    is_pyref = pn_event_class(impl) == PN_PYREF
    if is_pyref and isinstance(event.context, EventBase):
      return event.context
    return event

  def __init__(self, impl, number):
    Wrapper.__init__(self, impl, pn_event_attachments)
    # Stored directly in the instance dictionary, bypassing attribute
    # assignment.
    self.__dict__["type"] = EventType.TYPES[number]

  def _init(self):
    pass

  def copy(self):
    """Return the wrapped result of pn_event_copy for this event."""
    duplicate = pn_event_copy(self._impl)
    return Event.wrap(duplicate)

  @property
  def clazz(self):
    """The name of the event's C class, or None when it has none."""
    event_class = pn_event_class(self._impl)
    return pn_class_name(event_class) if event_class else None

  @property
  def root(self):
    raw = pn_event_root(self._impl)
    return WrappedHandler.wrap(raw)

  @property
  def context(self):
    """The context object associated with the event. Its type depends on
    the type of event."""
    convert = wrappers[self.clazz]
    return convert(pn_event_context(self._impl))

  def dispatch(self, handler, type=None):
    """
    Deliver this event to handler and, recursively, to its child
    handlers.

    @param type: event type to dispatch as; defaults to the event's own.
    A WrappedHandler is dispatched to through pn_handler_dispatch. For
    any other handler the method named by the type is invoked and,
    unless it returns DELEGATED, every entry of handler.handlers (when
    present) receives the event as well.
    """
    type = type or self.type
    if isinstance(handler, WrappedHandler):
      pn_handler_dispatch(handler._impl, self._impl, type.number)
      return
    outcome = dispatch(handler, type.method, self)
    if outcome != DELEGATED and hasattr(handler, "handlers"):
      for child in handler.handlers:
        self.dispatch(child, type)

  @property
  def reactor(self):
    """The reactor associated with the event."""
    # Resolves to None when no "pn_reactor" wrapper has been registered.
    convert = wrappers.get("pn_reactor", _none)
    return convert(pn_event_reactor(self._impl))

  def __getattr__(self, name):
    # The reactor is reachable under the lower-cased name of its
    # subclass; everything else goes to the inherited lookup.
    r = self.reactor
    if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
      return r
    return super(Event, self).__getattr__(name)

  @property
  def transport(self):
    """The transport associated with the event, or None if there is none."""
    raw = pn_event_transport(self._impl)
    return Transport.wrap(raw)

  @property
  def connection(self):
    """The connection associated with the event, or None if there is none."""
    raw = pn_event_connection(self._impl)
    return Connection.wrap(raw)

  @property
  def session(self):
    """The session associated with the event, or None if there is none."""
    raw = pn_event_session(self._impl)
    return Session.wrap(raw)

  @property
  def link(self):
    """The link associated with the event, or None if there is none."""
    raw = pn_event_link(self._impl)
    return Link.wrap(raw)

  @property
  def sender(self):
    """The sender link associated with the event, or None if there is
    none. This is the same object as ``link``, returned only when the
    link is a sender."""
    candidate = self.link
    if candidate and candidate.is_sender:
      return candidate
    return None

  @property
  def receiver(self):
    """The receiver link associated with the event, or None if there is
    none. This is the same object as ``link``, returned only when the
    link is a receiver."""
    candidate = self.link
    if candidate and candidate.is_receiver:
      return candidate
    return None

  @property
  def delivery(self):
    """The delivery associated with the event, or None if there is none."""
    raw = pn_event_delivery(self._impl)
    return Delivery.wrap(raw)

  def __repr__(self):
    return "%s(%s)" % (self.type, self.context)
| |
class LazyHandlers(object):
  """
  Descriptor that gives each instance its own ``handlers`` list on first
  access.

  The new list is stored in the instance dictionary under 'handlers', so
  later lookups find it there and no longer reach this descriptor.
  Access through the class returns the descriptor itself.
  """

  def __get__(self, obj, clazz):
    if obj is None:
      return self
    created = obj.__dict__['handlers'] = []
    return created
| |
class Handler(object):
  """
  Base class for event handlers: provides a lazily created per-instance
  ``handlers`` list and a do-nothing on_unhandled.
  """

  handlers = LazyHandlers()

  def on_unhandled(self, method, *args):
    """Called by dispatch for events with no matching method; ignores them."""
    return None
| |
class _cadapter:
    """Adapter pairing a Python handler with an optional error callback.

    _chandler() wraps non-WrappedHandler objects in this class before
    passing them to pn_pyhandler().
    """

    def __init__(self, handler, on_error=None):
        self.on_error = on_error
        self.handler = handler

    def dispatch(self, cevent, ctype):
        """Wrap (cevent, ctype) as an Event and dispatch it to the handler."""
        Event.wrap(cevent, ctype).dispatch(self.handler)

    def exception(self, exc, val, tb):
        """Hand (exc, val, tb) to on_error when set, otherwise to _compat.raise_."""
        if self.on_error is not None:
            self.on_error((exc, val, tb))
        else:
            _compat.raise_(exc, val, tb)
| |
class WrappedHandlersChildSurrogate:
    """Child handler that forwards unhandled events to a weakly referenced delegate."""

    def __init__(self, delegate):
        # Only a weak reference to the delegate is stored.
        self.delegate = weakref.ref(delegate)
        self.handlers = []

    def on_unhandled(self, method, event):
        """Dispatch (method, event) to the delegate if it is still available."""
        target = self.delegate()
        if not target:
            # The weak reference is dead (or the delegate is falsy): nothing to do.
            return
        dispatch(target, method, event)
| |
| |
class WrappedHandlersProperty(object):
    """Data descriptor exposing the ``handlers`` list of an object's surrogate child."""

    def __get__(self, obj, clazz):
        if obj is not None:
            return self.surrogate(obj).handlers
        # Class-level access yields None.
        return None

    def __set__(self, obj, value):
        holder = self.surrogate(obj)
        holder.handlers = value

    def surrogate(self, obj):
        """Return the surrogate stored in obj.__dict__['_surrogate'].

        On first use a WrappedHandlersChildSurrogate is created for obj,
        stored under that key and registered through obj.add().
        """
        state = obj.__dict__
        existing = state.get("_surrogate", None)
        if existing is not None:
            return existing
        created = WrappedHandlersChildSurrogate(obj)
        state["_surrogate"] = created
        obj.add(created)
        return created
| |
class WrappedHandler(Wrapper):
    """Wrapper around a C handler implementation held in ``self._impl``."""

    handlers = WrappedHandlersProperty()

    @classmethod
    def wrap(cls, impl, on_error=None):
        """Wrap impl in an instance of cls; return None when impl is None.

        on_error is stored directly in the instance ``__dict__``.
        """
        if impl is None:
            return None
        wrapped = cls(impl)
        wrapped.__dict__["on_error"] = on_error
        return wrapped

    def __init__(self, impl_or_constructor):
        Wrapper.__init__(self, impl_or_constructor)
        # Position of WrappedHandler in the MRO of the concrete class.
        depth = list(self.__class__.__mro__).index(WrappedHandler)
        if depth > 1:
            # instantiate the surrogate
            self.handlers.extend([])

    def _on_error(self, info):
        """Pass info to the on_error callback if present, else to _compat.raise_."""
        callback = getattr(self, "on_error", None)
        if callback is not None:
            callback(info)
        else:
            _compat.raise_(info[0], info[1], info[2])

    def add(self, handler):
        """Add handler as a child of this handler; None is ignored."""
        if handler is not None:
            impl = _chandler(handler, self._on_error)
            pn_handler_add(self._impl, impl)
            # Release the reference obtained from _chandler.
            pn_decref(impl)

    def clear(self):
        """Call pn_handler_clear on the underlying implementation."""
        pn_handler_clear(self._impl)
| |
def _chandler(obj, on_error=None):
    """Return a C handler for obj, or None when obj is None.

    A WrappedHandler contributes its own ``_impl`` (after pn_incref);
    any other object is wrapped in a _cadapter, together with on_error,
    and handed to pn_pyhandler.
    """
    if obj is None:
        return None
    if not isinstance(obj, WrappedHandler):
        return pn_pyhandler(_cadapter(obj, on_error))
    impl = obj._impl
    pn_incref(impl)
    return impl
| |
class Url(object):
    """
    Simple URL parser/constructor, handles URLs of the form:

    <scheme>://<user>:<password>@<host>:<port>/<path>

    All components can be None if not specified in the URL string.

    The port can be specified as a service name, e.g. 'amqp' in the
    URL string but Url.port always gives the integer value.

    @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
    @ivar username: Username
    @ivar password: Password
    @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
    @ivar port: Integer port.
    @ivar path: Url path
    """

    AMQPS = "amqps"
    AMQP = "amqp"

    class Port(int):
        """An integer port number that can be constructed from a service name string"""

        def __new__(cls, value):
            """@param value: integer port number or string service name."""
            port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
            # Remember the original spelling (e.g. 'amqp') for __str__ and __eq__
            setattr(port, 'name', str(value))
            return port

        def __eq__(self, x): return str(self) == x or int(self) == x
        def __ne__(self, x): return not self == x
        def __str__(self): return str(self.name)

        def __hash__(self):
            # Defining __eq__ removes the inherited int.__hash__ on Python 3;
            # restore it so ports stay hashable (hashes like the plain integer).
            return hash(int(self))

        @staticmethod
        def _port_int(value):
            """
            Convert value, an integer or a service name, into an integer port number.

            @raise ValueError: if value is neither a number nor a known service name.
            """
            try:
                return int(value)
            except ValueError:
                try:
                    return socket.getservbyname(value)
                except socket.error:
                    # Not every system has amqp/amqps defined as a service
                    if value == Url.AMQPS: return 5671
                    elif value == Url.AMQP: return 5672
                    else:
                        raise ValueError("Not a valid port number or service name: '%s'" % value)

    def __init__(self, url=None, defaults=True, **kwargs):
        """
        @param url: URL string to parse.
        @param defaults: If true, fill in missing default values in the URL.
            If false, you can fill them in later by calling self.defaults()
        @param kwargs: scheme, username, password, host, port, path.
            If specified, replaces corresponding part in url string.
        @raise ValueError: if url cannot be parsed.
        @raise AttributeError: if kwargs names something that is not a Url attribute.
        """
        if url:
            self._url = pn_url_parse(unicode2utf8(str(url)))
            if not self._url: raise ValueError("Invalid URL '%s'" % (url,))
        else:
            self._url = pn_url()
        for k in kwargs:                # Let kwargs override values parsed from url
            getattr(self, k)            # Check for invalid kwargs
            setattr(self, k, kwargs[k])
        if defaults: self.defaults()

    class PartDescriptor(object):
        """Descriptor mapping an attribute onto the pn_url_get_<part>/pn_url_set_<part> pair."""

        def __init__(self, part):
            self.getter = globals()["pn_url_get_%s" % part]
            self.setter = globals()["pn_url_set_%s" % part]

        def __get__(self, obj, type=None):
            # Class-level access (Url.scheme) has no pn_url to read from
            if obj is None: return self
            return self.getter(obj._url)

        def __set__(self, obj, value):
            # None clears the part (as _set_port does) instead of storing the text 'None'
            if value is None: return self.setter(obj._url, None)
            return self.setter(obj._url, str(value))

    scheme = PartDescriptor('scheme')
    username = PartDescriptor('username')
    password = PartDescriptor('password')
    host = PartDescriptor('host')
    path = PartDescriptor('path')

    def _get_port(self):
        portstr = pn_url_get_port(self._url)
        # An unset port is returned as-is (falsy), otherwise as a Url.Port
        return portstr and Url.Port(portstr)

    def _set_port(self, value):
        if value is None: pn_url_set_port(self._url, None)
        else: pn_url_set_port(self._url, str(Url.Port(value)))

    port = property(_get_port, _set_port)

    def __str__(self): return pn_url_str(self._url)

    def __repr__(self): return "Url(%r)" % str(self)

    def __eq__(self, x): return str(self) == str(x)
    def __ne__(self, x): return not self == x

    def __del__(self):
        # __init__ may have raised before _url was assigned, or after a
        # failed parse left it empty; only free a pn_url that really exists.
        url = getattr(self, "_url", None)
        if url is not None:
            pn_url_free(url)
            del self._url

    def defaults(self):
        """
        Fill in missing values (scheme, host or port) with defaults
        @return: self
        """
        self.scheme = self.scheme or self.AMQP
        self.host = self.host or '0.0.0.0'
        self.port = self.port or self.Port(self.scheme)
        return self
| |
# Public names of this module, in their original order.
__all__ = [
    "API_LANGUAGE", "IMPLEMENTATION_LANGUAGE",
    "ABORTED", "ACCEPTED", "AUTOMATIC", "PENDING", "MANUAL",
    "REJECTED", "RELEASED", "MODIFIED", "SETTLED",
    "UNDESCRIBED",
    "Array", "Collector", "Condition", "Connection", "Data",
    "Delivery", "Disposition", "Described", "Endpoint",
    "Event", "EventType", "Handler", "Link",
    "Message", "MessageException", "ProtonException",
    "VERSION_MAJOR", "VERSION_MINOR",
    "Receiver", "SASL", "Sender", "Session", "SessionException",
    "SSL", "SSLDomain", "SSLSessionDetails", "SSLUnavailable", "SSLException",
    "Terminus", "Timeout", "Interrupt",
    "Transport", "TransportException",
    "Url",
    "char", "dispatch", "symbol", "timestamp", "ulong",
    "byte", "short", "int32", "ubyte", "ushort", "uint",
    "float32", "decimal32", "decimal64", "decimal128",
]