| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| The proton module defines a suite of APIs that implement the AMQP 1.0 |
| protocol. |
| |
| The proton APIs consist of the following classes: |
| |
| - L{Messenger} -- A messaging endpoint. |
| - L{Message} -- A class for creating and/or accessing AMQP message content. |
| - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded |
| data. |
| |
| """ |
| |
| from cproton import * |
| import uuid |
| |
class ProtonException(Exception):
    """
    Base class of the proton exception hierarchy. Every other proton
    exception class derives from this one.
    """
| |
class Timeout(ProtonException):
    """
    Raised to indicate that a blocking operation has timed out.
    """
| |
class MessengerException(ProtonException):
    """
    Base class of the messenger exception hierarchy. Every exception
    generated by the L{Messenger} class derives from this one.
    """
| |
class MessageException(ProtonException):
    """
    Base class of the message exception hierarchy. Every exception
    generated by the L{Message} class derives from this one.
    """
| |
# Maps error codes to the exception class raised for them by the
# _check methods in this module; codes that are not listed here fall
# back to the default exception class of the calling class.
EXCEPTIONS = {PN_TIMEOUT: Timeout}
| |
class Messenger(object):
    """
    A high level interface for sending and receiving
    L{Messages<Message>}. Every L{Messenger} holds a single logical
    queue of incoming messages and a single logical queue of outgoing
    messages. The messages in these queues may be destined for, or
    originate from, a variety of addresses.

    Address Syntax
    ==============

    An address has the following form::

      [ amqp[s]:// ] [user[:password]@] domain [/[name]]

    Where domain can be one of::

      host | host:port | ip | ip:port | name

    The following are valid examples of addresses:

     - example.org
     - example.org:1234
     - amqp://example.org
     - amqps://example.org
     - example.org/incoming
     - amqps://example.org/outgoing
     - amqps://fred:trustno1@example.org
     - 127.0.0.1:1234
     - amqps://127.0.0.1:1234

    Sending & Receiving Messages
    ============================

    The L{Messenger} class works in conjunction with the L{Message}
    class, which is a mutable holder of message content. The L{put}
    method encodes the content of a given L{Message} object into the
    outgoing message queue, which leaves that L{Message} object free
    to be modified or discarded without any impact on the content
    already in the outgoing queue.

      >>> message = Message()
      >>> for i in range(3):
      ...   message.address = "amqp://host/queue"
      ...   message.subject = "Hello World %i" % i
      ...   messenger.put(message)
      >>> messenger.send()

    In the same way, the L{get} method decodes content from the
    incoming message queue into the supplied L{Message} object.

      >>> message = Message()
      >>> messenger.recv(10)
      >>> while messenger.incoming > 0:
      ...   messenger.get(message)
      ...   print message.subject
      Hello World 0
      Hello World 1
      Hello World 2
    """

    def __init__(self, name=None):
        """
        Create a new L{Messenger} with the given name. The name has
        global scope. When no name is supplied (None), a
        L{uuid.UUID} based name will be chosen.

        @type name: string
        @param name: the name of the messenger or None
        """
        self._mng = pn_messenger(name)

    def __del__(self):
        # Nothing to release when construction never produced a handle
        # or the handle has already been freed.
        if not hasattr(self, "_mng"):
            return
        pn_messenger_free(self._mng)
        del self._mng

    def _check(self, err):
        # Negative codes are errors: raise the mapped exception class
        # (MessengerException by default) with the messenger's error
        # text. Any other value is handed back to the caller untouched.
        if err < 0:
            error_class = EXCEPTIONS.get(err, MessengerException)
            raise error_class("[%s]: %s" % (err, pn_messenger_error(self._mng)))
        return err

    @property
    def name(self):
        """
        The name of this L{Messenger}.
        """
        return pn_messenger_name(self._mng)

    # -- certificate ---------------------------------------------------

    def _get_certificate(self):
        return pn_messenger_get_certificate(self._mng)

    def _set_certificate(self, value):
        status = pn_messenger_set_certificate(self._mng, value)
        self._check(status)

    certificate = property(_get_certificate, _set_certificate,
                           doc="""
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client authenticated
outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
connections do not require this property.
""")

    # -- private key ---------------------------------------------------

    def _get_private_key(self):
        return pn_messenger_get_private_key(self._mng)

    def _set_private_key(self, value):
        status = pn_messenger_set_private_key(self._mng, value)
        self._check(status)

    private_key = property(_get_private_key, _set_private_key,
                           doc="""
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client
authenticated outgoing SSL/TLS connection. Non client authenticated
SSL/TLS connections do not require this property.
""")

    # -- password ------------------------------------------------------

    def _get_password(self):
        return pn_messenger_get_password(self._mng)

    def _set_password(self, value):
        status = pn_messenger_set_password(self._mng, value)
        self._check(status)

    password = property(_get_password, _set_password,
                        doc="""
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")

    # -- trusted certificates ------------------------------------------

    def _get_trusted_certificates(self):
        return pn_messenger_get_trusted_certificates(self._mng)

    def _set_trusted_certificates(self, value):
        status = pn_messenger_set_trusted_certificates(self._mng, value)
        self._check(status)

    trusted_certificates = property(_get_trusted_certificates,
                                    _set_trusted_certificates,
                                    doc="""
A path do a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")

    # -- timeout -------------------------------------------------------

    def _get_timeout(self):
        return pn_messenger_get_timeout(self._mng)

    def _set_timeout(self, value):
        status = pn_messenger_set_timeout(self._mng, value)
        self._check(status)

    timeout = property(_get_timeout, _set_timeout,
                       doc="""
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")

    def start(self):
        """
        Moves the L{Messenger} into an active state. A L{Messenger} is
        created in an inactive state, and while inactive it will not
        send or receive messages from its internal queues. A
        L{Messenger} must be started before L{send} or L{recv} is
        called.
        """
        status = pn_messenger_start(self._mng)
        self._check(status)

    def stop(self):
        """
        Moves the L{Messenger} into an inactive state. An inactive
        L{Messenger} will not send or receive messages from its
        internal queues. A L{Messenger} should be stopped before it is
        discarded so that a clean shutdown handshake occurs on any
        internally managed connections.
        """
        status = pn_messenger_stop(self._mng)
        self._check(status)

    def subscribe(self, source):
        """
        Subscribes the L{Messenger} to messages originating from the
        given source. The source is an address as described in the
        L{Messenger} introduction, with one addition: if the domain
        portion of the address begins with the '~' character, the
        L{Messenger} will interpret the domain as host/port, bind to
        it, and listen for incoming messages. For example "~0.0.0.0",
        "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
        local interface and listen for incoming messages, with the
        last variant only permitting incoming SSL connections.

        @type source: string
        @param source: the source of messages to subscribe to
        """
        status = pn_messenger_subscribe(self._mng, source)
        self._check(status)

    def put(self, message):
        """
        Places the content of the message onto the outgoing queue of
        the L{Messenger}. This method never blocks; it will however
        send any unblocked L{Messages<Message>} in the outgoing queue
        immediately and leave any blocked L{Messages<Message>} in the
        outgoing queue. Use L{send} to block until the outgoing queue
        is empty, and the L{outgoing} property to check the depth of
        the outgoing queue.

        @type message: Message
        @param message: the message to place in the outgoing queue
        """
        # Push the python-side sections into the underlying message
        # before it is handed to the messenger.
        message._pre_encode()
        status = pn_messenger_put(self._mng, message._msg)
        self._check(status)

    def send(self):
        """
        Blocks until the outgoing queue is empty or the operation
        times out. How long a L{Messenger} blocks before timing out is
        controlled by the L{timeout} property.
        """
        status = pn_messenger_send(self._mng)
        self._check(status)

    def recv(self, n):
        """
        Receives up to I{n} messages into the incoming queue of the
        L{Messenger}. This method blocks until at least one message is
        available or the operation times out.
        """
        status = pn_messenger_recv(self._mng, n)
        self._check(status)

    def get(self, message):
        """
        Moves the message at the head of the incoming message queue
        into the supplied message object, overwriting any content the
        message object already holds.

        @type message: Message
        @param message: the destination message object
        """
        status = pn_messenger_get(self._mng, message._msg)
        self._check(status)
        # Refresh the python-side sections from the received content.
        message._post_decode()

    @property
    def outgoing(self):
        """
        The depth of the outgoing queue.
        """
        return pn_messenger_outgoing(self._mng)

    @property
    def incoming(self):
        """
        The depth of the incoming queue.
        """
        return pn_messenger_incoming(self._mng)
| |
class Message(object):
    """
    A mutable holder of message content.

    @ivar instructions: delivery instructions for the message
    @type instructions: dict
    @ivar annotations: infrastructure defined message annotations
    @type annotations: dict
    @ivar properties: application defined message properties
    @type properties: dict
    @ivar body: message body
    @type body: bytes | unicode | dict | list | int | long | float | UUID
    """

    # NOTE(review): presumably the values accepted by the format
    # property -- confirm against the C API.
    DATA = PN_DATA
    TEXT = PN_TEXT
    AMQP = PN_AMQP
    JSON = PN_JSON

    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

    def __init__(self):
        msg = pn_message()
        self._msg = msg
        # The id and correlation-id wrap data objects obtained from
        # the underlying message.
        self._id = Data(pn_message_id(msg))
        self._correlation_id = Data(pn_message_correlation_id(msg))
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = None

    def __del__(self):
        # Nothing to release when construction never produced a handle
        # or the handle has already been freed.
        if not hasattr(self, "_msg"):
            return
        pn_message_free(self._msg)
        del self._msg

    def _check(self, err):
        # Negative codes are errors: raise the mapped exception class
        # (MessageException by default) with the message's error text.
        # Any other value is handed back to the caller untouched.
        if err < 0:
            error_class = EXCEPTIONS.get(err, MessageException)
            raise error_class("[%s]: %s" % (err, pn_message_error(self._msg)))
        return err

    def _pre_encode(self):
        # Copy the python-side sections into the underlying message.
        msg = self._msg
        inst = Data(pn_message_instructions(msg))
        ann = Data(pn_message_annotations(msg))
        props = Data(pn_message_properties(msg))
        body = Data(pn_message_body(msg))

        # These three sections are always cleared, then refilled when
        # a value is present.
        sections = (
            (inst, self.instructions),
            (ann, self.annotations),
            (props, self.properties),
        )
        for section, value in sections:
            section.clear()
            if value is not None:
                section.put_object(value)

        # XXX: move this out when load/save are gone
        # Unlike the sections above, the body is only cleared when
        # there is a replacement value for it.
        if self.body is not None:
            body.clear()
            body.put_object(self.body)

    def _post_decode(self):
        # Refresh the python-side sections from the underlying message.
        msg = self._msg
        sections = (
            ("instructions", Data(pn_message_instructions(msg))),
            ("annotations", Data(pn_message_annotations(msg))),
            ("properties", Data(pn_message_properties(msg))),
            ("body", Data(pn_message_body(msg))),
        )
        for attr, section in sections:
            # A section without a first node is reported as None.
            if section.next():
                value = section.get_object()
            else:
                value = None
            setattr(self, attr, value)

    def clear(self):
        """
        Clears the contents of the L{Message}. All fields are reset to
        their default values.
        """
        pn_message_clear(self._msg)
        for attr in ("instructions", "annotations", "properties", "body"):
            setattr(self, attr, None)

    # -- inferred ------------------------------------------------------

    def _is_inferred(self):
        return pn_message_is_inferred(self._msg)

    def _set_inferred(self, value):
        status = pn_message_set_inferred(self._msg, bool(value))
        self._check(status)

    inferred = property(_is_inferred, _set_inferred)

    # -- durable -------------------------------------------------------

    def _is_durable(self):
        return pn_message_is_durable(self._msg)

    def _set_durable(self, value):
        status = pn_message_set_durable(self._msg, bool(value))
        self._check(status)

    durable = property(_is_durable, _set_durable,
                       doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")

    # -- priority ------------------------------------------------------

    def _get_priority(self):
        return pn_message_get_priority(self._msg)

    def _set_priority(self, value):
        status = pn_message_set_priority(self._msg, value)
        self._check(status)

    priority = property(_get_priority, _set_priority,
                        doc="""
The priority of the message.
""")

    # -- ttl -----------------------------------------------------------

    def _get_ttl(self):
        return pn_message_get_ttl(self._msg)

    def _set_ttl(self, value):
        status = pn_message_set_ttl(self._msg, value)
        self._check(status)

    ttl = property(_get_ttl, _set_ttl,
                   doc="""
The time to live of the message measured in milliseconds. Expired
messages may be dropped.
""")

    # -- first acquirer ------------------------------------------------

    def _is_first_acquirer(self):
        return pn_message_is_first_acquirer(self._msg)

    def _set_first_acquirer(self, value):
        status = pn_message_set_first_acquirer(self._msg, bool(value))
        self._check(status)

    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                              doc="""
True iff the recipient is the first to acquire the message.
""")

    # -- delivery count ------------------------------------------------

    def _get_delivery_count(self):
        return pn_message_get_delivery_count(self._msg)

    def _set_delivery_count(self, value):
        status = pn_message_set_delivery_count(self._msg, value)
        self._check(status)

    delivery_count = property(_get_delivery_count, _set_delivery_count,
                              doc="""
The number of delivery attempts made for this message.
""")

    # -- id ------------------------------------------------------------

    def _get_id(self):
        return self._id.get_object()

    def _set_id(self, value):
        # Plain python integers are stored as ulong values.
        if type(value) in (int, long):
            value = ulong(value)
        self._id.rewind()
        self._id.put_object(value)

    id = property(_get_id, _set_id,
                  doc="""
The id of the message.
""")

    # -- user id -------------------------------------------------------

    def _get_user_id(self):
        return pn_message_get_user_id(self._msg)

    def _set_user_id(self, value):
        status = pn_message_set_user_id(self._msg, value)
        self._check(status)

    user_id = property(_get_user_id, _set_user_id,
                       doc="""
The user id of the message creator.
""")

    # -- address -------------------------------------------------------

    def _get_address(self):
        return pn_message_get_address(self._msg)

    def _set_address(self, value):
        status = pn_message_set_address(self._msg, value)
        self._check(status)

    address = property(_get_address, _set_address,
                       doc="""
The address of the message.
""")

    # -- subject -------------------------------------------------------

    def _get_subject(self):
        return pn_message_get_subject(self._msg)

    def _set_subject(self, value):
        status = pn_message_set_subject(self._msg, value)
        self._check(status)

    subject = property(_get_subject, _set_subject,
                       doc="""
The subject of the message.
""")

    # -- reply-to ------------------------------------------------------

    def _get_reply_to(self):
        return pn_message_get_reply_to(self._msg)

    def _set_reply_to(self, value):
        status = pn_message_set_reply_to(self._msg, value)
        self._check(status)

    reply_to = property(_get_reply_to, _set_reply_to,
                        doc="""
The reply-to address for the message.
""")

    # -- correlation id ------------------------------------------------

    def _get_correlation_id(self):
        return self._correlation_id.get_object()

    def _set_correlation_id(self, value):
        # Plain python integers are stored as ulong values.
        if type(value) in (int, long):
            value = ulong(value)
        self._correlation_id.rewind()
        self._correlation_id.put_object(value)

    correlation_id = property(_get_correlation_id, _set_correlation_id,
                              doc="""
The correlation-id for the message.
""")

    # -- content type --------------------------------------------------

    def _get_content_type(self):
        return pn_message_get_content_type(self._msg)

    def _set_content_type(self, value):
        status = pn_message_set_content_type(self._msg, value)
        self._check(status)

    content_type = property(_get_content_type, _set_content_type,
                            doc="""
The content-type of the message.
""")

    # -- content encoding ----------------------------------------------

    def _get_content_encoding(self):
        return pn_message_get_content_encoding(self._msg)

    def _set_content_encoding(self, value):
        status = pn_message_set_content_encoding(self._msg, value)
        self._check(status)

    content_encoding = property(_get_content_encoding, _set_content_encoding,
                                doc="""
The content-encoding of the message.
""")

    # -- expiry time ---------------------------------------------------

    def _get_expiry_time(self):
        return pn_message_get_expiry_time(self._msg)

    def _set_expiry_time(self, value):
        status = pn_message_set_expiry_time(self._msg, value)
        self._check(status)

    expiry_time = property(_get_expiry_time, _set_expiry_time,
                           doc="""
The expiry time of the message.
""")

    # -- creation time -------------------------------------------------

    def _get_creation_time(self):
        return pn_message_get_creation_time(self._msg)

    def _set_creation_time(self, value):
        status = pn_message_set_creation_time(self._msg, value)
        self._check(status)

    creation_time = property(_get_creation_time, _set_creation_time,
                             doc="""
The creation time of the message.
""")

    # -- group id ------------------------------------------------------

    def _get_group_id(self):
        return pn_message_get_group_id(self._msg)

    def _set_group_id(self, value):
        status = pn_message_set_group_id(self._msg, value)
        self._check(status)

    group_id = property(_get_group_id, _set_group_id,
                        doc="""
The group id of the message.
""")

    # -- group sequence ------------------------------------------------

    def _get_group_sequence(self):
        return pn_message_get_group_sequence(self._msg)

    def _set_group_sequence(self, value):
        status = pn_message_set_group_sequence(self._msg, value)
        self._check(status)

    group_sequence = property(_get_group_sequence, _set_group_sequence,
                              doc="""
The sequence of the message within its group.
""")

    # -- reply-to group id ---------------------------------------------

    def _get_reply_to_group_id(self):
        return pn_message_get_reply_to_group_id(self._msg)

    def _set_reply_to_group_id(self, value):
        status = pn_message_set_reply_to_group_id(self._msg, value)
        self._check(status)

    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                                 doc="""
The group-id for any replies.
""")

    # XXX
    def _get_format(self):
        return pn_message_get_format(self._msg)

    def _set_format(self, value):
        status = pn_message_set_format(self._msg, value)
        self._check(status)

    format = property(_get_format, _set_format,
                      doc="""
The format of the message.
""")

    def encode(self):
        # Sync the python-side sections first, then encode into a
        # buffer that starts at 16 and doubles for as long as the
        # encoder reports PN_OVERFLOW.
        self._pre_encode()
        size = 16
        err, data = pn_message_encode(self._msg, size)
        while err == PN_OVERFLOW:
            size *= 2
            err, data = pn_message_encode(self._msg, size)
        self._check(err)
        return data

    def decode(self, data):
        # Decode into the underlying message, then refresh the
        # python-side sections from it.
        status = pn_message_decode(self._msg, data, len(data))
        self._check(status)
        self._post_decode()

    def load(self, data):
        status = pn_message_load(self._msg, data)
        self._check(status)

    def save(self):
        # Same buffer-doubling strategy as encode(), but without
        # syncing the python-side sections first.
        size = 16
        err, data = pn_message_save(self._msg, size)
        while err == PN_OVERFLOW:
            size *= 2
            err, data = pn_message_save(self._msg, size)
        self._check(err)
        return data
| |
class DataException(ProtonException):
    """
    Base class of the Data exception hierarchy. Every exception raised
    by the L{Data} class extends this one.
    """
| |
class UnmappedType:
    """
    Holder for a message that is shown in the instance's repr.
    """

    def __init__(self, msg):
        self.msg = msg

    def __repr__(self):
        text = self.msg
        return "UnmappedType(%s)" % text
| |
class ulong(long):
    """
    A long subclass whose repr is wrapped in C{ulong(...)}.
    """

    def __repr__(self):
        inner = long.__repr__(self)
        return "ulong(%s)" % inner
| |
class timestamp(long):
    """
    A long subclass whose repr is wrapped in C{timestamp(...)}.
    """

    def __repr__(self):
        inner = long.__repr__(self)
        return "timestamp(%s)" % inner
| |
class symbol(unicode):
    """
    A unicode subclass whose repr is wrapped in C{symbol(...)}.
    """

    def __repr__(self):
        inner = unicode.__repr__(self)
        return "symbol(%s)" % inner
| |
class char(unicode):
    """
    A unicode subclass whose repr is wrapped in C{char(...)}.
    """

    def __repr__(self):
        inner = unicode.__repr__(self)
        return "char(%s)" % inner
| |
class Described(object):
    """
    A value paired with a descriptor.

    Two L{Described} instances are equal when both their descriptors
    and their values are equal. A L{Described} instance never equals
    an object of another type.
    """

    def __init__(self, descriptor, value):
        """
        @param descriptor: the descriptor of the value
        @param value: the described value
        """
        self.descriptor = descriptor
        self.value = value

    def __repr__(self):
        return "Described(%r, %r)" % (self.descriptor, self.value)

    def __eq__(self, o):
        if isinstance(o, Described):
            return self.descriptor == o.descriptor and self.value == o.value
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this method
        # two equal instances would also compare as "not equal".
        return not self.__eq__(o)
| |
class Constant(object):
    """
    A named marker object. Its repr is the bare name it was created
    with.
    """

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        label = self.name
        return label
| |
# Module level marker constant.
# NOTE(review): presumably passed as the descriptor of an Array that
# has no descriptor -- confirm against the callers.
UNDESCRIBED = Constant(name="UNDESCRIBED")
| |
class Array(object):
    """
    An array value made of a descriptor, an element type and the
    elements themselves.

    Two L{Array} instances are equal when their descriptors, types and
    elements are all equal. An L{Array} never equals an object of
    another type.
    """

    def __init__(self, descriptor, type, *elements):
        """
        @param descriptor: the descriptor of the array
        @param type: the type of the array elements
        @param elements: the elements, stored as a tuple
        """
        self.descriptor = descriptor
        self.type = type
        self.elements = elements

    def __repr__(self):
        # Rendered as a constructor call: descriptor, type, then each
        # element when there are any.
        parts = [repr(self.descriptor), repr(self.type)]
        if self.elements:
            parts.extend(map(repr, self.elements))
        return "Array(%s)" % ", ".join(parts)

    def __eq__(self, o):
        if isinstance(o, Array):
            return self.descriptor == o.descriptor and \
                self.type == o.type and self.elements == o.elements
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this method
        # two equal arrays would also compare as "not equal".
        return not self.__eq__(o)
| |
class Data:
    """
    An interface for decoding, extracting, creating, and encoding
    arbitrary AMQP data. A L{Data} object holds a tree of AMQP values.
    Leaf nodes of the tree correspond to scalars in the AMQP type
    system such as L{ints<INT>} or L{strings<STRING>}. Non-leaf nodes
    correspond to compound values in the AMQP type system such as
    L{lists<LIST>}, L{maps<MAP>}, L{arrays<ARRAY>}, or L{described
    values<DESCRIBED>}. The root node of the tree is the L{Data}
    object itself and can have an arbitrary number of children.

    A L{Data} object keeps track of a current sibling node and a
    current parent node. Siblings are ordered within their parent.
    Values are accessed and/or added by navigating to the desired
    location in the tree with the L{next}, L{prev}, L{enter}, and
    L{exit} methods and then using one of the put_*/get_* methods to
    access or add a value of the desired type.

    The put_* methods always add a value I{after} the current node in
    the tree. If the current node has a next sibling, the put_* method
    overwrites the value on that node. If there is no current node or
    the current node has no next sibling, one is added. The put_*
    methods always make the added/modified node the current node. The
    get_* methods read the value of the current node and do not change
    which node is current.

    The following types of scalar values are supported:

     - L{NULL}
     - L{BOOL}
     - L{UBYTE}
     - L{BYTE}
     - L{USHORT}
     - L{SHORT}
     - L{UINT}
     - L{INT}
     - L{CHAR}
     - L{ULONG}
     - L{LONG}
     - L{TIMESTAMP}
     - L{FLOAT}
     - L{DOUBLE}
     - L{DECIMAL32}
     - L{DECIMAL64}
     - L{DECIMAL128}
     - L{UUID}
     - L{BINARY}
     - L{STRING}
     - L{SYMBOL}

    The following types of compound values are supported:

     - L{DESCRIBED}
     - L{ARRAY}
     - L{LIST}
     - L{MAP}
    """

    # Type constants. The string that follows each assignment is its
    # variable docstring.
    NULL = PN_NULL
    "A null value."
    BOOL = PN_BOOL
    "A boolean value."
    UBYTE = PN_UBYTE
    "An unsigned byte value."
    BYTE = PN_BYTE
    "A signed byte value."
    USHORT = PN_USHORT
    "An unsigned short value."
    SHORT = PN_SHORT
    "A short value."
    UINT = PN_UINT
    "An unsigned int value."
    INT = PN_INT
    "A signed int value."
    CHAR = PN_CHAR
    "A character value."
    ULONG = PN_ULONG
    "An unsigned long value."
    LONG = PN_LONG
    "A signed long value."
    TIMESTAMP = PN_TIMESTAMP
    "A timestamp value."
    FLOAT = PN_FLOAT
    "A float value."
    DOUBLE = PN_DOUBLE
    "A double value."
    DECIMAL32 = PN_DECIMAL32
    "A DECIMAL32 value."
    DECIMAL64 = PN_DECIMAL64
    "A DECIMAL64 value."
    DECIMAL128 = PN_DECIMAL128
    "A DECIMAL128 value."
    UUID = PN_UUID
    "A UUID value."
    BINARY = PN_BINARY
    "A binary string."
    STRING = PN_STRING
    "A unicode string."
    SYMBOL = PN_SYMBOL
    "A symbolic string."
    DESCRIBED = PN_DESCRIBED
    "A described value."
    ARRAY = PN_ARRAY
    "An array value."
    LIST = PN_LIST
    "A list value."
    MAP = PN_MAP
    "A map value."
| |
def __init__(self, capacity=16):
    # An int/long capacity allocates a new data object that this
    # instance frees in __del__. Any other argument is stored as the
    # data object itself and is never freed by this instance.
    owned = type(capacity) in (int, long)
    if owned:
        self._data = pn_data(capacity)
    else:
        self._data = capacity
    self._free = owned
| |
def __del__(self):
    # __del__ also runs for instances whose __init__ raised before
    # _free was assigned, so read the flag defensively instead of
    # raising AttributeError from the finalizer.
    if getattr(self, "_free", False) and hasattr(self, "_data"):
        pn_data_free(self._data)
        del self._data
| |
def _check(self, err):
    # Negative codes are errors: raise the mapped exception class
    # (DataException by default). Any other value is handed back to
    # the caller untouched.
    if err < 0:
        error_class = EXCEPTIONS.get(err, DataException)
        # NOTE(review): "xxx" looks like a placeholder for the real
        # error text -- confirm whether the C API exposes one.
        raise error_class("[%s]: %s" % (err, "xxx"))
    return err
| |
def clear(self):
    """
    Clears this data object.
    """
    handle = self._data
    pn_data_clear(handle)
| |
def rewind(self):
    """
    Clears the current node and makes the root node the parent.
    """
    handle = self._data
    pn_data_rewind(handle)
| |
def next(self):
    """
    Advances the current node to its next sibling and returns the
    type of that sibling. When there is no next sibling the current
    node is left unchanged and None is returned.
    """
    if pn_data_next(self._data):
        return self.type()
    return None
| |
def prev(self):
    """
    Moves the current node to its previous sibling and returns the
    type of that sibling. When there is no previous sibling the
    current node is left unchanged and None is returned.
    """
    if pn_data_prev(self._data):
        return self.type()
    return None
| |
def enter(self):
    """
    Makes the current node the parent node and clears the current
    node.
    """
    entered = pn_data_enter(self._data)
    return entered
| |
def exit(self):
    """
    Makes the parent node the current node, and the parent's own
    parent the new parent node.
    """
    exited = pn_data_exit(self._data)
    return exited
| |
def type(self):
    """
    Returns the type of the current node.
    """
    code = pn_data_type(self._data)
    # A type code of -1 is reported to the caller as None.
    if code == -1:
        return None
    return code
| |
def encode(self):
    """
    Returns a representation of the data encoded in AMQP format.
    """
    # Start with a 1024 buffer and keep doubling it for as long as
    # the encoder reports PN_OVERFLOW.
    capacity = 1024
    code, encoded = pn_data_encode(self._data, capacity)
    while code == PN_OVERFLOW:
        capacity *= 2
        code, encoded = pn_data_encode(self._data, capacity)
    if code >= 0:
        return encoded
    # Any other negative code is an error; _check raises for it.
    self._check(code)
| |
def decode(self, encoded):
    """
    Decodes the first value from the supplied AMQP data and returns
    the number of bytes consumed.

    @type encoded: binary
    @param encoded: AMQP encoded binary data
    """
    consumed = pn_data_decode(self._data, encoded)
    return self._check(consumed)
| |
def put_list(self):
    """
    Puts a list value. The elements are filled in by entering the
    list node and putting the element values.

      >>> data = Data()
      >>> data.put_list()
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
    """
    status = pn_data_put_list(self._data)
    self._check(status)
| |
def put_map(self):
    """
    Puts a map value. The elements are filled in by entering the map
    node and putting alternating key value pairs.

      >>> data = Data()
      >>> data.put_map()
      >>> data.enter()
      >>> data.put_string("key")
      >>> data.put_string("value")
      >>> data.exit()
    """
    status = pn_data_put_map(self._data)
    self._check(status)
| |
def put_array(self, described, element_type):
    """
    Puts an array value. The elements are filled in by entering the
    array node and putting the element values, all of which must be
    of the specified array element type. When an array is described,
    the first child value of the array is the descriptor and may be
    of any type.

      >>> data = Data()
      >>>
      >>> data.put_array(False, Data.INT)
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
      >>>
      >>> data.put_array(True, Data.DOUBLE)
      >>> data.enter()
      >>> data.put_symbol("array-descriptor")
      >>> data.put_double(1.1)
      >>> data.put_double(1.2)
      >>> data.put_double(1.3)
      >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    status = pn_data_put_array(self._data, described, element_type)
    self._check(status)
| |
def put_described(self):
    """
    Puts a described value. A described node has two children: the
    descriptor and the value. Both are supplied by entering the node
    and putting the desired values.

      >>> data = Data()
      >>> data.put_described()
      >>> data.enter()
      >>> data.put_symbol("value-descriptor")
      >>> data.put_string("the value")
      >>> data.exit()
    """
    status = pn_data_put_described(self._data)
    self._check(status)
| |
def put_null(self):
    """
    Puts a null value.
    """
    status = pn_data_put_null(self._data)
    self._check(status)
| |
def put_bool(self, b):
    """
    Puts a boolean value.

    @param b: a boolean value
    """
    status = pn_data_put_bool(self._data, b)
    self._check(status)
| |
def put_ubyte(self, ub):
    """
    Puts an unsigned byte value.

    @param ub: an integral value
    """
    status = pn_data_put_ubyte(self._data, ub)
    self._check(status)
| |
def put_byte(self, b):
    """
    Puts a signed byte value.

    @param b: an integral value
    """
    status = pn_data_put_byte(self._data, b)
    self._check(status)
| |
def put_ushort(self, us):
    """
    Puts an unsigned short value.

    @param us: an integral value
    """
    status = pn_data_put_ushort(self._data, us)
    self._check(status)
| |
def put_short(self, s):
    """
    Puts a signed short value.

    @param s: an integral value
    """
    status = pn_data_put_short(self._data, s)
    self._check(status)
| |
def put_uint(self, ui):
    """
    Puts an unsigned int value.

    @param ui: an integral value
    """
    status = pn_data_put_uint(self._data, ui)
    self._check(status)
| |
def put_int(self, i):
    """
    Puts a signed int value.

    @param i: an integral value
    """
    status = pn_data_put_int(self._data, i)
    self._check(status)
| |
def put_char(self, c):
    """
    Puts a char value.

    @param c: a single character
    """
    # The character is handed over as its ordinal.
    code_point = ord(c)
    status = pn_data_put_char(self._data, code_point)
    self._check(status)
| |
def put_ulong(self, ul):
    """
    Puts an unsigned long value.

    @param ul: an integral value
    """
    status = pn_data_put_ulong(self._data, ul)
    self._check(status)
| |
def put_long(self, l):
    """
    Puts a signed long value.

    @param l: an integral value
    """
    status = pn_data_put_long(self._data, l)
    self._check(status)
| |
def put_timestamp(self, t):
    """
    Puts a timestamp value.

    @param t: an integral value
    """
    status = pn_data_put_timestamp(self._data, t)
    self._check(status)
| |
def put_float(self, f):
    """
    Puts a float value.

    @param f: a floating point value
    """
    status = pn_data_put_float(self._data, f)
    self._check(status)
| |
def put_double(self, d):
    """
    Writes a double node.

    The status code produced by the underlying call is handed to
    self._check.

    @param d: the floating point value to write
    """
    status = pn_data_put_double(self._data, d)
    self._check(status)
| |
def put_decimal32(self, d):
    """
    Writes a decimal32 node.

    The value is passed to the underlying call unchanged; the resulting
    status code is handed to self._check.

    @param d: the decimal32 value to write
    """
    status = pn_data_put_decimal32(self._data, d)
    self._check(status)
| |
def put_decimal64(self, d):
    """
    Writes a decimal64 node.

    The value is passed to the underlying call unchanged; the resulting
    status code is handed to self._check.

    @param d: the decimal64 value to write
    """
    status = pn_data_put_decimal64(self._data, d)
    self._check(status)
| |
def put_decimal128(self, d):
    """
    Writes a decimal128 node.

    The value is passed to the underlying call unchanged; the resulting
    status code is handed to self._check.

    @param d: the decimal128 value to write
    """
    status = pn_data_put_decimal128(self._data, d)
    self._check(status)
| |
def put_uuid(self, u):
    """
    Writes a UUID node.

    The value's C{bytes} attribute is what gets passed to the underlying
    call; the resulting status code is handed to self._check.

    @param u: a uuid value (anything exposing a C{bytes} attribute)
    """
    raw = u.bytes
    status = pn_data_put_uuid(self._data, raw)
    self._check(status)
| |
def put_binary(self, b):
    """
    Writes a binary node.

    The status code produced by the underlying call is handed to
    self._check.

    @type b: binary
    @param b: the binary value to write
    """
    status = pn_data_put_binary(self._data, b)
    self._check(status)
| |
def put_string(self, s):
    """
    Writes a unicode string node.

    The value is encoded as UTF-8 before being passed to the underlying
    call; the resulting status code is handed to self._check.

    @type s: unicode
    @param s: the unicode value to write
    """
    encoded = s.encode("utf8")
    status = pn_data_put_string(self._data, encoded)
    self._check(status)
| |
def put_symbol(self, s):
    """
    Writes a symbol node.

    The name is passed to the underlying call as-is (no encoding step);
    the resulting status code is handed to self._check.

    @type s: string
    @param s: the symbol name
    """
    status = pn_data_put_symbol(self._data, s)
    self._check(status)
| |
def get_list(self):
    """
    Returns the number of elements when the current node is a list and
    zero otherwise. Enter the list to reach its elements.

      >>> count = data.get_list()
      >>> data.enter()
      >>> for i in range(count):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print data.get_string()
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    element_count = pn_data_get_list(self._data)
    return element_count
| |
def get_map(self):
    """
    Returns the number of child elements when the current node is a map
    and zero otherwise. Keys and values are stored as alternating
    children, so a map holds count/2 pairs; enter the map to reach them.

      >>> count = data.get_map()
      >>> data.enter()
      >>> for i in range(count/2):
      ...   data.next()
      ...   key = data.get_object()
      ...   data.next()
      ...   value = data.get_object()
      >>> data.exit()
    """
    child_count = pn_data_get_map(self._data)
    return child_count
| |
def get_array(self):
    """
    Describes the array at the current node as a tuple of
    (element count, described flag, element type). For a node that is
    not an array the documented result is (0, False, None). Enter the
    array to reach its contents.

      >>> # read an array of strings with a symbolic descriptor
      >>> count, described, type = data.get_array()
      >>> data.enter()
      >>> data.next()
      >>> print "Descriptor:", data.get_symbol()
      >>> for i in range(count):
      ...    data.next()
      ...    print "Element:", data.get_string()
      >>> data.exit()
    """
    count = pn_data_get_array(self._data)
    described = pn_data_is_array_described(self._data)
    element_type = pn_data_get_array_type(self._data)
    # -1 from the underlying call is reported to callers as None
    return count, described, (None if element_type == -1 else element_type)
| |
def is_described(self):
    """
    Tells whether the current node is a described value. Enter the node
    to reach the descriptor followed by the value.

      >>> # read a symbolically described string
      >>> assert data.is_described() # will error if the current node is not described
      >>> data.enter()
      >>> print data.get_symbol()
      >>> print data.get_string()
      >>> data.exit()
    """
    described = pn_data_is_described(self._data)
    return described
| |
def is_null(self):
    """
    Checks if the current node is a null.

    The previous implementation discarded the result of the underlying
    call and therefore always returned None, which made the predicate
    unusable in a condition. The node type is now compared against
    Data.NULL, the same way get_uuid tests for Data.UUID.

    @return: True if the current node is a null, False otherwise
    """
    return pn_data_type(self._data) == Data.NULL
| |
def get_bool(self):
    """
    Returns the value of the current node when it is a boolean, and
    False otherwise.
    """
    value = pn_data_get_bool(self._data)
    return value
| |
def get_ubyte(self):
    """
    Returns the value of the current node when it is an unsigned byte,
    and 0 otherwise.
    """
    value = pn_data_get_ubyte(self._data)
    return value
| |
def get_byte(self):
    """
    Returns the value of the current node when it is a signed byte, and
    0 otherwise.
    """
    value = pn_data_get_byte(self._data)
    return value
| |
def get_ushort(self):
    """
    Returns the value of the current node when it is an unsigned short,
    and 0 otherwise.
    """
    value = pn_data_get_ushort(self._data)
    return value
| |
def get_short(self):
    """
    Returns the value of the current node when it is a signed short, and
    0 otherwise.
    """
    value = pn_data_get_short(self._data)
    return value
| |
def get_uint(self):
    """
    Returns the value of the current node when it is an unsigned int,
    and 0 otherwise.
    """
    value = pn_data_get_uint(self._data)
    return value
| |
def get_int(self):
    """
    Returns the value of the current node when it is a signed int, and
    0 otherwise.
    """
    value = pn_data_get_int(self._data)
    return value
| |
def get_char(self):
    """
    Returns the current node as a L{char}. The code point produced by
    the underlying call is converted with unichr() and wrapped in char.

    NOTE(review): the underlying call is documented to yield 0 when the
    current node is not a char, so the result would then be the char
    for code point 0 rather than the integer 0 -- confirm.
    """
    code_point = pn_data_get_char(self._data)
    return char(unichr(code_point))
| |
def get_ulong(self):
    """
    Returns the value of the current node, wrapped in L{ulong}, when it
    is an unsigned long; the documented fallback is 0.
    """
    raw = pn_data_get_ulong(self._data)
    return ulong(raw)
| |
def get_long(self):
    """
    Returns the value of the current node when it is a signed long, and
    0 otherwise.
    """
    value = pn_data_get_long(self._data)
    return value
| |
def get_timestamp(self):
    """
    Returns the value of the current node, wrapped in L{timestamp}, when
    it is a timestamp; the documented fallback is 0.
    """
    raw = pn_data_get_timestamp(self._data)
    return timestamp(raw)
| |
def get_float(self):
    """
    Returns the value of the current node when it is a float, and 0
    otherwise.
    """
    value = pn_data_get_float(self._data)
    return value
| |
def get_double(self):
    """
    Returns the value of the current node when it is a double, and 0
    otherwise.
    """
    value = pn_data_get_double(self._data)
    return value
| |
# XXX: need to convert
def get_decimal32(self):
    """
    Returns the value of the current node when it is a decimal32, and 0
    otherwise. The value from the underlying call is returned without
    any conversion.
    """
    raw = pn_data_get_decimal32(self._data)
    return raw
| |
# XXX: need to convert
def get_decimal64(self):
    """
    Returns the value of the current node when it is a decimal64, and 0
    otherwise. The value from the underlying call is returned without
    any conversion.
    """
    raw = pn_data_get_decimal64(self._data)
    return raw
| |
# XXX: need to convert
def get_decimal128(self):
    """
    Returns the value of the current node when it is a decimal128, and 0
    otherwise. The value from the underlying call is returned without
    any conversion.
    """
    raw = pn_data_get_decimal128(self._data)
    return raw
| |
def get_uuid(self):
    """
    Returns the current node as a uuid.UUID when its type is Data.UUID,
    and None for any other node type.
    """
    if pn_data_type(self._data) != Data.UUID:
        return None
    raw = pn_data_get_uuid(self._data)
    return uuid.UUID(bytes=raw)
| |
def get_binary(self):
    """
    Returns the value of the current node when it is binary, and ""
    otherwise.
    """
    payload = pn_data_get_binary(self._data)
    return payload
| |
def get_string(self):
    """
    Returns the value of the current node when it is a string, and ""
    otherwise. The bytes from the underlying call are decoded as UTF-8.
    """
    encoded = pn_data_get_string(self._data)
    return encoded.decode("utf8")
| |
def get_symbol(self):
    """
    Returns the value of the current node, wrapped in L{symbol}, when it
    is a symbol; the documented fallback is "".
    """
    name = pn_data_get_symbol(self._data)
    return symbol(name)
| |
def copy(self, src):
    """
    Copies the contents of another Data object into this one via
    pn_data_copy; the resulting status code is handed to self._check.

    @param src: the Data object to copy from (its C{_data} handle is used)
    """
    status = pn_data_copy(self._data, src._data)
    self._check(status)
| |
def format(self):
    """
    Returns the result of pn_data_format for this object.

    Formatting starts with a size of 16 and the size is doubled for as
    long as the underlying call reports PN_OVERFLOW. Any other status
    code is handed to self._check before the result is returned.
    """
    size = 16
    err, text = pn_data_format(self._data, size)
    while err == PN_OVERFLOW:
        size *= 2
        err, text = pn_data_format(self._data, size)
    self._check(err)
    return text
| |
def dump(self):
    """
    Hands this object's data handle to pn_data_dump. Nothing is returned.
    """
    handle = self._data
    pn_data_dump(handle)
| |
def put_dict(self, d):
    """
    Writes a map node and fills it from a dictionary: each key is
    written with put_object, immediately followed by its value.

    The map is always exited again, even when writing an entry raises.

    @param d: the dictionary to write
    """
    self.put_map()
    self.enter()
    try:
        for entry in d.items():
            # entry is a (key, value) pair; write both halves in order
            for part in entry:
                self.put_object(part)
    finally:
        self.exit()
| |
def get_dict(self):
    """
    Reads the children of the current node into a dictionary, treating
    them as alternating keys and values. A trailing key that has no
    value is mapped to None.

    @return: the dictionary, or None when the node cannot be entered
    """
    if not self.enter():
        return None
    try:
        pairs = {}
        while self.next():
            key = self.get_object()
            # a missing value (odd number of children) becomes None
            pairs[key] = self.get_object() if self.next() else None
    finally:
        self.exit()
    return pairs
| |
def put_sequence(self, s):
    """
    Writes a list node and fills it by writing every item of the given
    iterable with put_object.

    The list is always exited again, even when writing an item raises.

    @param s: the iterable of items to write
    """
    self.put_list()
    self.enter()
    try:
        for item in s:
            self.put_object(item)
    finally:
        self.exit()
| |
def get_sequence(self):
    """
    Reads every child of the current node with get_object and returns
    them as a Python list.

    @return: the list, or None when the node cannot be entered
    """
    if not self.enter():
        return None
    try:
        items = []
        while self.next():
            items.append(self.get_object())
    finally:
        self.exit()
    return items
| |
def get_py_described(self):
    """
    Reads the current described node into a L{Described} object: the
    first child is taken as the descriptor and the second as the value.

    @return: the Described object, or None when the node cannot be
    entered
    """
    if not self.enter():
        return None
    try:
        self.next()
        descriptor = self.get_object()
        self.next()
        value = self.get_object()
    finally:
        self.exit()
    return Described(descriptor, value)
| |
def put_py_described(self, d):
    """
    Writes a described node: the descriptor is written first and the
    value second, both with put_object.

    The node is always exited again, even when writing a part raises.

    @param d: an object exposing C{descriptor} and C{value} attributes
    """
    self.put_described()
    self.enter()
    try:
        for part in (d.descriptor, d.value):
            self.put_object(part)
    finally:
        self.exit()
| |
def get_py_array(self):
    """
    Reads the current array node into an L{Array} object.

    When the array is described, its first child is read as the
    descriptor; otherwise UNDESCRIBED is used. All remaining children
    become the array's elements.

    @return: the Array object, or None when the node cannot be entered
    """
    # the count is not needed here: children are read until next() fails
    count, described, element_type = self.get_array()
    if not self.enter():
        return None
    try:
        if described:
            self.next()
            descriptor = self.get_object()
        else:
            descriptor = UNDESCRIBED
        elements = []
        while self.next():
            elements.append(self.get_object())
    finally:
        self.exit()
    return Array(descriptor, element_type, *elements)
| |
def put_py_array(self, a):
    """
    Writes an L{Array} object as an array node.

    The array is declared as described whenever its descriptor is not
    UNDESCRIBED. For a described array the descriptor is written as the
    first child, which is where get_py_array reads it back from; it was
    previously never written at all. The elements follow, each written
    with put_object.

    The array is always exited again, even when writing a child raises.

    @param a: an object exposing C{descriptor}, C{type} and C{elements}
    """
    described = a.descriptor != UNDESCRIBED
    self.put_array(described, a.type)
    self.enter()
    try:
        if described:
            # the descriptor must precede the elements inside the array
            self.put_object(a.descriptor)
        for element in a.elements:
            self.put_object(element)
    finally:
        self.exit()
| |
# Dispatch table used by put_object: maps a Python class to the method
# that writes an instance of that class.
put_mappings = {
    # null takes no value, so the object argument is ignored
    type(None): lambda s, _: s.put_null(),
    bool: put_bool,
    # containers
    dict: put_dict,
    list: put_sequence,
    tuple: put_sequence,
    # text, binary and symbolic values
    unicode: put_string,
    bytes: put_binary,
    symbol: put_symbol,
    char: put_char,
    # plain int and long are both written as signed longs
    int: put_long,
    long: put_long,
    ulong: put_ulong,
    timestamp: put_timestamp,
    # Python floats are written as doubles
    float: put_double,
    uuid.UUID: put_uuid,
    # wrapper types for described values and arrays
    Described: put_py_described,
    Array: put_py_array,
}
# Dispatch table used by get_object: maps a node type constant to the
# method that reads a node of that type.
get_mappings = {
    # a null node is always read as None
    NULL: lambda s: None,
    BOOL: get_bool,
    # integral types
    UBYTE: get_ubyte,
    BYTE: get_byte,
    USHORT: get_ushort,
    SHORT: get_short,
    UINT: get_uint,
    INT: get_int,
    ULONG: get_ulong,
    LONG: get_long,
    CHAR: get_char,
    TIMESTAMP: get_timestamp,
    # floating point and decimal types
    FLOAT: get_float,
    DOUBLE: get_double,
    DECIMAL32: get_decimal32,
    DECIMAL64: get_decimal64,
    DECIMAL128: get_decimal128,
    # identifiers, binary and text
    UUID: get_uuid,
    BINARY: get_binary,
    STRING: get_string,
    SYMBOL: get_symbol,
    # compound types
    DESCRIBED: get_py_described,
    ARRAY: get_py_array,
    LIST: get_sequence,
    MAP: get_dict,
}
| |
def put_object(self, obj):
    """
    Writes an arbitrary Python object by dispatching on its class
    through put_mappings.

    The object's class is tried first and then each of its base classes
    in method resolution order, so instances of subclasses of a mapped
    type (for example a dict or list subclass) are written with the
    handler of the closest mapped base class instead of failing.

    @param obj: the object to write
    @raise KeyError: if neither the object's class nor any of its base
    classes has an entry in put_mappings
    """
    cls = obj.__class__
    # old-style classes have no __mro__; fall back to the class itself
    for candidate in getattr(cls, "__mro__", (cls,)):
        putter = self.put_mappings.get(candidate)
        if putter is not None:
            putter(self, obj)
            return
    raise KeyError(cls)
| |
def get_object(self):
    """
    Reads the current node as a Python object by dispatching on the
    node type through get_mappings.

    @return: None when self.type() is None; otherwise the result of the
    mapped getter. When no getter is registered for the type, the data
    is dumped and an UnmappedType built from the type's string form is
    returned.
    """
    kind = self.type()
    if kind is None:
        return None
    getter = self.get_mappings.get(kind)
    if not getter:
        self.dump()
        return UnmappedType(str(kind))
    return getter(self)
| |
| |
class ConnectionException(ProtonException):
    """
    The fallback exception raised by L{Connection} for negative error
    codes that have no entry in EXCEPTIONS.
    """
| |
class Endpoint(object):
    """
    Common base class of L{Connection}, L{Session} and L{Link}. It only
    carries the endpoint state constants re-exported from the C library.
    """

    # local side
    LOCAL_UNINIT = PN_LOCAL_UNINIT
    LOCAL_ACTIVE = PN_LOCAL_ACTIVE
    LOCAL_CLOSED = PN_LOCAL_CLOSED

    # remote side
    REMOTE_UNINIT = PN_REMOTE_UNINIT
    REMOTE_ACTIVE = PN_REMOTE_ACTIVE
    REMOTE_CLOSED = PN_REMOTE_CLOSED
| |
def wrap_connection(conn):
    """
    Returns the Python object associated with a connection handle.

    @param conn: a connection handle, possibly falsy
    @return: None for a falsy handle; the handle's context when one is
    set; otherwise a new L{Connection} wrapping the handle
    """
    if not conn:
        return None
    existing = pn_connection_get_context(conn)
    if existing:
        return existing
    # Connection.__init__ registers the new wrapper as the context
    return Connection(_conn=conn)
| |
class Connection(Endpoint):
    """
    Python wrapper around a proton connection handle.

    On construction the wrapper registers itself as the handle's context
    so that L{wrap_connection} can return the same object later.
    """

    def __init__(self, _conn=None):
        """
        @param _conn: an existing connection handle to wrap; when falsy,
        a new handle is created with pn_connection()
        """
        self._conn = _conn if _conn else pn_connection()
        pn_connection_set_context(self._conn, self)

    def __del__(self):
        # nothing to release when the handle attribute is not present
        if not hasattr(self, "_conn"):
            return
        pn_connection_free(self._conn)
        del self._conn

    def _check(self, err):
        """
        Raises for negative error codes and returns any other code.

        The exception class is looked up in EXCEPTIONS, falling back to
        ConnectionException.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, ConnectionException)
            raise exc_type("[%s]: %s" % (err, pn_connection_error(self._conn)))
        return err

    def _get_container(self):
        name = pn_connection_get_container(self._conn)
        return name

    def _set_container(self, name):
        return pn_connection_set_container(self._conn, name)

    container = property(_get_container, _set_container)

    def _get_hostname(self):
        name = pn_connection_get_hostname(self._conn)
        return name

    def _set_hostname(self, name):
        return pn_connection_set_hostname(self._conn, name)

    hostname = property(_get_hostname, _set_hostname)

    @property
    def remote_container(self):
        """The value of pn_connection_remote_container for this connection."""
        return pn_connection_remote_container(self._conn)

    @property
    def remote_hostname(self):
        """The value of pn_connection_remote_hostname for this connection."""
        return pn_connection_remote_hostname(self._conn)

    # Each capabilities property builds a new Data wrapper on every access.

    @property
    def offered_capabilities(self):
        handle = pn_connection_offered_capabilities(self._conn)
        return Data(handle)

    @property
    def desired_capabilities(self):
        handle = pn_connection_desired_capabilities(self._conn)
        return Data(handle)

    @property
    def remote_offered_capabilities(self):
        handle = pn_connection_remote_offered_capabilities(self._conn)
        return Data(handle)

    @property
    def remote_desired_capabilities(self):
        handle = pn_connection_remote_desired_capabilities(self._conn)
        return Data(handle)

    def open(self):
        """Calls pn_connection_open on the underlying handle."""
        pn_connection_open(self._conn)

    def close(self):
        """Calls pn_connection_close on the underlying handle."""
        pn_connection_close(self._conn)

    @property
    def state(self):
        """The value of pn_connection_state for this connection."""
        return pn_connection_state(self._conn)

    @property
    def writable(self):
        """The value of pn_connection_writable for this connection."""
        return pn_connection_writable(self._conn)

    def session(self):
        """Creates a session on this connection and returns its wrapper."""
        ssn = pn_session(self._conn)
        return wrap_session(ssn)

    def session_head(self, mask):
        """
        Returns the wrapper for pn_session_head(connection, mask).

        @param mask: passed through to pn_session_head
        """
        ssn = pn_session_head(self._conn, mask)
        return wrap_session(ssn)

    def link_head(self, mask):
        """
        Returns the wrapper for pn_link_head(connection, mask).

        @param mask: passed through to pn_link_head
        """
        link = pn_link_head(self._conn, mask)
        return wrap_link(link)

    @property
    def work_head(self):
        """The wrapper for pn_work_head of this connection."""
        dlv = pn_work_head(self._conn)
        return wrap_delivery(dlv)
| |
class SessionException(ProtonException):
    """
    Exception class reserved for session related errors.
    """
| |
def wrap_session(ssn):
    """
    Returns the Python object associated with a session handle.

    @param ssn: a session handle or None
    @return: None when the handle is None; the handle's context when one
    is set; otherwise a new L{Session}, which is also stored as the
    handle's context
    """
    if ssn is None:
        return None
    existing = pn_session_get_context(ssn)
    if existing:
        return existing
    created = Session(ssn)
    pn_session_set_context(ssn, created)
    return created
| |
class Session(Endpoint):
    """
    Python wrapper around a proton session handle.
    """

    def __init__(self, ssn):
        """
        @param ssn: the session handle to wrap
        """
        self._ssn = ssn

    def __del__(self):
        # nothing to release when the handle attribute is not present
        if not hasattr(self, "_ssn"):
            return
        pn_session_free(self._ssn)
        del self._ssn

    def open(self):
        """Calls pn_session_open on the underlying handle."""
        pn_session_open(self._ssn)

    def close(self):
        """Calls pn_session_close on the underlying handle."""
        pn_session_close(self._ssn)

    @property
    def state(self):
        """The value of pn_session_state for this session."""
        return pn_session_state(self._ssn)

    @property
    def connection(self):
        """The wrapper of the connection this session belongs to."""
        conn = pn_session_connection(self._ssn)
        return wrap_connection(conn)

    def sender(self, name):
        """
        Creates a sender on this session and returns its wrapper.

        @param name: passed through to pn_sender
        """
        link = pn_sender(self._ssn, name)
        return wrap_link(link)

    def receiver(self, name):
        """
        Creates a receiver on this session and returns its wrapper.

        @param name: passed through to pn_receiver
        """
        link = pn_receiver(self._ssn, name)
        return wrap_link(link)
| |
class LinkException(ProtonException):
    """
    The fallback exception raised by L{Link} and L{Terminus} for
    negative error codes that have no entry in EXCEPTIONS.
    """
| |
def wrap_link(link):
    """
    Returns the Python object associated with a link handle.

    @param link: a link handle or None
    @return: None when the handle is None; the handle's context when one
    is set; otherwise a new L{Sender} or L{Receiver} (chosen with
    pn_link_is_sender), which is also stored as the handle's context
    """
    if link is None:
        return None
    existing = pn_link_get_context(link)
    if existing:
        return existing
    wrapper_class = Sender if pn_link_is_sender(link) else Receiver
    created = wrapper_class(link)
    pn_link_set_context(link, created)
    return created
| |
class Link(Endpoint):
    """
    Python wrapper around a proton link handle; base class of
    L{Sender} and L{Receiver}.
    """

    def __init__(self, link):
        """
        @param link: the link handle to wrap
        """
        self._link = link

    def __del__(self):
        # nothing to release when the handle attribute is not present
        if not hasattr(self, "_link"):
            return
        pn_link_free(self._link)
        del self._link

    def _check(self, err):
        """
        Raises for negative error codes and returns any other code.

        The exception class is looked up in EXCEPTIONS, falling back to
        LinkException.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, LinkException)
            raise exc_type("[%s]: %s" % (err, pn_link_error(self._link)))
        return err

    def open(self):
        """Calls pn_link_open on the underlying handle."""
        pn_link_open(self._link)

    def close(self):
        """Calls pn_link_close on the underlying handle."""
        pn_link_close(self._link)

    @property
    def state(self):
        """The value of pn_link_state for this link."""
        return pn_link_state(self._link)

    # Each terminus property builds a new Terminus wrapper on every access.

    @property
    def source(self):
        impl = pn_link_source(self._link)
        return Terminus(impl)

    @property
    def target(self):
        impl = pn_link_target(self._link)
        return Terminus(impl)

    @property
    def remote_source(self):
        impl = pn_link_remote_source(self._link)
        return Terminus(impl)

    @property
    def remote_target(self):
        impl = pn_link_remote_target(self._link)
        return Terminus(impl)

    @property
    def session(self):
        """The wrapper of the session this link belongs to."""
        ssn = pn_link_session(self._link)
        return wrap_session(ssn)

    def delivery(self, tag):
        """
        Creates a delivery on this link and returns its wrapper.

        @param tag: passed through to pn_delivery
        """
        dlv = pn_delivery(self._link, tag)
        return wrap_delivery(dlv)

    @property
    def current(self):
        """The wrapper for pn_link_current of this link."""
        dlv = pn_link_current(self._link)
        return wrap_delivery(dlv)

    def advance(self):
        """Returns the result of pn_link_advance for this link."""
        return pn_link_advance(self._link)

    @property
    def unsettled(self):
        """The value of pn_link_unsettled for this link."""
        return pn_link_unsettled(self._link)

    @property
    def credit(self):
        """The value of pn_link_credit for this link."""
        return pn_link_credit(self._link)

    @property
    def available(self):
        """The value of pn_link_available for this link."""
        return pn_link_available(self._link)

    @property
    def queued(self):
        """The value of pn_link_queued for this link."""
        return pn_link_queued(self._link)

    def next(self, mask):
        """
        Returns the wrapper for pn_link_next(link, mask).

        @param mask: passed through to pn_link_next
        """
        following = pn_link_next(self._link, mask)
        return wrap_link(following)
