blob: 40e6637a74da37c8157b26f3e44c7c4652e85f85 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The proton module defines a suite of APIs that implement the AMQP 1.0
protocol.
The proton APIs consist of the following classes:
- L{Messenger} -- A messaging endpoint.
- L{Message} -- A class for creating and/or accessing AMQP message content.
- L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
data.
"""
from cproton import *
import uuid
class ProtonException(Exception):
"""
The root of the proton exception hierarchy. All proton exception
classes derive from this exception.
"""
pass
class Timeout(ProtonException):
"""
A timeout exception indicates that a blocking operation has timed
out.
"""
pass
class MessengerException(ProtonException):
"""
The root of the messenger exception hierarchy. All exceptions
generated by the messenger class derive from this exception.
"""
pass
class MessageException(ProtonException):
"""
The MessageException class is the root of the message exception
hierarhcy. All exceptions generated by the Message class derive from
this exception.
"""
pass
EXCEPTIONS = {
PN_TIMEOUT: Timeout
}
class Messenger(object):
"""
The L{Messenger} class defines a high level interface for sending
and receiving L{Messages<Message>}. Every L{Messenger} contains a
single logical queue of incoming messages and a single logical queue
of outgoing messages. These messages in these queues may be destined
for, or originate from, a variety of addresses.
Address Syntax
==============
An address has the following form::
[ amqp[s]:// ] [user[:password]@] domain [/[name]]
Where domain can be one of::
host | host:port | ip | ip:port | name
The following are valid examples of addresses:
- example.org
- example.org:1234
- amqp://example.org
- amqps://example.org
- example.org/incoming
- amqps://example.org/outgoing
- amqps://fred:trustno1@example.org
- 127.0.0.1:1234
- amqps://127.0.0.1:1234
Sending & Receiving Messages
============================
The L{Messenger} class works in conjuction with the L{Message}
class. The L{Message} class is a mutable holder of message content.
The L{put} method will encode the content in a given L{Message}
object into the outgoing message queue leaving that L{Message}
object free to be modified or discarded without having any impact on
the content in the outgoing queue.
>>> message = Message()
>>> for i in range(3):
... message.address = "amqp://host/queue"
... message.subject = "Hello World %i" % i
... messenger.put(message)
>>> messenger.send()
Similarly, the L{get} method will decode the content in the incoming
message queue into the supplied L{Message} object.
>>> message = Message()
>>> messenger.recv(10):
>>> while messenger.incoming > 0:
... messenger.get(message)
... print message.subject
Hello World 0
Hello World 1
Hello World 2
"""
def __init__(self, name=None):
"""
Construct a new L{Messenger} with the given name. The name has
global scope. If a NULL name is supplied, a L{uuid.UUID} based
name will be chosen.
@type name: string
@param name: the name of the messenger or None
"""
self._mng = pn_messenger(name)
def __del__(self):
if hasattr(self, "_mng"):
pn_messenger_free(self._mng)
del self._mng
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, MessengerException)
raise exc("[%s]: %s" % (err, pn_messenger_error(self._mng)))
else:
return err
@property
def name(self):
"""
The name of the L{Messenger}.
"""
return pn_messenger_name(self._mng)
def _get_certificate(self):
return pn_messenger_get_certificate(self._mng)
def _set_certificate(self, value):
self._check(pn_messenger_set_certificate(self._mng, value))
certificate = property(_get_certificate, _set_certificate,
doc="""
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client authenticated
outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
connections do not require this property.
""")
def _get_private_key(self):
return pn_messenger_get_private_key(self._mng)
def _set_private_key(self, value):
self._check(pn_messenger_set_private_key(self._mng, value))
private_key = property(_get_private_key, _set_private_key,
doc="""
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client
authenticated outgoing SSL/TLS connection. Non client authenticated
SSL/TLS connections do not require this property.
""")
def _get_password(self):
return pn_messenger_get_password(self._mng)
def _set_password(self, value):
self._check(pn_messenger_set_password(self._mng, value))
password = property(_get_password, _set_password,
doc="""
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")
def _get_trusted_certificates(self):
return pn_messenger_get_trusted_certificates(self._mng)
def _set_trusted_certificates(self, value):
self._check(pn_messenger_set_trusted_certificates(self._mng, value))
trusted_certificates = property(_get_trusted_certificates,
_set_trusted_certificates,
doc="""
A path do a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")
def _get_timeout(self):
return pn_messenger_get_timeout(self._mng)
def _set_timeout(self, value):
self._check(pn_messenger_set_timeout(self._mng, value))
timeout = property(_get_timeout, _set_timeout,
doc="""
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")
def start(self):
"""
Transitions the L{Messenger} to an active state. A L{Messenger} is
initially created in an inactive state. When inactive a
L{Messenger} will not send or receive messages from its internal
queues. A L{Messenger} must be started before calling L{send} or
L{recv}.
"""
self._check(pn_messenger_start(self._mng))
def stop(self):
"""
Transitions the L{Messenger} to an inactive state. An inactive
L{Messenger} will not send or receive messages from its internal
queues. A L{Messenger} should be stopped before being discarded to
ensure a clean shutdown handshake occurs on any internally managed
connections.
"""
self._check(pn_messenger_stop(self._mng))
def subscribe(self, source):
"""
Subscribes the L{Messenger} to messages originating from the
specified source. The source is an address as specified in the
L{Messenger} introduction with the following addition. If the
domain portion of the address begins with the '~' character, the
L{Messenger} will interpret the domain as host/port, bind to it,
and listen for incoming messages. For example "~0.0.0.0",
"amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
local interface and listen for incoming messages with the last
variant only permitting incoming SSL connections.
@type source: string
@param source: the source of messages to subscribe to
"""
self._check(pn_messenger_subscribe(self._mng, source))
def put(self, message):
"""
Places the content contained in the message onto the outgoing
queue of the L{Messenger}. This method will never block, however
it will send any unblocked L{Messages<Message>} in the outgoing
queue immediately and leave any blocked L{Messages<Message>}
remaining in the outgoing queue. The L{send} call may be used to
block until the outgoing queue is empty. The L{outgoing} property
may be used to check the depth of the outgoing queue.
@type message: Message
@param message: the message to place in the outgoing queue
"""
message._pre_encode()
self._check(pn_messenger_put(self._mng, message._msg))
def send(self):
"""
Blocks until the outgoing queue is empty or the operation times
out. The L{timeout} property controls how long a L{Messenger} will
block before timing out.
"""
self._check(pn_messenger_send(self._mng))
def recv(self, n):
"""
Receives up to I{n} messages into the incoming queue of the
L{Messenger}. This method will block until at least one message is
available or the operation times out.
"""
self._check(pn_messenger_recv(self._mng, n))
def get(self, message):
"""
Moves the message from the head of the incoming message queue into
the supplied message object. Any content in the message will be
overwritten.
@type message: Message
@param message: the destination message object
"""
self._check(pn_messenger_get(self._mng, message._msg))
message._post_decode()
@property
def outgoing(self):
"""
The outgoing queue depth.
"""
return pn_messenger_outgoing(self._mng)
@property
def incoming(self):
"""
The incoming queue depth.
"""
return pn_messenger_incoming(self._mng)
class Message(object):
"""
The L{Message} class is a mutable holder of message content.
@ivar instructions: delivery instructions for the message
@type instructions: dict
@ivar annotations: infrastructure defined message annotations
@type annotations: dict
@ivar properties: application defined message properties
@type properties: dict
@ivar body: message body
@type body: bytes | unicode | dict | list | int | long | float | UUID
"""
DATA = PN_DATA
TEXT = PN_TEXT
AMQP = PN_AMQP
JSON = PN_JSON
DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
def __init__(self):
self._msg = pn_message()
self._id = Data(pn_message_id(self._msg))
self._correlation_id = Data(pn_message_correlation_id(self._msg))
self.instructions = None
self.annotations = None
self.properties = None
self.body = None
def __del__(self):
if hasattr(self, "_msg"):
pn_message_free(self._msg)
del self._msg
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, MessageException)
raise exc("[%s]: %s" % (err, pn_message_error(self._msg)))
else:
return err
def _pre_encode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
inst.clear()
if self.instructions is not None:
inst.put_object(self.instructions)
ann.clear()
if self.annotations is not None:
ann.put_object(self.annotations)
props.clear()
if self.properties is not None:
props.put_object(self.properties)
if self.body is not None:
# XXX: move this out when load/save are gone
body.clear()
body.put_object(self.body)
def _post_decode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
if inst.next():
self.instructions = inst.get_object()
else:
self.instructions = None
if ann.next():
self.annotations = ann.get_object()
else:
self.annotations = None
if props.next():
self.properties = props.get_object()
else:
self.properties = None
if body.next():
self.body = body.get_object()
else:
self.body = None
def clear(self):
"""
Clears the contents of the L{Message}. All fields will be reset to
their default values.
"""
pn_message_clear(self._msg)
self.instructions = None
self.annotations = None
self.properties = None
self.body = None
def _is_inferred(self):
return pn_message_is_inferred(self._msg)
def _set_inferred(self, value):
self._check(pn_message_set_inferred(self._msg, bool(value)))
inferred = property(_is_inferred, _set_inferred)
def _is_durable(self):
return pn_message_is_durable(self._msg)
def _set_durable(self, value):
self._check(pn_message_set_durable(self._msg, bool(value)))
durable = property(_is_durable, _set_durable,
doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
return pn_message_get_priority(self._msg)
def _set_priority(self, value):
self._check(pn_message_set_priority(self._msg, value))
priority = property(_get_priority, _set_priority,
doc="""
The priority of the message.
""")
def _get_ttl(self):
return pn_message_get_ttl(self._msg)
def _set_ttl(self, value):
self._check(pn_message_set_ttl(self._msg, value))
ttl = property(_get_ttl, _set_ttl,
doc="""
The time to live of the message measured in milliseconds. Expired
messages may be dropped.
""")
def _is_first_acquirer(self):
return pn_message_is_first_acquirer(self._msg)
def _set_first_acquirer(self, value):
self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
return pn_message_get_delivery_count(self._msg)
def _set_delivery_count(self, value):
self._check(pn_message_set_delivery_count(self._msg, value))
delivery_count = property(_get_delivery_count, _set_delivery_count,
doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
return self._id.get_object()
def _set_id(self, value):
if type(value) in (int, long):
value = ulong(value)
self._id.rewind()
self._id.put_object(value)
id = property(_get_id, _set_id,
doc="""
The id of the message.
""")
def _get_user_id(self):
return pn_message_get_user_id(self._msg)
def _set_user_id(self, value):
self._check(pn_message_set_user_id(self._msg, value))
user_id = property(_get_user_id, _set_user_id,
doc="""
The user id of the message creator.
""")
def _get_address(self):
return pn_message_get_address(self._msg)
def _set_address(self, value):
self._check(pn_message_set_address(self._msg, value))
address = property(_get_address, _set_address,
doc="""
The address of the message.
""")
def _get_subject(self):
return pn_message_get_subject(self._msg)
def _set_subject(self, value):
self._check(pn_message_set_subject(self._msg, value))
subject = property(_get_subject, _set_subject,
doc="""
The subject of the message.
""")
def _get_reply_to(self):
return pn_message_get_reply_to(self._msg)
def _set_reply_to(self, value):
self._check(pn_message_set_reply_to(self._msg, value))
reply_to = property(_get_reply_to, _set_reply_to,
doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
return self._correlation_id.get_object()
def _set_correlation_id(self, value):
if type(value) in (int, long):
value = ulong(value)
self._correlation_id.rewind()
self._correlation_id.put_object(value)
correlation_id = property(_get_correlation_id, _set_correlation_id,
doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
return pn_message_get_content_type(self._msg)
def _set_content_type(self, value):
self._check(pn_message_set_content_type(self._msg, value))
content_type = property(_get_content_type, _set_content_type,
doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
return pn_message_get_content_encoding(self._msg)
def _set_content_encoding(self, value):
self._check(pn_message_set_content_encoding(self._msg, value))
content_encoding = property(_get_content_encoding, _set_content_encoding,
doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
return pn_message_get_expiry_time(self._msg)
def _set_expiry_time(self, value):
self._check(pn_message_set_expiry_time(self._msg, value))
expiry_time = property(_get_expiry_time, _set_expiry_time,
doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
return pn_message_get_creation_time(self._msg)
def _set_creation_time(self, value):
self._check(pn_message_set_creation_time(self._msg, value))
creation_time = property(_get_creation_time, _set_creation_time,
doc="""
The creation time of the message.
""")
def _get_group_id(self):
return pn_message_get_group_id(self._msg)
def _set_group_id(self, value):
self._check(pn_message_set_group_id(self._msg, value))
group_id = property(_get_group_id, _set_group_id,
doc="""
The group id of the message.
""")
def _get_group_sequence(self):
return pn_message_get_group_sequence(self._msg)
def _set_group_sequence(self, value):
self._check(pn_message_set_group_sequence(self._msg, value))
group_sequence = property(_get_group_sequence, _set_group_sequence,
doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
return pn_message_get_reply_to_group_id(self._msg)
def _set_reply_to_group_id(self, value):
self._check(pn_message_set_reply_to_group_id(self._msg, value))
reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
doc="""
The group-id for any replies.
""")
# XXX
def _get_format(self):
return pn_message_get_format(self._msg)
def _set_format(self, value):
self._check(pn_message_set_format(self._msg, value))
format = property(_get_format, _set_format,
doc="""
The format of the message.
""")
def encode(self):
self._pre_encode()
sz = 16
while True:
err, data = pn_message_encode(self._msg, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return data
def decode(self, data):
self._check(pn_message_decode(self._msg, data, len(data)))
self._post_decode()
def load(self, data):
self._check(pn_message_load(self._msg, data))
def save(self):
sz = 16
while True:
err, data = pn_message_save(self._msg, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return data
class DataException(ProtonException):
"""
The DataException class is the root of the Data exception hierarchy.
All exceptions raised by the Data class extend this exception.
"""
pass
class UnmappedType:
def __init__(self, msg):
self.msg = msg
def __repr__(self):
return "UnmappedType(%s)" % self.msg
class ulong(long):
def __repr__(self):
return "ulong(%s)" % long.__repr__(self)
class timestamp(long):
def __repr__(self):
return "timestamp(%s)" % long.__repr__(self)
class symbol(unicode):
def __repr__(self):
return "symbol(%s)" % unicode.__repr__(self)
class char(unicode):
def __repr__(self):
return "char(%s)" % unicode.__repr__(self)
class Described(object):
def __init__(self, descriptor, value):
self.descriptor = descriptor
self.value = value
def __repr__(self):
return "Described(%r, %r)" % (self.descriptor, self.value)
def __eq__(self, o):
if isinstance(o, Described):
return self.descriptor == o.descriptor and self.value == o.value
else:
return False
class Constant(object):
def __init__(self, name):
self.name = name
def __repr__(self):
return self.name
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
def __init__(self, descriptor, type, *elements):
self.descriptor = descriptor
self.type = type
self.elements = elements
def __repr__(self):
if self.elements:
els = ", %s" % (", ".join(map(repr, self.elements)))
else:
els = ""
return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
def __eq__(self, o):
if isinstance(o, Array):
return self.descriptor == o.descriptor and \
self.type == o.type and self.elements == o.elements
else:
return False
class Data:
"""
The L{Data} class provides an interface for decoding, extracting,
creating, and encoding arbitrary AMQP data. A L{Data} object
contains a tree of AMQP values. Leaf nodes in this tree correspond
to scalars in the AMQP type system such as L{ints<INT>} or
L{strings<STRING>}. Non-leaf nodes in this tree correspond to
compound values in the AMQP type system such as L{lists<LIST>},
L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
The root node of the tree is the L{Data} object itself and can have
an arbitrary number of children.
A L{Data} object maintains the notion of the current sibling node
and a current parent node. Siblings are ordered within their parent.
Values are accessed and/or added by using the L{next}, L{prev},
L{enter}, and L{exit} methods to navigate to the desired location in
the tree and using the supplied variety of put_*/get_* methods to
access or add a value of the desired type.
The put_* methods will always add a value I{after} the current node
in the tree. If the current node has a next sibling the put_* method
will overwrite the value on this node. If there is no current node
or the current node has no next sibling then one will be added. The
put_* methods always set the added/modified node to the current
node. The get_* methods read the value of the current node and do
not change which node is current.
The following types of scalar values are supported:
- L{NULL}
- L{BOOL}
- L{UBYTE}
- L{USHORT}
- L{SHORT}
- L{UINT}
- L{INT}
- L{ULONG}
- L{LONG}
- L{FLOAT}
- L{DOUBLE}
- L{BINARY}
- L{STRING}
- L{SYMBOL}
The following types of compound values are supported:
- L{DESCRIBED}
- L{ARRAY}
- L{LIST}
- L{MAP}
"""
NULL = PN_NULL; "A null value."
BOOL = PN_BOOL; "A boolean value."
UBYTE = PN_UBYTE; "An unsigned byte value."
BYTE = PN_BYTE; "A signed byte value."
USHORT = PN_USHORT; "An unsigned short value."
SHORT = PN_SHORT; "A short value."
UINT = PN_UINT; "An unsigned int value."
INT = PN_INT; "A signed int value."
CHAR = PN_CHAR; "A character value."
ULONG = PN_ULONG; "An unsigned long value."
LONG = PN_LONG; "A signed long value."
TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
FLOAT = PN_FLOAT; "A float value."
DOUBLE = PN_DOUBLE; "A double value."
DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
UUID = PN_UUID; "A UUID value."
BINARY = PN_BINARY; "A binary string."
STRING = PN_STRING; "A unicode string."
SYMBOL = PN_SYMBOL; "A symbolic string."
DESCRIBED = PN_DESCRIBED; "A described value."
ARRAY = PN_ARRAY; "An array value."
LIST = PN_LIST; "A list value."
MAP = PN_MAP; "A map value."
def __init__(self, capacity=16):
if type(capacity) in (int, long):
self._data = pn_data(capacity)
self._free = True
else:
self._data = capacity
self._free = False
def __del__(self):
if self._free and hasattr(self, "_data"):
pn_data_free(self._data)
del self._data
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, DataException)
raise exc("[%s]: %s" % (err, "xxx"))
else:
return err
def clear(self):
"""
Clears the data object.
"""
pn_data_clear(self._data)
def rewind(self):
"""
Clears current node and sets the parent to the root node.
"""
pn_data_rewind(self._data)
def next(self):
"""
Advances the current node to its next sibling and returns its
type. If there is no next sibling the current node remains
unchanged and None is returned.
"""
found = pn_data_next(self._data)
if found:
return self.type()
else:
return None
def prev(self):
"""
Advances the current node to its previous sibling and returns its
type. If there is no previous sibling the current node remains
unchanged and None is returned.
"""
found = pn_data_prev(self._data)
if found:
return self.type()
else:
return None
def enter(self):
"""
Sets the parent node to the current node and clears the current node.
"""
return pn_data_enter(self._data)
def exit(self):
"""
Sets the current node to the parent node and the parent node to
its own parent.
"""
return pn_data_exit(self._data)
def type(self):
"""
Returns the type of the current node.
"""
dtype = pn_data_type(self._data)
if dtype == -1:
return None
else:
return dtype
def encode(self):
"""
Returns a representation of the data encoded in AMQP format.
"""
size = 1024
while True:
cd, enc = pn_data_encode(self._data, size)
if cd == PN_OVERFLOW:
size *= 2
elif cd >= 0:
return enc
else:
self._check(cd)
def decode(self, encoded):
"""
Decodes the first value from supplied AMQP data and returns the
number of bytes consumed.
@type encoded: binary
@param encoded: AMQP encoded binary data
"""
return self._check(pn_data_decode(self._data, encoded))
def put_list(self):
"""
Puts a list value. Elements may be filled by entering the list
node and putting element values.
>>> data = Data()
>>> data.put_list()
>>> data.enter()
>>> data.put_int(1)
>>> data.put_int(2)
>>> data.put_int(3)
>>> data.exit()
"""
self._check(pn_data_put_list(self._data))
def put_map(self):
"""
Puts a map value. Elements may be filled by entering the map node
and putting alternating key value pairs.
>>> data = Data()
>>> data.put_map()
>>> data.enter()
>>> data.put_string("key")
>>> data.put_string("value")
>>> data.exit()
"""
self._check(pn_data_put_map(self._data))
def put_array(self, described, element_type):
"""
Puts an array value. Elements may be filled by entering the array
node and putting the element values. The values must all be of the
specified array element type. If an array is described then the
first child value of the array is the descriptor and may be of any
type.
>>> data = Data()
>>>
>>> data.put_array(False, Data.INT)
>>> data.enter()
>>> data.put_int(1)
>>> data.put_int(2)
>>> data.put_int(3)
>>> data.exit()
>>>
>>> data.put_array(True, Data.DOUBLE)
>>> data.enter()
>>> data.put_symbol("array-descriptor")
>>> data.put_double(1.1)
>>> data.put_double(1.2)
>>> data.put_double(1.3)
>>> data.exit()
@type described: bool
@param described: specifies whether the array is described
@type element_type: int
@param element_type: the type of the array elements
"""
self._check(pn_data_put_array(self._data, described, element_type))
def put_described(self):
"""
Puts a described value. A described node has two children, the
descriptor and the value. These are specified by entering the node
and putting the desired values.
>>> data = Data()
>>> data.put_described()
>>> data.enter()
>>> data.put_symbol("value-descriptor")
>>> data.put_string("the value")
>>> data.exit()
"""
self._check(pn_data_put_described(self._data))
def put_null(self):
"""
Puts a null value.
"""
self._check(pn_data_put_null(self._data))
def put_bool(self, b):
"""
Puts a boolean value.
@param b: a boolean value
"""
self._check(pn_data_put_bool(self._data, b))
def put_ubyte(self, ub):
"""
Puts an unsigned byte value.
@param ub: an integral value
"""
self._check(pn_data_put_ubyte(self._data, ub))
def put_byte(self, b):
"""
Puts a signed byte value.
@param b: an integral value
"""
self._check(pn_data_put_byte(self._data, b))
def put_ushort(self, us):
"""
Puts an unsigned short value.
@param us: an integral value.
"""
self._check(pn_data_put_ushort(self._data, us))
def put_short(self, s):
"""
Puts a signed short value.
@param s: an integral value
"""
self._check(pn_data_put_short(self._data, s))
def put_uint(self, ui):
"""
Puts an unsigned int value.
@param ui: an integral value
"""
self._check(pn_data_put_uint(self._data, ui))
def put_int(self, i):
"""
Puts a signed int value.
@param i: an integral value
"""
self._check(pn_data_put_int(self._data, i))
def put_char(self, c):
"""
Puts a char value.
@param c: a single character
"""
self._check(pn_data_put_char(self._data, ord(c)))
def put_ulong(self, ul):
"""
Puts an unsigned long value.
@param ul: an integral value
"""
self._check(pn_data_put_ulong(self._data, ul))
def put_long(self, l):
"""
Puts a signed long value.
@param l: an integral value
"""
self._check(pn_data_put_long(self._data, l))
def put_timestamp(self, t):
"""
Puts a timestamp value.
@param t: an integral value
"""
self._check(pn_data_put_timestamp(self._data, t))
def put_float(self, f):
"""
Puts a float value.
@param f: a floating point value
"""
self._check(pn_data_put_float(self._data, f))
def put_double(self, d):
"""
Puts a double value.
@param d: a floating point value.
"""
self._check(pn_data_put_double(self._data, d))
def put_decimal32(self, d):
"""
Puts a decimal32 value.
@param d: a decimal32 value
"""
self._check(pn_data_put_decimal32(self._data, d))
def put_decimal64(self, d):
"""
Puts a decimal64 value.
@param d: a decimal64 value
"""
self._check(pn_data_put_decimal64(self._data, d))
def put_decimal128(self, d):
"""
Puts a decimal128 value.
@param d: a decimal128 value
"""
self._check(pn_data_put_decimal128(self._data, d))
def put_uuid(self, u):
"""
Puts a UUID value.
@param u: a uuid value
"""
self._check(pn_data_put_uuid(self._data, u.bytes))
def put_binary(self, b):
"""
Puts a binary value.
@type b: binary
@param b: a binary value
"""
self._check(pn_data_put_binary(self._data, b))
def put_string(self, s):
"""
Puts a unicode value.
@type s: unicode
@param s: a unicode value
"""
self._check(pn_data_put_string(self._data, s.encode("utf8")))
def put_symbol(self, s):
"""
Puts a symbolic value.
@type s: string
@param s: the symbol name
"""
self._check(pn_data_put_symbol(self._data, s))
def get_list(self):
"""
If the current node is a list, return the number of elements,
otherwise return zero. List elements can be accessed by entering
the list.
>>> count = data.get_list()
>>> data.enter()
>>> for i in range(count):
... type = data.next()
... if type == Data.STRING:
... print data.get_string()
... elif type == ...:
... ...
>>> data.exit()
"""
return pn_data_get_list(self._data)
def get_map(self):
"""
If the current node is a map, return the number of child elements,
otherwise return zero. Key value pairs can be accessed by entering
the map.
>>> count = data.get_map()
>>> data.enter()
>>> for i in range(count/2):
... type = data.next()
... if type == Data.STRING:
... print data.get_string()
... elif type == ...:
... ...
>>> data.exit()
"""
return pn_data_get_map(self._data)
def get_array(self):
"""
If the current node is an array, return a tuple of the element
count, a boolean indicating whether the array is described, and
the type of each element, otherwise return (0, False, None). Array
data can be accessed by entering the array.
>>> # read an array of strings with a symbolic descriptor
>>> count, described, type = data.get_array()
>>> data.enter()
>>> data.next()
>>> print "Descriptor:", data.get_symbol()
>>> for i in range(count):
... data.next()
... print "Element:", data.get_string()
>>> data.exit()
"""
count = pn_data_get_array(self._data)
described = pn_data_is_array_described(self._data)
type = pn_data_get_array_type(self._data)
if type == -1:
type = None
return count, described, type
def is_described(self):
"""
Checks if the current node is a described value. The descriptor
and value may be accessed by entering the described value.
>>> # read a symbolically described string
>>> assert data.is_described() # will error if the current node is not described
>>> data.enter()
>>> print data.get_symbol()
>>> print data.get_string()
>>> data.exit()
"""
return pn_data_is_described(self._data)
def is_null(self):
"""
Checks if the current node is a null.
"""
self._check(pn_data_get_null(self._data))
def get_bool(self):
"""
If the current node is a boolean, returns its value, returns False
otherwise.
"""
return pn_data_get_bool(self._data)
def get_ubyte(self):
"""
If the current node is an unsigned byte, returns its value,
returns 0 otherwise.
"""
return pn_data_get_ubyte(self._data)
def get_byte(self):
"""
If the current node is a signed byte, returns its value, returns 0
otherwise.
"""
return pn_data_get_byte(self._data)
def get_ushort(self):
"""
If the current node is an unsigned short, returns its value,
returns 0 otherwise.
"""
return pn_data_get_ushort(self._data)
def get_short(self):
"""
If the current node is a signed short, returns its value, returns
0 otherwise.
"""
return pn_data_get_short(self._data)
def get_uint(self):
"""
If the current node is an unsigned int, returns its value, returns
0 otherwise.
"""
return pn_data_get_uint(self._data)
def get_int(self):
"""
If the current node is a signed int, returns its value, returns 0
otherwise.
"""
return pn_data_get_int(self._data)
def get_char(self):
"""
If the current node is a char, returns its value, returns 0
otherwise.
"""
return char(unichr(pn_data_get_char(self._data)))
def get_ulong(self):
"""
If the current node is an unsigned long, returns its value,
returns 0 otherwise.
"""
return ulong(pn_data_get_ulong(self._data))
def get_long(self):
"""
If the current node is an signed long, returns its value, returns
0 otherwise.
"""
return pn_data_get_long(self._data)
def get_timestamp(self):
"""
If the current node is a timestamp, returns its value, returns 0
otherwise.
"""
return timestamp(pn_data_get_timestamp(self._data))
def get_float(self):
"""
If the current node is a float, returns its value, raises 0
otherwise.
"""
return pn_data_get_float(self._data)
def get_double(self):
"""
If the current node is a double, returns its value, returns 0
otherwise.
"""
return pn_data_get_double(self._data)
# XXX: need to convert
def get_decimal32(self):
"""
If the current node is a decimal32, returns its value, returns 0
otherwise.
"""
return pn_data_get_decimal32(self._data)
# XXX: need to convert
def get_decimal64(self):
"""
If the current node is a decimal64, returns its value, returns 0
otherwise.
"""
return pn_data_get_decimal64(self._data)
# XXX: need to convert
def get_decimal128(self):
"""
If the current node is a decimal128, returns its value, returns 0
otherwise.
"""
return pn_data_get_decimal128(self._data)
def get_uuid(self):
"""
If the current node is a UUID, returns its value, returns None
otherwise.
"""
if pn_data_type(self._data) == Data.UUID:
return uuid.UUID(bytes=pn_data_get_uuid(self._data))
else:
return None
def get_binary(self):
"""
If the current node is binary, returns its value, returns ""
otherwise.
"""
return pn_data_get_binary(self._data)
def get_string(self):
"""
If the current node is a string, returns its value, returns ""
otherwise.
"""
return pn_data_get_string(self._data).decode("utf8")
def get_symbol(self):
"""
If the current node is a symbol, returns its value, returns ""
otherwise.
"""
return symbol(pn_data_get_symbol(self._data))
def copy(self, src):
self._check(pn_data_copy(self._data, src._data))
def format(self):
sz = 16
while True:
err, result = pn_data_format(self._data, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return result
def dump(self):
pn_data_dump(self._data)
def put_dict(self, d):
self.put_map()
self.enter()
try:
for k, v in d.items():
self.put_object(k)
self.put_object(v)
finally:
self.exit()
def get_dict(self):
if self.enter():
try:
result = {}
while self.next():
k = self.get_object()
if self.next():
v = self.get_object()
else:
v = None
result[k] = v
finally:
self.exit()
return result
def put_sequence(self, s):
self.put_list()
self.enter()
try:
for o in s:
self.put_object(o)
finally:
self.exit()
def get_sequence(self):
if self.enter():
try:
result = []
while self.next():
result.append(self.get_object())
finally:
self.exit()
return result
def get_py_described(self):
if self.enter():
try:
self.next()
descriptor = self.get_object()
self.next()
value = self.get_object()
finally:
self.exit()
return Described(descriptor, value)
def put_py_described(self, d):
self.put_described()
self.enter()
try:
self.put_object(d.descriptor)
self.put_object(d.value)
finally:
self.exit()
def get_py_array(self):
count, described, type = self.get_array()
if self.enter():
try:
if described:
self.next()
descriptor = self.get_object()
else:
descriptor = UNDESCRIBED
elements = []
while self.next():
elements.append(self.get_object())
finally:
self.exit()
return Array(descriptor, type, *elements)
def put_py_array(self, a):
self.put_array(a.descriptor != UNDESCRIBED, a.type)
self.enter()
try:
for e in a.elements:
self.put_object(e)
finally:
self.exit()
put_mappings = {
None.__class__: lambda s, _: s.put_null(),
bool: put_bool,
dict: put_dict,
list: put_sequence,
tuple: put_sequence,
unicode: put_string,
bytes: put_binary,
symbol: put_symbol,
int: put_long,
char: put_char,
long: put_long,
ulong: put_ulong,
timestamp: put_timestamp,
float: put_double,
uuid.UUID: put_uuid,
Described: put_py_described,
Array: put_py_array
}
get_mappings = {
NULL: lambda s: None,
BOOL: get_bool,
BYTE: get_byte,
UBYTE: get_ubyte,
SHORT: get_short,
USHORT: get_ushort,
INT: get_int,
UINT: get_uint,
CHAR: get_char,
LONG: get_long,
ULONG: get_ulong,
TIMESTAMP: get_timestamp,
FLOAT: get_float,
DOUBLE: get_double,
DECIMAL32: get_decimal32,
DECIMAL64: get_decimal64,
DECIMAL128: get_decimal128,
UUID: get_uuid,
BINARY: get_binary,
STRING: get_string,
SYMBOL: get_symbol,
DESCRIBED: get_py_described,
ARRAY: get_py_array,
LIST: get_sequence,
MAP: get_dict
}
def put_object(self, obj):
putter = self.put_mappings[obj.__class__]
putter(self, obj)
def get_object(self):
type = self.type()
if type is None: return None
getter = self.get_mappings.get(type)
if getter:
return getter(self)
else:
self.dump()
return UnmappedType(str(type))
class ConnectionException(ProtonException):
pass
class Endpoint(object):
LOCAL_UNINIT = PN_LOCAL_UNINIT
REMOTE_UNINIT = PN_REMOTE_UNINIT
LOCAL_ACTIVE = PN_LOCAL_ACTIVE
REMOTE_ACTIVE = PN_REMOTE_ACTIVE
LOCAL_CLOSED = PN_LOCAL_CLOSED
REMOTE_CLOSED = PN_REMOTE_CLOSED
def wrap_connection(conn):
if not conn: return None
ctx = pn_connection_get_context(conn)
if ctx: return ctx
wrapper = Connection(_conn=conn)
return wrapper
class Connection(Endpoint):
def __init__(self, _conn=None):
if _conn:
self._conn = _conn
else:
self._conn = pn_connection()
pn_connection_set_context(self._conn, self)
def __del__(self):
if hasattr(self, "_conn"):
pn_connection_free(self._conn)
del self._conn
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, ConnectionException)
raise exc("[%s]: %s" % (err, pn_connection_error(self._conn)))
else:
return err
def _get_container(self):
return pn_connection_get_container(self._conn)
def _set_container(self, name):
return pn_connection_set_container(self._conn, name)
container = property(_get_container, _set_container)
def _get_hostname(self):
return pn_connection_get_hostname(self._conn)
def _set_hostname(self, name):
return pn_connection_set_hostname(self._conn, name)
hostname = property(_get_hostname, _set_hostname)
@property
def remote_container(self):
return pn_connection_remote_container(self._conn)
@property
def remote_hostname(self):
return pn_connection_remote_hostname(self._conn)
@property
def offered_capabilities(self):
return Data(pn_connection_offered_capabilities(self._conn))
@property
def desired_capabilities(self):
return Data(pn_connection_desired_capabilities(self._conn))
@property
def remote_offered_capabilities(self):
return Data(pn_connection_remote_offered_capabilities(self._conn))
@property
def remote_desired_capabilities(self):
return Data(pn_connection_remote_desired_capabilities(self._conn))
def open(self):
pn_connection_open(self._conn)
def close(self):
pn_connection_close(self._conn)
@property
def state(self):
return pn_connection_state(self._conn)
@property
def writable(self):
return pn_connection_writable(self._conn)
def session(self):
return wrap_session(pn_session(self._conn))
def session_head(self, mask):
return wrap_session(pn_session_head(self._conn, mask))
def link_head(self, mask):
return wrap_link(pn_link_head(self._conn, mask))
@property
def work_head(self):
return wrap_delivery(pn_work_head(self._conn))
class SessionException(ProtonException):
pass
def wrap_session(ssn):
if ssn is None: return None
ctx = pn_session_get_context(ssn)
if ctx:
return ctx
else:
wrapper = Session(ssn)
pn_session_set_context(ssn, wrapper)
return wrapper
class Session(Endpoint):
def __init__(self, ssn):
self._ssn = ssn
def __del__(self):
if hasattr(self, "_ssn"):
pn_session_free(self._ssn)
del self._ssn
def open(self):
pn_session_open(self._ssn)
def close(self):
pn_session_close(self._ssn)
@property
def state(self):
return pn_session_state(self._ssn)
@property
def connection(self):
return wrap_connection(pn_session_connection(self._ssn))
def sender(self, name):
return wrap_link(pn_sender(self._ssn, name))
def receiver(self, name):
return wrap_link(pn_receiver(self._ssn, name))
class LinkException(ProtonException):
pass
def wrap_link(link):
if link is None: return None
ctx = pn_link_get_context(link)
if ctx:
return ctx
else:
if pn_link_is_sender(link):
wrapper = Sender(link)
else:
wrapper = Receiver(link)
pn_link_set_context(link, wrapper)
return wrapper
class Link(Endpoint):
def __init__(self, link):
self._link = link
def __del__(self):
if hasattr(self, "_link"):
pn_link_free(self._link)
del self._link
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]: %s" % (err, pn_link_error(self._link)))
else:
return err
def open(self):
pn_link_open(self._link)
def close(self):
pn_link_close(self._link)
@property
def state(self):
return pn_link_state(self._link)
@property
def source(self):
return Terminus(pn_link_source(self._link))
@property
def target(self):
return Terminus(pn_link_target(self._link))
@property
def remote_source(self):
return Terminus(pn_link_remote_source(self._link))
@property
def remote_target(self):
return Terminus(pn_link_remote_target(self._link))
@property
def session(self):
return wrap_session(pn_link_session(self._link))
def delivery(self, tag):
return wrap_delivery(pn_delivery(self._link, tag))
@property
def current(self):
return wrap_delivery(pn_link_current(self._link))
def advance(self):
return pn_link_advance(self._link)
@property
def unsettled(self):
return pn_link_unsettled(self._link)
@property
def credit(self):
return pn_link_credit(self._link)
@property
def available(self):
return pn_link_available(self._link)
@property
def queued(self):
return pn_link_queued(self._link)
def next(self, mask):
return wrap_link(pn_link_next(self._link, mask))
class Terminus(object):
UNSPECIFIED = PN_UNSPECIFIED
SOURCE = PN_SOURCE
TARGET = PN_TARGET
COORDINATOR = PN_COORDINATOR
NONDURABLE = PN_NONDURABLE
CONFIGURATION = PN_CONFIGURATION
DELIVERIES = PN_DELIVERIES
def __init__(self, impl):
self._impl = impl
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]" % err)
else:
return err
def _get_type(self):
return pn_terminus_get_type(self._impl)
def _set_type(self, type):
self._check(pn_terminus_set_type(self._impl, type))
type = property(_get_type, _set_type)
def _get_address(self):
return pn_terminus_get_address(self._impl)
def _set_address(self, address):
self._check(pn_terminus_set_address(self._impl, address))
address = property(_get_address, _set_address)
def _get_durability(self):
return pn_terminus_get_durability(self._impl)
def _set_durability(self, seconds):
self._check(pn_terminus_set_durability(self._impl, seconds))
durability = property(_get_durability, _set_durability)
def _get_expiry_policy(self):
return pn_terminus_get_expiry_policy(self._impl)
def _set_expiry_policy(self, seconds):
self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
def _get_timeout(self):
return pn_terminus_get_timeout(self._impl)
def _set_timeout(self, seconds):
self._check(pn_terminus_set_timeout(self._impl, seconds))
timeout = property(_get_timeout, _set_timeout)
def _is_dynamic(self):
return pn_terminus_is_dynamic(self._impl)
def _set_dynamic(self, dynamic):
self._check(pn_terminus_set_dynamic(self._impl, dynamic))
dynamic = property(_is_dynamic, _set_dynamic)
@property
def properties(self):
return Data(pn_terminus_properties(self._impl))
@property
def capabilities(self):
return Data(pn_terminus_capabilities(self._impl))
@property
def outcomes(self):
return Data(pn_terminus_outcomes(self._impl))
@property
def filter(self):
return Data(pn_terminus_filter(self._impl))
def copy(self, src):
self._check(pn_terminus_copy(self._impl, src._impl))
class Sender(Link):
def offered(self, n):
pn_link_offered(self._link, n)
def send(self, bytes):
return self._check(pn_link_send(self._link, bytes))
def drained(self):
pn_link_drained(self._link)
class Receiver(Link):
def flow(self, n):
pn_link_flow(self._link, n)
def recv(self, limit):
n, bytes = pn_link_recv(self._link, limit)
if n == PN_EOS:
return None
else:
self._check(n)
return bytes
def drain(self, n):
pn_link_drain(self._link, n)
def wrap_delivery(dlv):
if not dlv: return None
ctx = pn_delivery_get_context(dlv)
if ctx: return ctx
wrapper = Delivery(dlv)
pn_delivery_set_context(dlv, wrapper)
return wrapper
class Delivery(object):
ACCEPTED = PN_ACCEPTED
def __init__(self, dlv):
self._dlv = dlv
@property
def tag(self):
return pn_delivery_tag(self._dlv)
@property
def writable(self):
return pn_delivery_writable(self._dlv)
@property
def readable(self):
return pn_delivery_readable(self._dlv)
@property
def updated(self):
return pn_delivery_updated(self._dlv)
def update(self, state):
pn_delivery_update(self._dlv, state)
@property
def local_state(self):
return pn_delivery_local_state(self._dlv)
@property
def remote_state(self):
return pn_delivery_remote_state(self._dlv)
@property
def settled(self):
return pn_delivery_settled(self._dlv)
def settle(self):
pn_delivery_settle(self._dlv)
@property
def work_next(self):
return wrap_delivery(pn_work_next(self._dlv))
class TransportException(ProtonException):
pass
class Transport(object):
TRACE_DRV = PN_TRACE_DRV
TRACE_FRM = PN_TRACE_FRM
TRACE_RAW = PN_TRACE_RAW
def __init__(self):
self._trans = pn_transport()
def __del__(self):
if hasattr(self, "_trans"):
pn_transport_free(self._trans)
del self._trans
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, TransportException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
else:
return err
def bind(self, connection):
self._check(pn_transport_bind(self._trans, connection._conn))
def trace(self, n):
pn_transport_trace(self._trans, n)
def tick(self, now):
return pn_transport_tick(self._trans, now)
def output(self, n):
cd, out = pn_transport_output(self._trans, n)
if cd == PN_EOS:
return None
else:
self._check(cd)
return out
def input(self, binary):
n = pn_transport_input(self._trans, binary)
if n == PN_EOS:
return None
else:
return self._check(n)
def _get_max_frame_size(self):
return pn_transport_get_max_frame(self._trans)
def _set_max_frame_size(self, value):
pn_transport_set_max_frame(self._trans, value)
max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
doc="""
Sets the maximum size for received frames (in bytes).
""")
@property
def remote_max_frame_size(self):
return pn_transport_get_remote_max_frame(self._trans)
class SASLException(TransportException):
pass
class SASL(object):
OK = PN_SASL_OK
AUTH = PN_SASL_AUTH
def __init__(self, transport):
self._sasl = pn_sasl(transport._trans)
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, SASLException)
raise exc("[%s]" % (err))
else:
return err
def mechanisms(self, mechs):
pn_sasl_mechanisms(self._sasl, mechs)
def client(self):
pn_sasl_client(self._sasl)
def server(self):
pn_sasl_server(self._sasl)
def plain(self, user, password):
pn_sasl_plain(self._sasl, user, password)
def send(self, data):
self._check(pn_sasl_send(self._sasl, data, len(data)))
def recv(self):
sz = 16
while True:
n, data = pn_sasl_recv(self._sasl, sz)
if n == PN_OVERFLOW:
sz *= 2
continue
elif n == PN_EOS:
return None
else:
self._check(n)
return data
@property
def outcome(self):
outcome = pn_sasl_outcome(self._sasl)
if outcome == PN_SASL_NONE:
return None
else:
return outcome
def done(self, outcome):
pn_sasl_done(self._sasl, outcome)
class SSLException(TransportException):
pass
class SSLUnavailable(SSLException):
pass
class SSL(object):
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, SSLException)
raise exc("SSL failure.")
else:
return err
def __init__(self, transport):
self._ssl = pn_ssl(transport._trans)
if self._ssl is None:
raise SSLUnavailable()
MODE_CLIENT = PN_SSL_MODE_CLIENT
MODE_SERVER = PN_SSL_MODE_SERVER
def init(self, mode):
return self._check( pn_ssl_init(self._ssl, mode) )
def set_credentials(self, cert_file, key_file, password):
return self._check( pn_ssl_set_credentials(self._ssl, cert_file, key_file,
password) )
def set_trusted_ca_db(self, certificate_db):
return self._check( pn_ssl_set_trusted_ca_db(self._ssl, certificate_db) )
def allow_unsecured_client(self):
return self._check( pn_ssl_allow_unsecured_client(self._ssl) )
VERIFY_PEER = PN_SSL_VERIFY_PEER
ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
def set_peer_authentication(self, verify_mode, trusted_CAs=None):
return self._check( pn_ssl_set_peer_authentication(self._ssl, verify_mode,
trusted_CAs) )
def peer_authentication(self):
# @TODO: fix up buffer return value...
pass
def cipher_name(self):
rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
if rc:
return name
return None
def protocol_name(self):
rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
if rc:
return name
return None
__all__ = ["Messenger", "Message", "ProtonException", "MessengerException",
"MessageException", "Timeout", "Data", "Endpoint", "Connection",
"Session", "Link", "Terminus", "Sender", "Receiver", "Delivery",
"Transport", "TransportException", "SASL", "SSL", "Described",
"Array", "symbol", "char", "timestamp", "ulong", "UNDESCRIBED",
"SSLUnavailable", "PN_SESSION_WINDOW"]