| |
class Terminus(object):
    """
    Python wrapper around a proton terminus handle, as handed out by the
    source/target properties of L{Link}.
    """

    # terminus types
    UNSPECIFIED = PN_UNSPECIFIED
    SOURCE = PN_SOURCE
    TARGET = PN_TARGET
    COORDINATOR = PN_COORDINATOR

    # durability modes
    NONDURABLE = PN_NONDURABLE
    CONFIGURATION = PN_CONFIGURATION
    DELIVERIES = PN_DELIVERIES

    def __init__(self, impl):
        """
        @param impl: the terminus handle to wrap
        """
        self._impl = impl

    def _check(self, err):
        """
        Raises for negative error codes and returns any other code.

        The exception class is looked up in EXCEPTIONS, falling back to
        LinkException; the message only carries the error code.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, LinkException)
            raise exc_type("[%s]" % err)
        return err

    # Every setter below hands the status code of the underlying call to
    # _check.

    def _get_type(self):
        return pn_terminus_get_type(self._impl)

    def _set_type(self, type):
        status = pn_terminus_set_type(self._impl, type)
        self._check(status)

    type = property(_get_type, _set_type)

    def _get_address(self):
        return pn_terminus_get_address(self._impl)

    def _set_address(self, address):
        status = pn_terminus_set_address(self._impl, address)
        self._check(status)

    address = property(_get_address, _set_address)

    def _get_durability(self):
        return pn_terminus_get_durability(self._impl)

    # NOTE(review): the parameter is named "seconds" but is passed on as
    # the durability value -- presumably a copy/paste leftover.
    def _set_durability(self, seconds):
        status = pn_terminus_set_durability(self._impl, seconds)
        self._check(status)

    durability = property(_get_durability, _set_durability)

    def _get_expiry_policy(self):
        return pn_terminus_get_expiry_policy(self._impl)

    # NOTE(review): the parameter is named "seconds" but is passed on as
    # the expiry policy value -- presumably a copy/paste leftover.
    def _set_expiry_policy(self, seconds):
        status = pn_terminus_set_expiry_policy(self._impl, seconds)
        self._check(status)

    expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

    def _get_timeout(self):
        return pn_terminus_get_timeout(self._impl)

    def _set_timeout(self, seconds):
        status = pn_terminus_set_timeout(self._impl, seconds)
        self._check(status)

    timeout = property(_get_timeout, _set_timeout)

    def _is_dynamic(self):
        return pn_terminus_is_dynamic(self._impl)

    def _set_dynamic(self, dynamic):
        status = pn_terminus_set_dynamic(self._impl, dynamic)
        self._check(status)

    dynamic = property(_is_dynamic, _set_dynamic)

    # Each of the following properties builds a new Data wrapper on
    # every access.

    @property
    def properties(self):
        handle = pn_terminus_properties(self._impl)
        return Data(handle)

    @property
    def capabilities(self):
        handle = pn_terminus_capabilities(self._impl)
        return Data(handle)

    @property
    def outcomes(self):
        handle = pn_terminus_outcomes(self._impl)
        return Data(handle)

    @property
    def filter(self):
        handle = pn_terminus_filter(self._impl)
        return Data(handle)

    def copy(self, src):
        """
        Copies another terminus into this one via pn_terminus_copy; the
        resulting status code is handed to _check.

        @param src: the Terminus to copy from
        """
        status = pn_terminus_copy(self._impl, src._impl)
        self._check(status)
| |
| |
class Sender(Link):
    """
    The sending kind of L{Link}.
    """

    def offered(self, n):
        """
        Calls pn_link_offered on the underlying link.

        @param n: passed through to pn_link_offered
        """
        pn_link_offered(self._link, n)

    def send(self, bytes):
        """
        Sends data on the link with pn_link_send.

        @param bytes: the data to send
        @return: the non-negative result of pn_link_send; negative
        results are raised by _check
        """
        result = pn_link_send(self._link, bytes)
        return self._check(result)

    def drained(self):
        """Calls pn_link_drained on the underlying link."""
        pn_link_drained(self._link)
| |
class Receiver(Link):
    """
    The receiving kind of L{Link}.
    """

    def flow(self, n):
        """
        Calls pn_link_flow on the underlying link.

        @param n: passed through to pn_link_flow
        """
        pn_link_flow(self._link, n)

    def recv(self, limit):
        """
        Receives data from the link with pn_link_recv.

        @param limit: passed through to pn_link_recv
        @return: None when pn_link_recv reports PN_EOS, otherwise the
        received data; other negative codes are raised by _check
        """
        code, payload = pn_link_recv(self._link, limit)
        if code == PN_EOS:
            return None
        self._check(code)
        return payload

    def drain(self, n):
        """
        Calls pn_link_drain on the underlying link.

        @param n: passed through to pn_link_drain
        """
        pn_link_drain(self._link, n)
| |
def wrap_delivery(dlv):
    """
    Returns the Python object associated with a delivery handle.

    @param dlv: a delivery handle, possibly falsy
    @return: None for a falsy handle; the handle's context when one is
    set; otherwise a new L{Delivery}, which is also stored as the
    handle's context
    """
    if not dlv:
        return None
    existing = pn_delivery_get_context(dlv)
    if not existing:
        existing = Delivery(dlv)
        pn_delivery_set_context(dlv, existing)
    return existing
| |
class Delivery(object):
    """
    Python wrapper around a proton delivery handle.
    """

    ACCEPTED = PN_ACCEPTED

    def __init__(self, dlv):
        """
        @param dlv: the delivery handle to wrap
        """
        self._dlv = dlv

    @property
    def tag(self):
        """The value of pn_delivery_tag for this delivery."""
        return pn_delivery_tag(self._dlv)

    @property
    def writable(self):
        """The value of pn_delivery_writable for this delivery."""
        return pn_delivery_writable(self._dlv)

    @property
    def readable(self):
        """The value of pn_delivery_readable for this delivery."""
        return pn_delivery_readable(self._dlv)

    @property
    def updated(self):
        """The value of pn_delivery_updated for this delivery."""
        return pn_delivery_updated(self._dlv)

    def update(self, state):
        """
        Calls pn_delivery_update on the underlying handle.

        @param state: passed through to pn_delivery_update, e.g.
        L{ACCEPTED}
        """
        pn_delivery_update(self._dlv, state)

    @property
    def local_state(self):
        """The value of pn_delivery_local_state for this delivery."""
        return pn_delivery_local_state(self._dlv)

    @property
    def remote_state(self):
        """The value of pn_delivery_remote_state for this delivery."""
        return pn_delivery_remote_state(self._dlv)

    @property
    def settled(self):
        """The value of pn_delivery_settled for this delivery."""
        return pn_delivery_settled(self._dlv)

    def settle(self):
        """Calls pn_delivery_settle on the underlying handle."""
        pn_delivery_settle(self._dlv)

    @property
    def work_next(self):
        """The wrapper for pn_work_next of this delivery."""
        following = pn_work_next(self._dlv)
        return wrap_delivery(following)
| |
class TransportException(ProtonException):
    """
    The fallback exception raised by L{Transport} for negative error
    codes that have no entry in EXCEPTIONS; also the base class of the
    SASL and SSL exceptions.
    """
| |
class Transport(object):
    """
    Python wrapper around a proton transport handle. A new handle is
    created for every instance.
    """

    # trace flags accepted by trace()
    TRACE_DRV = PN_TRACE_DRV
    TRACE_FRM = PN_TRACE_FRM
    TRACE_RAW = PN_TRACE_RAW

    def __init__(self):
        self._trans = pn_transport()

    def __del__(self):
        # nothing to release when the handle attribute is not present
        if not hasattr(self, "_trans"):
            return
        pn_transport_free(self._trans)
        del self._trans

    def _check(self, err):
        """
        Raises for negative error codes and returns any other code.

        The exception class is looked up in EXCEPTIONS, falling back to
        TransportException; the message includes the transport's error
        text.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, TransportException)
            detail = pn_error_text(pn_transport_error(self._trans))
            raise exc_type("[%s]: %s" % (err, detail))
        return err

    def bind(self, connection):
        """
        Binds the transport to a connection with pn_transport_bind; the
        resulting status code is handed to _check.

        @param connection: the L{Connection} to bind to
        """
        status = pn_transport_bind(self._trans, connection._conn)
        self._check(status)

    def trace(self, n):
        """
        Calls pn_transport_trace on the underlying handle.

        @param n: passed through to pn_transport_trace
        """
        pn_transport_trace(self._trans, n)

    def tick(self, now):
        """
        Returns the result of pn_transport_tick.

        @param now: passed through to pn_transport_tick
        """
        return pn_transport_tick(self._trans, now)

    def output(self, n):
        """
        Fetches pending output with pn_transport_output.

        @param n: passed through to pn_transport_output
        @return: None when PN_EOS is reported, otherwise the output
        data; other negative codes are raised by _check
        """
        code, produced = pn_transport_output(self._trans, n)
        if code == PN_EOS:
            return None
        self._check(code)
        return produced

    def input(self, binary):
        """
        Feeds data to the transport with pn_transport_input.

        @param binary: the data to feed
        @return: None when PN_EOS is reported, otherwise the
        non-negative result code; other negative codes are raised by
        _check
        """
        code = pn_transport_input(self._trans, binary)
        if code == PN_EOS:
            return None
        return self._check(code)

    def _get_max_frame_size(self):
        return pn_transport_get_max_frame(self._trans)

    def _set_max_frame_size(self, value):
        pn_transport_set_max_frame(self._trans, value)

    max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                              doc="""
Sets the maximum size for received frames (in bytes).
""")

    @property
    def remote_max_frame_size(self):
        """The value of pn_transport_get_remote_max_frame."""
        return pn_transport_get_remote_max_frame(self._trans)
| |
class SASLException(TransportException):
    """
    The fallback exception raised by L{SASL} for negative error codes
    that have no entry in EXCEPTIONS.
    """
| |
class SASL(object):
    """
    Python wrapper around the SASL handle of a L{Transport}.
    """

    # outcome codes
    OK = PN_SASL_OK
    AUTH = PN_SASL_AUTH

    def __init__(self, transport):
        """
        @param transport: the L{Transport} whose SASL handle is wrapped
        """
        self._sasl = pn_sasl(transport._trans)

    def _check(self, err):
        """
        Raises for negative error codes and returns any other code.

        The exception class is looked up in EXCEPTIONS, falling back to
        SASLException; the message only carries the error code.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, SASLException)
            raise exc_type("[%s]" % (err))
        return err

    def mechanisms(self, mechs):
        """
        Calls pn_sasl_mechanisms on the underlying handle.

        @param mechs: passed through to pn_sasl_mechanisms
        """
        pn_sasl_mechanisms(self._sasl, mechs)

    def client(self):
        """Calls pn_sasl_client on the underlying handle."""
        pn_sasl_client(self._sasl)

    def server(self):
        """Calls pn_sasl_server on the underlying handle."""
        pn_sasl_server(self._sasl)

    def plain(self, user, password):
        """
        Calls pn_sasl_plain on the underlying handle.

        @param user: passed through to pn_sasl_plain
        @param password: passed through to pn_sasl_plain
        """
        pn_sasl_plain(self._sasl, user, password)

    def send(self, data):
        """
        Sends data with pn_sasl_send, passing the data and its length;
        the resulting status code is handed to _check.

        @param data: the data to send
        """
        status = pn_sasl_send(self._sasl, data, len(data))
        self._check(status)

    def recv(self):
        """
        Receives data with pn_sasl_recv.

        The call starts with a size of 16 and the size is doubled for as
        long as PN_OVERFLOW is reported.

        @return: None when PN_EOS is reported, otherwise the received
        data; other negative codes are raised by _check
        """
        size = 16
        code, received = pn_sasl_recv(self._sasl, size)
        while code == PN_OVERFLOW:
            size *= 2
            code, received = pn_sasl_recv(self._sasl, size)
        if code == PN_EOS:
            return None
        self._check(code)
        return received

    @property
    def outcome(self):
        """
        The value of pn_sasl_outcome, with PN_SASL_NONE reported as None.
        """
        current = pn_sasl_outcome(self._sasl)
        return None if current == PN_SASL_NONE else current

    def done(self, outcome):
        """
        Calls pn_sasl_done on the underlying handle.

        @param outcome: passed through to pn_sasl_done, e.g. L{OK} or
        L{AUTH}
        """
        pn_sasl_done(self._sasl, outcome)
| |
class SSLException(TransportException):
    """
    The fallback exception raised by L{SSL} for negative error codes
    that have no entry in EXCEPTIONS.
    """
| |
class SSLUnavailable(SSLException):
    """
    Raised by the L{SSL} constructor when pn_ssl returns None for the
    given transport.
    """
| |
class SSL(object):
    """
    Python wrapper around the SSL handle of a L{Transport}.
    """

    # modes accepted by init()
    MODE_CLIENT = PN_SSL_MODE_CLIENT
    MODE_SERVER = PN_SSL_MODE_SERVER

    # verification modes accepted by set_peer_authentication()
    VERIFY_PEER = PN_SSL_VERIFY_PEER
    ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

    def __init__(self, transport):
        """
        @param transport: the L{Transport} whose SSL handle is wrapped
        @raise SSLUnavailable: if pn_ssl returns None
        """
        self._ssl = pn_ssl(transport._trans)
        if self._ssl is None:
            raise SSLUnavailable()

    def _check(self, err):
        """
        Raises for negative error codes and returns any other code.

        The exception class is looked up in EXCEPTIONS, falling back to
        SSLException; the message is fixed.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, SSLException)
            raise exc_type("SSL failure.")
        return err

    # The configuration methods below all return the checked status code
    # of the underlying call.

    def init(self, mode):
        """
        @param mode: passed through to pn_ssl_init, e.g. L{MODE_CLIENT}
        or L{MODE_SERVER}
        """
        status = pn_ssl_init(self._ssl, mode)
        return self._check(status)

    def set_credentials(self, cert_file, key_file, password):
        """
        @param cert_file: passed through to pn_ssl_set_credentials
        @param key_file: passed through to pn_ssl_set_credentials
        @param password: passed through to pn_ssl_set_credentials
        """
        status = pn_ssl_set_credentials(self._ssl, cert_file, key_file,
                                        password)
        return self._check(status)

    def set_trusted_ca_db(self, certificate_db):
        """
        @param certificate_db: passed through to pn_ssl_set_trusted_ca_db
        """
        status = pn_ssl_set_trusted_ca_db(self._ssl, certificate_db)
        return self._check(status)

    def allow_unsecured_client(self):
        status = pn_ssl_allow_unsecured_client(self._ssl)
        return self._check(status)

    def set_peer_authentication(self, verify_mode, trusted_CAs=None):
        """
        @param verify_mode: passed through to
        pn_ssl_set_peer_authentication, e.g. L{VERIFY_PEER} or
        L{ANONYMOUS_PEER}
        @param trusted_CAs: passed through to
        pn_ssl_set_peer_authentication
        """
        status = pn_ssl_set_peer_authentication(self._ssl, verify_mode,
                                                trusted_CAs)
        return self._check(status)

    def peer_authentication(self):
        """
        Not implemented yet; always returns None.
        """
        # @TODO: fix up buffer return value...

    def cipher_name(self):
        """
        @return: the name reported by pn_ssl_get_cipher_name (requested
        with a size of 128), or None when the call's result code is falsy
        """
        rc, name = pn_ssl_get_cipher_name(self._ssl, 128)
        return name if rc else None

    def protocol_name(self):
        """
        @return: the name reported by pn_ssl_get_protocol_name (requested
        with a size of 128), or None when the call's result code is falsy
        """
        rc, name = pn_ssl_get_protocol_name(self._ssl, 128)
        return name if rc else None
| |
# Public names exported by "from proton import *". The exception classes
# raised by the endpoint, SASL and SSL wrappers are listed as well, so
# that star-import users can catch them.
__all__ = ["Messenger", "Message", "ProtonException", "MessengerException",
           "MessageException", "Timeout", "Data", "Endpoint", "Connection",
           "Session", "Link", "Terminus", "Sender", "Receiver", "Delivery",
           "Transport", "TransportException", "SASL", "SSL", "Described",
           "Array", "symbol", "char", "timestamp", "ulong", "UNDESCRIBED",
           "SSLUnavailable", "PN_SESSION_WINDOW",
           "ConnectionException", "SessionException", "LinkException",
           "SASLException", "SSLException"]