blob: e375723a759667c746d7b940b3271266460e9963 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The proton module defines a suite of APIs that implement the AMQP 1.0
protocol.
The proton APIs consist of the following classes:
- L{Messenger} -- A messaging endpoint.
- L{Message} -- A class for creating and/or accessing AMQP message content.
- L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
data.
"""
from cproton import *
import weakref, re, socket
try:
    import uuid
except ImportError:
    # No 'native' UUID support. Provide a very basic UUID type that is a
    # compatible subset of the uuid type provided by more modern python
    # releases.
    import struct
    import os, random, socket, time

    class uuid:
        """Minimal stand-in for the standard library ``uuid`` module."""

        class UUID:
            """
            A 16 byte UUID value built from either its hex or binary form.

            @type hex: string
            @param hex: textual form, e.g.
              "01234567-89ab-cdef-0123-456789abcdef"
            @type bytes: string
            @param bytes: the 16 raw octets of the UUID
            @raise TypeError: unless exactly one of hex or bytes is supplied
            """

            def __init__(self, hex=None, bytes=None):
                if [hex, bytes].count(None) != 1:
                    raise TypeError("need one of hex or bytes")
                if bytes is not None:
                    self.bytes = bytes
                elif hex is not None:
                    fields = hex.split("-")
                    # The trailing 48 bit node field is packed as a 16 bit
                    # and a 32 bit integer, so split its 12 hex digits 4 + 8.
                    fields[4:5] = [fields[4][:4], fields[4][4:]]
                    self.bytes = struct.pack("!LHHHHL",
                                             *[int(x, 16) for x in fields])

            def __cmp__(self, other):
                if isinstance(other, uuid.UUID):
                    return cmp(self.bytes, other.bytes)
                else:
                    return -1

            def __str__(self):
                return "%08x-%04x-%04x-%04x-%04x%08x" % \
                    struct.unpack("!LHHHHL", self.bytes)

            def __repr__(self):
                return "UUID(%r)" % str(self)

            def __hash__(self):
                return self.bytes.__hash__()

    rand = random.Random()
    rand.seed((os.getpid(), time.time(), socket.gethostname()))

    def random_uuid():
        """Return 16 random octets laid out as an RFC 4122 version 4 UUID."""
        octets = [rand.randint(0, 255) for i in range(16)]

        # From RFC4122, the version lives in the four most significant bits
        # of time_hi_and_version, i.e. the high nibble of octet 6: 0100.
        octets[6] &= 0x0F
        octets[6] |= 0x40

        # From RFC4122, the top two bits of octet 8
        # (clock_seq_hi_and_reserved) are set to 10.
        octets[8] &= 0x3F
        octets[8] |= 0x80
        return "".join(map(chr, octets))

    def uuid4():
        """Return a random (version 4) L{uuid.UUID}."""
        return uuid.UUID(bytes=random_uuid())

    # Expose uuid4 on the stand-in so that ``uuid.uuid4()`` works exactly as
    # it does with the real module; generate_uuid() below relies on it.
    uuid.uuid4 = staticmethod(uuid4)


def generate_uuid():
    """
    Return a new random UUID, using the standard uuid module when it is
    available and the fallback defined above otherwise.
    """
    return uuid.uuid4()
# Older Python 2 releases have no ``bytes`` builtin; alias it to ``str``
# there so the rest of the module can refer to ``bytes`` unconditionally.
try:
    bytes
except NameError:
    bytes = str
# Version numbers re-exported under shorter names; PN_VERSION_MAJOR and
# PN_VERSION_MINOR are presumably supplied by the cproton star-import.
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
# Languages of the API and of its underlying implementation.
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
    """
    A named sentinel value whose repr() is simply the name it was
    created with.
    """

    def __init__(self, name):
        """
        @type name: string
        @param name: the text returned by repr() for this constant
        """
        self.name = name

    def __repr__(self):
        label = self.name
        return label
class ProtonException(Exception):
    """
    Base class of every exception defined by the proton module; catch
    this to handle any proton specific error.
    """
class Timeout(ProtonException):
    """
    Raised to report that a blocking operation timed out before it
    could complete.
    """
class Interrupt(ProtonException):
    """
    Raised to report that a blocking operation was interrupted before
    it could complete.
    """
class MessengerException(ProtonException):
    """
    Base class of the messenger exception hierarchy. Every exception
    generated by the messenger class derives from this exception.
    """
class MessageException(ProtonException):
    """
    Base class of the message exception hierarchy. Every exception
    generated by the Message class derives from this exception.
    """
# Maps error codes to the exception class raised for them by the _check()
# helpers; codes that are not listed fall back to a per-class default there.
EXCEPTIONS = {
PN_TIMEOUT: Timeout,
PN_INTR: Interrupt
}
# Symbolic delivery states handed back to callers of Messenger.status().
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")
# Maps PN_STATUS_* codes to the constants above; an unknown status is
# reported as None.
STATUSES = {
PN_STATUS_ABORTED: ABORTED,
PN_STATUS_ACCEPTED: ACCEPTED,
PN_STATUS_REJECTED: REJECTED,
PN_STATUS_RELEASED: RELEASED,
PN_STATUS_PENDING: PENDING,
PN_STATUS_SETTLED: SETTLED,
PN_STATUS_UNKNOWN: None
}
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably mode selectors used elsewhere -- confirm.
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
class Messenger(object):
    """
    The L{Messenger} class defines a high level interface for sending
    and receiving L{Messages<Message>}. Every L{Messenger} contains a
    single logical queue of incoming messages and a single logical queue
    of outgoing messages. The messages in these queues may be destined
    for, or originate from, a variety of addresses.

    The messenger interface is single-threaded. All methods
    except one (L{interrupt}) are intended to be used from within
    the messenger thread.

    Address Syntax
    ==============

    An address has the following form::

      [ amqp[s]:// ] [user[:password]@] domain [/[name]]

    Where domain can be one of::

      host | host:port | ip | ip:port | name

    The following are valid examples of addresses:

     - example.org
     - example.org:1234
     - amqp://example.org
     - amqps://example.org
     - example.org/incoming
     - amqps://example.org/outgoing
     - amqps://fred:trustno1@example.org
     - 127.0.0.1:1234
     - amqps://127.0.0.1:1234

    Sending & Receiving Messages
    ============================

    The L{Messenger} class works in conjunction with the L{Message} class.
    The L{Message} class is a mutable holder of message content.

    The L{put} method copies its L{Message} to the outgoing queue, and may
    send queued messages if it can do so without blocking. The L{send}
    method blocks until it has sent the requested number of messages,
    or until a timeout interrupts the attempt.

      >>> message = Message()
      >>> for i in range(3):
      ...   message.address = "amqp://host/queue"
      ...   message.subject = "Hello World %i" % i
      ...   messenger.put(message)
      >>> messenger.send()

    Similarly, the L{recv} method receives messages into the incoming
    queue, and may block as it attempts to receive the requested number
    of messages, or until timeout is reached. It may receive fewer
    than the requested number. The L{get} method pops the
    eldest L{Message} off the incoming queue and copies it into the
    L{Message} object that you supply. It will not block.

      >>> message = Message()
      >>> messenger.recv(10)
      >>> while messenger.incoming > 0:
      ...   messenger.get(message)
      ...   print message.subject
      Hello World 0
      Hello World 1
      Hello World 2

    The blocking flag allows you to turn off blocking behavior entirely,
    in which case L{send} and L{recv} will do whatever they can without
    blocking, and then return. You can then look at the number
    of incoming and outgoing messages to see how much outstanding work
    still remains.
    """

    def __init__(self, name=None):
        """
        Construct a new L{Messenger} with the given name. The name has
        global scope. If a NULL name is supplied, a UUID based name will
        be chosen.

        @type name: string
        @param name: the name of the messenger or None
        """
        self._mng = pn_messenger(name)
        # Selectable wrappers handed out by selectable(), keyed by fd.
        self._selectables = {}

    def __del__(self):
        """
        Destroy the L{Messenger}. This will close all connections that
        are managed by the L{Messenger}. Call the L{stop} method before
        destroying the L{Messenger}.
        """
        # _mng is absent if construction failed or it was already freed.
        if not hasattr(self, "_mng"):
            return
        pn_messenger_free(self._mng)
        del self._mng

    def _check(self, err):
        """
        Translate a return code from the underlying messenger into a
        Python result.

        Non-negative codes are returned unchanged and PN_INPROGRESS yields
        None. Any other negative code raises the class registered for it
        in EXCEPTIONS, or L{MessengerException} when none is registered.
        """
        if err >= 0:
            return err
        if err == PN_INPROGRESS:
            return
        exc_type = EXCEPTIONS.get(err, MessengerException)
        detail = pn_error_text(pn_messenger_error(self._mng))
        raise exc_type("[%s]: %s" % (err, detail))

    @property
    def name(self):
        """
        The name of the L{Messenger}.
        """
        return pn_messenger_name(self._mng)

    def _get_certificate(self):
        return pn_messenger_get_certificate(self._mng)

    def _set_certificate(self, value):
        self._check(pn_messenger_set_certificate(self._mng, value))

    certificate = property(_get_certificate, _set_certificate,
                           doc="""
    Path to a certificate file for the L{Messenger}. This certificate is
    used when the L{Messenger} accepts or establishes SSL/TLS connections.
    This property must be specified for the L{Messenger} to accept
    incoming SSL/TLS connections and to establish client authenticated
    outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
    connections do not require this property.
    """)

    def _get_private_key(self):
        return pn_messenger_get_private_key(self._mng)

    def _set_private_key(self, value):
        self._check(pn_messenger_set_private_key(self._mng, value))

    private_key = property(_get_private_key, _set_private_key,
                           doc="""
    Path to a private key file for the L{Messenger's<Messenger>}
    certificate. This property must be specified for the L{Messenger} to
    accept incoming SSL/TLS connections and to establish client
    authenticated outgoing SSL/TLS connection. Non client authenticated
    SSL/TLS connections do not require this property.
    """)

    def _get_password(self):
        return pn_messenger_get_password(self._mng)

    def _set_password(self, value):
        self._check(pn_messenger_set_password(self._mng, value))

    password = property(_get_password, _set_password,
                        doc="""
    This property contains the password for the L{Messenger.private_key}
    file, or None if the file is not encrypted.
    """)

    def _get_trusted_certificates(self):
        return pn_messenger_get_trusted_certificates(self._mng)

    def _set_trusted_certificates(self, value):
        self._check(pn_messenger_set_trusted_certificates(self._mng, value))

    trusted_certificates = property(_get_trusted_certificates,
                                    _set_trusted_certificates,
                                    doc="""
    A path to a database of trusted certificates for use in verifying the
    peer on an SSL/TLS connection. If this property is None, then the peer
    will not be verified.
    """)

    def _get_timeout(self):
        millis = pn_messenger_get_timeout(self._mng)
        # -1 is the underlying encoding of "no timeout".
        if millis == -1:
            return None
        return millis2secs(millis)

    def _set_timeout(self, value):
        millis = -1
        if value is not None:
            millis = secs2millis(value)
        self._check(pn_messenger_set_timeout(self._mng, millis))

    timeout = property(_get_timeout, _set_timeout,
                       doc="""
    The timeout property contains the default timeout for blocking
    operations performed by the L{Messenger}.
    """)

    def _is_blocking(self):
        return pn_messenger_is_blocking(self._mng)

    def _set_blocking(self, b):
        self._check(pn_messenger_set_blocking(self._mng, b))

    blocking = property(_is_blocking, _set_blocking,
                        doc="""
    Enable or disable blocking behavior during L{Message} sending
    and receiving. This affects every blocking call, with the
    exception of L{work}. Currently, the affected calls are
    L{send}, L{recv}, and L{stop}.
    """)

    def _is_passive(self):
        return pn_messenger_is_passive(self._mng)

    def _set_passive(self, b):
        self._check(pn_messenger_set_passive(self._mng, b))

    passive = property(_is_passive, _set_passive,
                       doc="""
    When passive is set to true, Messenger will not attempt to perform I/O
    internally. In this mode it is necessary to use the selectables API to
    drive any I/O needed to perform requested actions. In this mode
    Messenger will never block.
    """)

    def _get_incoming_window(self):
        return pn_messenger_get_incoming_window(self._mng)

    def _set_incoming_window(self, window):
        self._check(pn_messenger_set_incoming_window(self._mng, window))

    incoming_window = property(_get_incoming_window, _set_incoming_window,
                               doc="""
    The incoming tracking window for the messenger. The messenger will
    track the remote status of this many incoming deliveries after they
    have been accepted or rejected. Defaults to zero.

    L{Messages<Message>} enter this window only when you take them into
    your application using L{get}. If your incoming window size is I{n},
    and you get I{n}+1 L{messages<Message>} without explicitly accepting
    or rejecting the oldest message, then the message that passes beyond
    the edge of the incoming window will be assigned the default
    disposition of its link.
    """)

    def _get_outgoing_window(self):
        return pn_messenger_get_outgoing_window(self._mng)

    def _set_outgoing_window(self, window):
        self._check(pn_messenger_set_outgoing_window(self._mng, window))

    outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                               doc="""
    The outgoing tracking window for the messenger. The messenger will
    track the remote status of this many outgoing deliveries after calling
    send. Defaults to zero.

    A L{Message} enters this window when you call the put() method with the
    message. If your outgoing window size is I{n}, and you call L{put}
    I{n}+1 times, status information will no longer be available for the
    first message.
    """)

    def start(self):
        """
        Currently a no-op placeholder.
        For future compatibility, do not L{send} or L{recv} messages
        before starting the L{Messenger}.
        """
        self._check(pn_messenger_start(self._mng))

    def stop(self):
        """
        Transitions the L{Messenger} to an inactive state. An inactive
        L{Messenger} will not send or receive messages from its internal
        queues. A L{Messenger} should be stopped before being discarded to
        ensure a clean shutdown handshake occurs on any internally managed
        connections.
        """
        self._check(pn_messenger_stop(self._mng))

    @property
    def stopped(self):
        """
        Returns true iff a L{Messenger} is in the stopped state.
        This function does not block.
        """
        return pn_messenger_stopped(self._mng)

    def subscribe(self, source):
        """
        Subscribes the L{Messenger} to messages originating from the
        specified source. The source is an address as specified in the
        L{Messenger} introduction with the following addition. If the
        domain portion of the address begins with the '~' character, the
        L{Messenger} will interpret the domain as host/port, bind to it,
        and listen for incoming messages. For example "~0.0.0.0",
        "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
        local interface and listen for incoming messages with the last
        variant only permitting incoming SSL connections.

        @type source: string
        @param source: the source of messages to subscribe to
        @return: a L{Subscription} wrapping the new subscription
        @raise MessengerException: if the subscription cannot be created
        """
        sub_impl = pn_messenger_subscribe(self._mng, source)
        if sub_impl:
            return Subscription(sub_impl)
        # Prefer the messenger's own recorded error; fall back to a generic
        # failure when that error code does not make _check() raise.
        self._check(pn_error_code(pn_messenger_error(self._mng)))
        raise MessengerException("Cannot subscribe to %s" % source)

    def put(self, message):
        """
        Places the content contained in the message onto the outgoing
        queue of the L{Messenger}. This method will never block, however
        it will send any unblocked L{Messages<Message>} in the outgoing
        queue immediately and leave any blocked L{Messages<Message>}
        remaining in the outgoing queue. The L{send} call may be used to
        block until the outgoing queue is empty. The L{outgoing} property
        may be used to check the depth of the outgoing queue.

        When the content in a given L{Message} object is copied to the
        outgoing message queue, you may then modify or discard the
        L{Message} object without having any impact on the content in the
        outgoing queue.

        This method returns an outgoing tracker for the L{Message}. The
        tracker can be used to determine the delivery status of the
        L{Message}.

        @type message: Message
        @param message: the message to place in the outgoing queue
        @return: a tracker
        """
        # Push the Python-side sections into the underlying message first.
        message._pre_encode()
        self._check(pn_messenger_put(self._mng, message._msg))
        return pn_messenger_outgoing_tracker(self._mng)

    def status(self, tracker):
        """
        Gets the last known remote state of the delivery associated with
        the given tracker.

        @type tracker: tracker
        @param tracker: the tracker whose status is to be retrieved
        @return: the STATUSES entry for the delivery state (None, PENDING,
          ACCEPTED, REJECTED, RELEASED, ABORTED or SETTLED), or the raw
          status code when it has no entry
        """
        code = pn_messenger_status(self._mng, tracker)
        return STATUSES.get(code, code)

    def buffered(self, tracker):
        """
        Checks if the delivery associated with the given tracker is still
        waiting to be sent.

        @type tracker: tracker
        @param tracker: the tracker whose status is to be retrieved
        @return: true if delivery is still buffered
        """
        return pn_messenger_buffered(self._mng, tracker)

    def settle(self, tracker=None):
        """
        Frees a L{Messenger} from tracking the status associated with a
        given tracker. If you don't supply a tracker, all outgoing
        L{messages<Message>} up to the most recent will be settled.
        """
        flags = 0
        if tracker is None:
            # No tracker: act cumulatively up to the latest outgoing one.
            tracker = pn_messenger_outgoing_tracker(self._mng)
            flags = PN_CUMULATIVE
        self._check(pn_messenger_settle(self._mng, tracker, flags))

    def send(self, n=-1):
        """
        This call will block until the indicated number of
        L{messages<Message>} have been sent, or until the operation times
        out. If n is -1 this call will block until all outgoing
        L{messages<Message>} have been sent. If n is 0 then this call will
        send whatever it can without blocking.
        """
        self._check(pn_messenger_send(self._mng, n))

    def recv(self, n=None):
        """
        Receives up to I{n} L{messages<Message>} into the incoming queue.
        If no value for I{n} is supplied, this call will receive as many
        L{messages<Message>} as it can buffer internally. If the
        L{Messenger} is in blocking mode, this call will block until at
        least one L{Message} is available in the incoming queue.
        """
        count = n
        if count is None:
            count = -1
        self._check(pn_messenger_recv(self._mng, count))

    def work(self, timeout=None):
        """
        Sends or receives any outstanding L{messages<Message>} queued for
        a L{Messenger}. This will block for the indicated timeout.
        This method may also do I/O work other than sending and receiving
        L{messages<Message>}. For example, closing connections after
        messenger.L{stop}() has been called.

        @param timeout: how long to block, in seconds; None is passed to
          the underlying call as -1
        @return: False if the call timed out, True otherwise
        """
        millis = -1
        if timeout is not None:
            millis = secs2millis(timeout)
        err = pn_messenger_work(self._mng, millis)
        # A timeout is an expected outcome here, not an error.
        if err == PN_TIMEOUT:
            return False
        self._check(err)
        return True

    @property
    def receiving(self):
        """
        The value reported by pn_messenger_receiving for this messenger.
        """
        return pn_messenger_receiving(self._mng)

    def interrupt(self):
        """
        The L{Messenger} interface is single-threaded.
        This is the only L{Messenger} function intended to be called
        from outside of the L{Messenger} thread.
        Call this from a non-messenger thread to interrupt
        a L{Messenger} that is blocking.
        This will cause any in-progress blocking call to throw
        the L{Interrupt} exception. If there is no currently blocking
        call, then the next blocking call will be affected, even if it
        is within the same thread that interrupt was called from.
        """
        self._check(pn_messenger_interrupt(self._mng))

    def get(self, message=None):
        """
        Moves the message from the head of the incoming message queue into
        the supplied message object. Any content in the message will be
        overwritten.

        A tracker for the incoming L{Message} is returned. The tracker can
        later be used to communicate your acceptance or rejection of the
        L{Message}.

        If None is passed in for the L{Message} object, the L{Message}
        popped from the head of the queue is discarded.

        @type message: Message
        @param message: the destination message object
        @return: a tracker
        """
        impl = None
        if message is not None:
            impl = message._msg
        self._check(pn_messenger_get(self._mng, impl))
        if message is not None:
            # Refresh the Python-side sections from the received content.
            message._post_decode()
        return pn_messenger_incoming_tracker(self._mng)

    def accept(self, tracker=None):
        """
        Signal the sender that you have acted on the L{Message}
        pointed to by the tracker. If no tracker is supplied,
        then all messages that have been returned by the L{get}
        method are accepted, except those that have already been
        auto-settled by passing beyond your incoming window size.

        @type tracker: tracker
        @param tracker: a tracker as returned by get
        """
        flags = 0
        if tracker is None:
            tracker = pn_messenger_incoming_tracker(self._mng)
            flags = PN_CUMULATIVE
        self._check(pn_messenger_accept(self._mng, tracker, flags))

    def reject(self, tracker=None):
        """
        Rejects the L{Message} indicated by the tracker. If no tracker
        is supplied, all messages that have been returned by the L{get}
        method are rejected, except those that have already been
        auto-settled by passing beyond your incoming window size.

        @type tracker: tracker
        @param tracker: a tracker as returned by get
        """
        flags = 0
        if tracker is None:
            tracker = pn_messenger_incoming_tracker(self._mng)
            flags = PN_CUMULATIVE
        self._check(pn_messenger_reject(self._mng, tracker, flags))

    @property
    def outgoing(self):
        """
        The outgoing queue depth.
        """
        return pn_messenger_outgoing(self._mng)

    @property
    def incoming(self):
        """
        The incoming queue depth.
        """
        return pn_messenger_incoming(self._mng)

    def route(self, pattern, address):
        """
        Adds a routing rule to a L{Messenger's<Messenger>} internal routing
        table.

        The route procedure may be used to influence how a L{Messenger}
        will internally treat a given address or class of addresses. Every
        call to the route procedure will result in L{Messenger} appending
        a routing rule to its internal routing table.

        Whenever a L{Message} is presented to a L{Messenger} for delivery,
        it will match the address of this message against the set of
        routing rules in order. The first rule to match will be triggered,
        and instead of routing based on the address presented in the
        message, the L{Messenger} will route based on the address supplied
        in the rule.

        The pattern matching syntax supports two types of matches, a '%'
        will match any character except a '/', and a '*' will match any
        character including a '/'.

        A routing address is specified as a normal AMQP address, however it
        may additionally use substitution variables from the pattern match
        that triggered the rule.

        Any message sent to "foo" will be routed to "amqp://foo.com":

          >>> messenger.route("foo", "amqp://foo.com");

        Any message sent to "foobar" will be routed to
        "amqp://foo.com/bar":

          >>> messenger.route("foobar", "amqp://foo.com/bar");

        Any message sent to bar/<path> will be routed to the corresponding
        path within the amqp://bar.com domain:

          >>> messenger.route("bar/*", "amqp://bar.com/$1");

        Route all L{messages<Message>} over TLS:

          >>> messenger.route("amqp:*", "amqps:$1")

        Supply credentials for foo.com:

          >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");

        Supply credentials for all domains:

          >>> messenger.route("amqp://*", "amqp://user:password@$1");

        Route all addresses through a single proxy while preserving the
        original destination:

          >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");

        Route any address through a single broker:

          >>> messenger.route("*", "amqp://user:password@broker/$1");
        """
        self._check(pn_messenger_route(self._mng, pattern, address))

    def rewrite(self, pattern, address):
        """
        Similar to route(), except that the destination of
        the L{Message} is determined before the message address is
        rewritten.

        The outgoing address is only rewritten after routing has been
        finalized. If a message has an outgoing address of
        "amqp://0.0.0.0:5678", and a rewriting rule that changes its
        outgoing address to "foo", it will still arrive at the peer that
        is listening on "amqp://0.0.0.0:5678", but when it arrives there,
        the receiver will see its outgoing address as "foo".

        The default rewrite rule removes username and password from
        addresses before they are transmitted.
        """
        self._check(pn_messenger_rewrite(self._mng, pattern, address))

    def selectable(self):
        """
        Return a L{Selectable} for the next selectable reported by the
        underlying messenger, or None when it reports none.

        Wrappers are cached per file descriptor, so asking again for a
        selectable with an already known fd returns the same object.
        """
        impl = pn_messenger_selectable(self._mng)
        if not impl:
            return None
        fd = pn_selectable_fd(impl)
        wrapper = self._selectables.get(fd)
        if wrapper is None:
            wrapper = Selectable(self, impl)
            self._selectables[fd] = wrapper
        return wrapper

    @property
    def deadline(self):
        """
        The messenger's deadline converted with millis2secs, or None when
        the underlying messenger reports no (zero) deadline.
        """
        tstamp = pn_messenger_deadline(self._mng)
        if not tstamp:
            return None
        return millis2secs(tstamp)
class Message(object):
"""The L{Message} class is a mutable holder of message content.
@ivar instructions: delivery instructions for the message
@type instructions: dict
@ivar annotations: infrastructure defined message annotations
@type annotations: dict
@ivar properties: application defined message properties
@type properties: dict
@ivar body: message body
@type body: bytes | unicode | dict | list | int | long | float | UUID
"""
DATA = PN_DATA
TEXT = PN_TEXT
AMQP = PN_AMQP
JSON = PN_JSON
DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
def __init__(self, **kwargs):
"""
@param kwargs: Message property name/value pairs to initialise the Message
"""
self._msg = pn_message()
self._id = Data(pn_message_id(self._msg))
self._correlation_id = Data(pn_message_correlation_id(self._msg))
self.instructions = None
self.annotations = None
self.properties = None
self.body = None
for k,v in kwargs.iteritems():
getattr(self, k) # Raise exception if it's not a valid attribute.
setattr(self, k, v)
def __del__(self):
if hasattr(self, "_msg"):
pn_message_free(self._msg)
del self._msg
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, MessageException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
else:
return err
def _pre_encode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
inst.clear()
if self.instructions is not None:
inst.put_object(self.instructions)
ann.clear()
if self.annotations is not None:
ann.put_object(self.annotations)
props.clear()
if self.properties is not None:
props.put_object(self.properties)
body.clear()
if self.body is not None:
body.put_object(self.body)
def _post_decode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
if inst.next():
self.instructions = inst.get_object()
else:
self.instructions = None
if ann.next():
self.annotations = ann.get_object()
else:
self.annotations = None
if props.next():
self.properties = props.get_object()
else:
self.properties = None
if body.next():
self.body = body.get_object()
else:
self.body = None
def clear(self):
"""
Clears the contents of the L{Message}. All fields will be reset to
their default values.
"""
pn_message_clear(self._msg)
self.instructions = None
self.annotations = None
self.properties = None
self.body = None
def _is_inferred(self):
return pn_message_is_inferred(self._msg)
def _set_inferred(self, value):
self._check(pn_message_set_inferred(self._msg, bool(value)))
inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")
def _is_durable(self):
return pn_message_is_durable(self._msg)
def _set_durable(self, value):
self._check(pn_message_set_durable(self._msg, bool(value)))
durable = property(_is_durable, _set_durable,
doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
return pn_message_get_priority(self._msg)
def _set_priority(self, value):
self._check(pn_message_set_priority(self._msg, value))
priority = property(_get_priority, _set_priority,
doc="""
The priority of the message.
""")
def _get_ttl(self):
return millis2secs(pn_message_get_ttl(self._msg))
def _set_ttl(self, value):
self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
ttl = property(_get_ttl, _set_ttl,
doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")
def _is_first_acquirer(self):
return pn_message_is_first_acquirer(self._msg)
def _set_first_acquirer(self, value):
self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
return pn_message_get_delivery_count(self._msg)
def _set_delivery_count(self, value):
self._check(pn_message_set_delivery_count(self._msg, value))
delivery_count = property(_get_delivery_count, _set_delivery_count,
doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
return self._id.get_object()
def _set_id(self, value):
if type(value) in (int, long):
value = ulong(value)
self._id.rewind()
self._id.put_object(value)
id = property(_get_id, _set_id,
doc="""
The id of the message.
""")
def _get_user_id(self):
return pn_message_get_user_id(self._msg)
def _set_user_id(self, value):
self._check(pn_message_set_user_id(self._msg, value))
user_id = property(_get_user_id, _set_user_id,
doc="""
The user id of the message creator.
""")
def _get_address(self):
return pn_message_get_address(self._msg)
def _set_address(self, value):
self._check(pn_message_set_address(self._msg, value))
address = property(_get_address, _set_address,
doc="""
The address of the message.
""")
def _get_subject(self):
return pn_message_get_subject(self._msg)
def _set_subject(self, value):
self._check(pn_message_set_subject(self._msg, value))
subject = property(_get_subject, _set_subject,
doc="""
The subject of the message.
""")
def _get_reply_to(self):
return pn_message_get_reply_to(self._msg)
def _set_reply_to(self, value):
self._check(pn_message_set_reply_to(self._msg, value))
reply_to = property(_get_reply_to, _set_reply_to,
doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
return self._correlation_id.get_object()
def _set_correlation_id(self, value):
if type(value) in (int, long):
value = ulong(value)
self._correlation_id.rewind()
self._correlation_id.put_object(value)
correlation_id = property(_get_correlation_id, _set_correlation_id,
doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
return pn_message_get_content_type(self._msg)
def _set_content_type(self, value):
self._check(pn_message_set_content_type(self._msg, value))
content_type = property(_get_content_type, _set_content_type,
doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
return pn_message_get_content_encoding(self._msg)
def _set_content_encoding(self, value):
self._check(pn_message_set_content_encoding(self._msg, value))
content_encoding = property(_get_content_encoding, _set_content_encoding,
doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
return millis2secs(pn_message_get_expiry_time(self._msg))
def _set_expiry_time(self, value):
self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
expiry_time = property(_get_expiry_time, _set_expiry_time,
doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
return millis2secs(pn_message_get_creation_time(self._msg))
def _set_creation_time(self, value):
self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
creation_time = property(_get_creation_time, _set_creation_time,
doc="""
The creation time of the message.
""")
def _get_group_id(self):
return pn_message_get_group_id(self._msg)
def _set_group_id(self, value):
self._check(pn_message_set_group_id(self._msg, value))
group_id = property(_get_group_id, _set_group_id,
doc="""
The group id of the message.
""")
def _get_group_sequence(self):
return pn_message_get_group_sequence(self._msg)
def _set_group_sequence(self, value):
self._check(pn_message_set_group_sequence(self._msg, value))
group_sequence = property(_get_group_sequence, _set_group_sequence,
doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
return pn_message_get_reply_to_group_id(self._msg)
def _set_reply_to_group_id(self, value):
self._check(pn_message_set_reply_to_group_id(self._msg, value))
reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
doc="""
The group-id for any replies.
""")
# XXX
def _get_format(self):
return pn_message_get_format(self._msg)
def _set_format(self, value):
self._check(pn_message_set_format(self._msg, value))
format = property(_get_format, _set_format,
doc="""
The format of the message.
""")
def encode(self):
self._pre_encode()
sz = 16
while True:
err, data = pn_message_encode(self._msg, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return data
def decode(self, data):
self._check(pn_message_decode(self._msg, data, len(data)))
self._post_decode()
def load(self, data):
self._check(pn_message_load(self._msg, data))
def save(self):
sz = 16
while True:
err, data = pn_message_save(self._msg, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return data
def __repr2__(self):
props = []
for attr in ("inferred", "address", "reply_to", "durable", "ttl",
"priority", "first_acquirer", "delivery_count", "id",
"correlation_id", "user_id", "group_id", "group_sequence",
"reply_to_group_id", "instructions", "annotations",
"properties", "body"):
value = getattr(self, attr)
if value: props.append("%s=%r" % (attr, value))
return "Message(%s)" % ", ".join(props)
def __repr__(self):
    """
    Return the text pn_inspect writes for the underlying message.

    A temporary pn_string receives the output and is released with pn_free
    before the inspect status is passed to self._check.
    """
    buf = pn_string(None)
    status = pn_inspect(self._msg, buf)
    text = pn_string_get(buf)
    pn_free(buf)
    # the status is checked only after the buffer has been released
    self._check(status)
    return text
class Subscription(object):
    """Thin wrapper around a subscription implementation handle."""

    def __init__(self, impl):
        # impl is stored untouched and later handed to pn_subscription_address
        self._impl = impl

    @property
    def address(self):
        """The value pn_subscription_address returns for the wrapped handle."""
        handle = self._impl
        return pn_subscription_address(handle)
class Selectable(object):
def __init__(self, messenger, impl):
self.messenger = messenger
self._impl = impl
def fileno(self):
if not self._impl: raise ValueError("selectable freed")
return pn_selectable_fd(self._impl)
@property
def capacity(self):
if not self._impl: raise ValueError("selectable freed")
return pn_selectable_capacity(self._impl)
@property
def pending(self):
if not self._impl: raise ValueError("selectable freed")
return pn_selectable_pending(self._impl)
@property
def deadline(self):
if not self._impl: raise ValueError("selectable freed")
tstamp = pn_selectable_deadline(self._impl)
if tstamp:
return millis2secs(tstamp)
else:
return None
def readable(self):
if not self._impl: raise ValueError("selectable freed")
pn_selectable_readable(self._impl)
def writable(self):
if not self._impl: raise ValueError("selectable freed")
pn_selectable_writable(self._impl)
def expired(self):
if not self._impl: raise ValueError("selectable freed")
pn_selectable_expired(self._impl)
def _is_registered(self):
if not self._impl: raise ValueError("selectable freed")
return pn_selectable_is_registered(self._impl)
def _set_registered(self, registered):
if not self._impl: raise ValueError("selectable freed")
pn_selectable_set_registered(self._impl, registered)
registered = property(_is_registered, _set_registered,
doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")
@property
def is_terminal(self):
if not self._impl: return True
return pn_selectable_is_terminal(self._impl)
def free(self):
if self._impl:
del self.messenger._selectables[self.fileno()]
pn_selectable_free(self._impl)
self._impl = None
def __del__(self):
self.free()
class DataException(ProtonException):
    """
    Root of the Data exception hierarchy.

    Data._check raises this type for any negative error code that has no
    entry in EXCEPTIONS.
    """
class UnmappedType:
    """Placeholder returned by Data.get_object for a type with no getter."""

    def __init__(self, msg):
        self.msg = msg

    def __repr__(self):
        text = self.msg
        return "UnmappedType(%s)" % text
class ulong(long):
    """long subclass that Data maps to put_ulong / returns from get_ulong."""

    def __repr__(self):
        inner = long.__repr__(self)
        return "ulong(%s)" % inner
class timestamp(long):
    """long subclass that Data maps to put_timestamp / returns from get_timestamp."""

    def __repr__(self):
        inner = long.__repr__(self)
        return "timestamp(%s)" % inner
class symbol(unicode):
    """unicode subclass that Data maps to put_symbol / returns from get_symbol."""

    def __repr__(self):
        inner = unicode.__repr__(self)
        return "symbol(%s)" % inner
class char(unicode):
    """unicode subclass that Data maps to put_char / returns from get_char."""

    def __repr__(self):
        inner = unicode.__repr__(self)
        return "char(%s)" % inner
class Described(object):
    """
    A value paired with a descriptor.

    Data.put_py_described writes the descriptor followed by the value, and
    Data.get_py_described builds instances from a described node.
    """

    def __init__(self, descriptor, value):
        self.descriptor = descriptor
        self.value = value

    def __repr__(self):
        return "Described(%r, %r)" % (self.descriptor, self.value)

    def __eq__(self, o):
        """Equal only to another Described with equal descriptor and value."""
        if isinstance(o, Described):
            return self.descriptor == o.descriptor and self.value == o.value
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this, two equal
        # instances would still compare as "not equal".
        return not self.__eq__(o)
# Sentinel descriptor: Data.get_py_array assigns it to arrays that are not
# described, and Data.put_py_array treats any other descriptor as "described".
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
    """
    An array value: a descriptor, an element type and the elements.

    ``descriptor`` is UNDESCRIBED for undescribed arrays (see
    Data.get_py_array); ``elements`` is the tuple of positional arguments
    given after ``type``.
    """

    def __init__(self, descriptor, type, *elements):
        self.descriptor = descriptor
        self.type = type
        self.elements = elements

    def __repr__(self):
        if self.elements:
            els = ", %s" % (", ".join(map(repr, self.elements)))
        else:
            els = ""
        return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

    def __eq__(self, o):
        """Equal only to another Array with equal descriptor, type and elements."""
        if isinstance(o, Array):
            return self.descriptor == o.descriptor and \
                self.type == o.type and self.elements == o.elements
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; keep the two consistent.
        return not self.__eq__(o)
class Data:
    """
    The L{Data} class provides an interface for decoding, extracting,
    creating, and encoding arbitrary AMQP data. A L{Data} object
    contains a tree of AMQP values. Leaf nodes in this tree correspond
    to scalars in the AMQP type system such as L{ints<INT>} or
    L{strings<STRING>}. Non-leaf nodes in this tree correspond to
    compound values in the AMQP type system such as L{lists<LIST>},
    L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
    The root node of the tree is the L{Data} object itself and can have
    an arbitrary number of children.

    A L{Data} object maintains the notion of the current sibling node
    and a current parent node. Siblings are ordered within their parent.
    Values are accessed and/or added by using the L{next}, L{prev},
    L{enter}, and L{exit} methods to navigate to the desired location in
    the tree and using the supplied variety of put_*/get_* methods to
    access or add a value of the desired type.

    The put_* methods will always add a value I{after} the current node
    in the tree. If the current node has a next sibling the put_* method
    will overwrite the value on this node. If there is no current node
    or the current node has no next sibling then one will be added. The
    put_* methods always set the added/modified node to the current
    node. The get_* methods read the value of the current node and do
    not change which node is current.

    The following types of scalar values are supported:

     - L{NULL}
     - L{BOOL}
     - L{UBYTE}
     - L{BYTE}
     - L{USHORT}
     - L{SHORT}
     - L{UINT}
     - L{INT}
     - L{CHAR}
     - L{ULONG}
     - L{LONG}
     - L{TIMESTAMP}
     - L{FLOAT}
     - L{DOUBLE}
     - L{DECIMAL32}
     - L{DECIMAL64}
     - L{DECIMAL128}
     - L{UUID}
     - L{BINARY}
     - L{STRING}
     - L{SYMBOL}

    The following types of compound values are supported:

     - L{DESCRIBED}
     - L{ARRAY}
     - L{LIST}
     - L{MAP}
    """

    NULL = PN_NULL; "A null value."
    BOOL = PN_BOOL; "A boolean value."
    UBYTE = PN_UBYTE; "An unsigned byte value."
    BYTE = PN_BYTE; "A signed byte value."
    USHORT = PN_USHORT; "An unsigned short value."
    SHORT = PN_SHORT; "A short value."
    UINT = PN_UINT; "An unsigned int value."
    INT = PN_INT; "A signed int value."
    CHAR = PN_CHAR; "A character value."
    ULONG = PN_ULONG; "An unsigned long value."
    LONG = PN_LONG; "A signed long value."
    TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
    FLOAT = PN_FLOAT; "A float value."
    DOUBLE = PN_DOUBLE; "A double value."
    DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
    DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
    DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
    UUID = PN_UUID; "A UUID value."
    BINARY = PN_BINARY; "A binary string."
    STRING = PN_STRING; "A unicode string."
    SYMBOL = PN_SYMBOL; "A symbolic string."
    DESCRIBED = PN_DESCRIBED; "A described value."
    ARRAY = PN_ARRAY; "An array value."
    LIST = PN_LIST; "A list value."
    MAP = PN_MAP; "A map value."

    type_names = {
        NULL: "null",
        BOOL: "bool",
        BYTE: "byte",
        UBYTE: "ubyte",
        SHORT: "short",
        USHORT: "ushort",
        INT: "int",
        UINT: "uint",
        CHAR: "char",
        LONG: "long",
        ULONG: "ulong",
        TIMESTAMP: "timestamp",
        FLOAT: "float",
        DOUBLE: "double",
        DECIMAL32: "decimal32",
        DECIMAL64: "decimal64",
        DECIMAL128: "decimal128",
        UUID: "uuid",
        BINARY: "binary",
        STRING: "string",
        SYMBOL: "symbol",
        DESCRIBED: "described",
        ARRAY: "array",
        LIST: "list",
        MAP: "map"
    }

    @classmethod
    def type_name(cls, type):
        """
        Returns the lower-case name registered in L{type_names} for the
        given type code. Raises KeyError for an unknown code.
        """
        # The original signature omitted ``cls``, so the class itself was
        # bound to ``type`` and every Data.type_name(code) call failed.
        return cls.type_names[type]

    def __init__(self, capacity=16):
        """
        @param capacity: either an integral initial capacity, in which case
          a new data object is allocated and owned (freed in __del__), or an
          existing data implementation to wrap without taking ownership.
        """
        if isinstance(capacity, (int, long)):
            self._data = pn_data(capacity)
            self._free = True
        else:
            self._data = capacity
            self._free = False

    def __del__(self):
        # getattr guards against a partially constructed instance (pn_data
        # raising in __init__ leaves neither attribute set).
        if getattr(self, "_free", False) and hasattr(self, "_data"):
            pn_data_free(self._data)
            del self._data

    def _check(self, err):
        """
        Returns err unchanged when it is non-negative; otherwise raises the
        exception registered in EXCEPTIONS for that code (DataException by
        default) carrying the data object's error text.
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, DataException)
            raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
        else:
            return err

    def clear(self):
        """
        Clears the data object.
        """
        pn_data_clear(self._data)

    def rewind(self):
        """
        Clears current node and sets the parent to the root node. Clearing the
        current node sets it _before_ the first node, calling next() will advance to
        the first node.
        """
        assert self._data is not None
        pn_data_rewind(self._data)

    def next(self):
        """
        Advances the current node to its next sibling and returns its
        type. If there is no next sibling the current node remains
        unchanged and None is returned.
        """
        found = pn_data_next(self._data)
        if found:
            return self.type()
        else:
            return None

    def prev(self):
        """
        Moves the current node back to its previous sibling and returns
        its type. If there is no previous sibling the current node remains
        unchanged and None is returned.
        """
        found = pn_data_prev(self._data)
        if found:
            return self.type()
        else:
            return None

    def enter(self):
        """
        Sets the parent node to the current node and clears the current node.
        Clearing the current node sets it _before_ the first child,
        calling next() advances to the first child.
        """
        return pn_data_enter(self._data)

    def exit(self):
        """
        Sets the current node to the parent node and the parent node to
        its own parent.
        """
        return pn_data_exit(self._data)

    def lookup(self, name):
        """
        Returns the result of pn_data_lookup for the given name.
        """
        return pn_data_lookup(self._data, name)

    def narrow(self):
        """
        Calls pn_data_narrow on the underlying data object.
        """
        pn_data_narrow(self._data)

    def widen(self):
        """
        Calls pn_data_widen on the underlying data object.
        """
        pn_data_widen(self._data)

    def type(self):
        """
        Returns the type of the current node, or None when the underlying
        call reports -1.
        """
        dtype = pn_data_type(self._data)
        if dtype == -1:
            return None
        else:
            return dtype

    def encode(self):
        """
        Returns a representation of the data encoded in AMQP format.
        """
        size = 1024
        while True:
            cd, enc = pn_data_encode(self._data, size)
            if cd == PN_OVERFLOW:
                # buffer too small: double it and retry
                size *= 2
            elif cd >= 0:
                return enc
            else:
                self._check(cd)

    def decode(self, encoded):
        """
        Decodes the first value from supplied AMQP data and returns the
        number of bytes consumed.

        @type encoded: binary
        @param encoded: AMQP encoded binary data
        """
        return self._check(pn_data_decode(self._data, encoded))

    def put_list(self):
        """
        Puts a list value. Elements may be filled by entering the list
        node and putting element values.

          >>> data = Data()
          >>> data.put_list()
          >>> data.enter()
          >>> data.put_int(1)
          >>> data.put_int(2)
          >>> data.put_int(3)
          >>> data.exit()
        """
        self._check(pn_data_put_list(self._data))

    def put_map(self):
        """
        Puts a map value. Elements may be filled by entering the map node
        and putting alternating key value pairs.

          >>> data = Data()
          >>> data.put_map()
          >>> data.enter()
          >>> data.put_string("key")
          >>> data.put_string("value")
          >>> data.exit()
        """
        self._check(pn_data_put_map(self._data))

    def put_array(self, described, element_type):
        """
        Puts an array value. Elements may be filled by entering the array
        node and putting the element values. The values must all be of the
        specified array element type. If an array is described then the
        first child value of the array is the descriptor and may be of any
        type.

          >>> data = Data()
          >>>
          >>> data.put_array(False, Data.INT)
          >>> data.enter()
          >>> data.put_int(1)
          >>> data.put_int(2)
          >>> data.put_int(3)
          >>> data.exit()
          >>>
          >>> data.put_array(True, Data.DOUBLE)
          >>> data.enter()
          >>> data.put_symbol("array-descriptor")
          >>> data.put_double(1.1)
          >>> data.put_double(1.2)
          >>> data.put_double(1.3)
          >>> data.exit()

        @type described: bool
        @param described: specifies whether the array is described
        @type element_type: int
        @param element_type: the type of the array elements
        """
        self._check(pn_data_put_array(self._data, described, element_type))

    def put_described(self):
        """
        Puts a described value. A described node has two children, the
        descriptor and the value. These are specified by entering the node
        and putting the desired values.

          >>> data = Data()
          >>> data.put_described()
          >>> data.enter()
          >>> data.put_symbol("value-descriptor")
          >>> data.put_string("the value")
          >>> data.exit()
        """
        self._check(pn_data_put_described(self._data))

    def put_null(self):
        """
        Puts a null value.
        """
        self._check(pn_data_put_null(self._data))

    def put_bool(self, b):
        """
        Puts a boolean value.

        @param b: a boolean value
        """
        self._check(pn_data_put_bool(self._data, b))

    def put_ubyte(self, ub):
        """
        Puts an unsigned byte value.

        @param ub: an integral value
        """
        self._check(pn_data_put_ubyte(self._data, ub))

    def put_byte(self, b):
        """
        Puts a signed byte value.

        @param b: an integral value
        """
        self._check(pn_data_put_byte(self._data, b))

    def put_ushort(self, us):
        """
        Puts an unsigned short value.

        @param us: an integral value.
        """
        self._check(pn_data_put_ushort(self._data, us))

    def put_short(self, s):
        """
        Puts a signed short value.

        @param s: an integral value
        """
        self._check(pn_data_put_short(self._data, s))

    def put_uint(self, ui):
        """
        Puts an unsigned int value.

        @param ui: an integral value
        """
        self._check(pn_data_put_uint(self._data, ui))

    def put_int(self, i):
        """
        Puts a signed int value.

        @param i: an integral value
        """
        self._check(pn_data_put_int(self._data, i))

    def put_char(self, c):
        """
        Puts a char value.

        @param c: a single character
        """
        self._check(pn_data_put_char(self._data, ord(c)))

    def put_ulong(self, ul):
        """
        Puts an unsigned long value.

        @param ul: an integral value
        """
        self._check(pn_data_put_ulong(self._data, ul))

    def put_long(self, l):
        """
        Puts a signed long value.

        @param l: an integral value
        """
        self._check(pn_data_put_long(self._data, l))

    def put_timestamp(self, t):
        """
        Puts a timestamp value.

        @param t: an integral value
        """
        self._check(pn_data_put_timestamp(self._data, t))

    def put_float(self, f):
        """
        Puts a float value.

        @param f: a floating point value
        """
        self._check(pn_data_put_float(self._data, f))

    def put_double(self, d):
        """
        Puts a double value.

        @param d: a floating point value.
        """
        self._check(pn_data_put_double(self._data, d))

    def put_decimal32(self, d):
        """
        Puts a decimal32 value.

        @param d: a decimal32 value
        """
        self._check(pn_data_put_decimal32(self._data, d))

    def put_decimal64(self, d):
        """
        Puts a decimal64 value.

        @param d: a decimal64 value
        """
        self._check(pn_data_put_decimal64(self._data, d))

    def put_decimal128(self, d):
        """
        Puts a decimal128 value.

        @param d: a decimal128 value
        """
        self._check(pn_data_put_decimal128(self._data, d))

    def put_uuid(self, u):
        """
        Puts a UUID value.

        @param u: a uuid value (its C{bytes} attribute is what gets stored)
        """
        self._check(pn_data_put_uuid(self._data, u.bytes))

    def put_binary(self, b):
        """
        Puts a binary value.

        @type b: binary
        @param b: a binary value
        """
        self._check(pn_data_put_binary(self._data, b))

    def put_string(self, s):
        """
        Puts a unicode value. The value is encoded as UTF-8 before being
        stored.

        @type s: unicode
        @param s: a unicode value
        """
        self._check(pn_data_put_string(self._data, s.encode("utf8")))

    def put_symbol(self, s):
        """
        Puts a symbolic value.

        @type s: string
        @param s: the symbol name
        """
        self._check(pn_data_put_symbol(self._data, s))

    def get_list(self):
        """
        If the current node is a list, return the number of elements,
        otherwise return zero. List elements can be accessed by entering
        the list.

          >>> count = data.get_list()
          >>> data.enter()
          >>> for i in range(count):
          ...   type = data.next()
          ...   if type == Data.STRING:
          ...     print data.get_string()
          ...   elif type == ...:
          ...     ...
          >>> data.exit()
        """
        return pn_data_get_list(self._data)

    def get_map(self):
        """
        If the current node is a map, return the number of child elements,
        otherwise return zero. Key value pairs can be accessed by entering
        the map.

          >>> count = data.get_map()
          >>> data.enter()
          >>> for i in range(count/2):
          ...   type = data.next()
          ...   if type == Data.STRING:
          ...     print data.get_string()
          ...   elif type == ...:
          ...     ...
          >>> data.exit()
        """
        return pn_data_get_map(self._data)

    def get_array(self):
        """
        If the current node is an array, return a tuple of the element
        count, a boolean indicating whether the array is described, and
        the type of each element, otherwise return (0, False, None). Array
        data can be accessed by entering the array.

          >>> # read an array of strings with a symbolic descriptor
          >>> count, described, type = data.get_array()
          >>> data.enter()
          >>> data.next()
          >>> print "Descriptor:", data.get_symbol()
          >>> for i in range(count):
          ...    data.next()
          ...    print "Element:", data.get_string()
          >>> data.exit()
        """
        count = pn_data_get_array(self._data)
        described = pn_data_is_array_described(self._data)
        type = pn_data_get_array_type(self._data)
        if type == -1:
            type = None
        return count, described, type

    def is_described(self):
        """
        Checks if the current node is a described value. The descriptor
        and value may be accessed by entering the described value.

          >>> # read a symbolically described string
          >>> assert data.is_described() # will error if the current node is not described
          >>> data.enter()
          >>> print data.get_symbol()
          >>> print data.get_string()
          >>> data.exit()
        """
        return pn_data_is_described(self._data)

    def is_null(self):
        """
        Checks if the current node is a null.
        """
        return pn_data_is_null(self._data)

    def get_bool(self):
        """
        If the current node is a boolean, returns its value, returns False
        otherwise.
        """
        return pn_data_get_bool(self._data)

    def get_ubyte(self):
        """
        If the current node is an unsigned byte, returns its value,
        returns 0 otherwise.
        """
        return pn_data_get_ubyte(self._data)

    def get_byte(self):
        """
        If the current node is a signed byte, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_byte(self._data)

    def get_ushort(self):
        """
        If the current node is an unsigned short, returns its value,
        returns 0 otherwise.
        """
        return pn_data_get_ushort(self._data)

    def get_short(self):
        """
        If the current node is a signed short, returns its value, returns
        0 otherwise.
        """
        return pn_data_get_short(self._data)

    def get_uint(self):
        """
        If the current node is an unsigned int, returns its value, returns
        0 otherwise.
        """
        return pn_data_get_uint(self._data)

    def get_int(self):
        """
        If the current node is a signed int, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_int(self._data)

    def get_char(self):
        """
        If the current node is a char, returns its value, returns 0
        otherwise. The code point is wrapped in a L{char} instance.
        """
        return char(unichr(pn_data_get_char(self._data)))

    def get_ulong(self):
        """
        If the current node is an unsigned long, returns its value,
        returns 0 otherwise. The result is wrapped in a L{ulong} instance.
        """
        return ulong(pn_data_get_ulong(self._data))

    def get_long(self):
        """
        If the current node is a signed long, returns its value, returns
        0 otherwise.
        """
        return pn_data_get_long(self._data)

    def get_timestamp(self):
        """
        If the current node is a timestamp, returns its value, returns 0
        otherwise. The result is wrapped in a L{timestamp} instance.
        """
        return timestamp(pn_data_get_timestamp(self._data))

    def get_float(self):
        """
        If the current node is a float, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_float(self._data)

    def get_double(self):
        """
        If the current node is a double, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_double(self._data)

    # XXX: need to convert
    def get_decimal32(self):
        """
        If the current node is a decimal32, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_decimal32(self._data)

    # XXX: need to convert
    def get_decimal64(self):
        """
        If the current node is a decimal64, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_decimal64(self._data)

    # XXX: need to convert
    def get_decimal128(self):
        """
        If the current node is a decimal128, returns its value, returns 0
        otherwise.
        """
        return pn_data_get_decimal128(self._data)

    def get_uuid(self):
        """
        If the current node is a UUID, returns its value, returns None
        otherwise.
        """
        if pn_data_type(self._data) == Data.UUID:
            return uuid.UUID(bytes=pn_data_get_uuid(self._data))
        else:
            return None

    def get_binary(self):
        """
        If the current node is binary, returns its value, returns ""
        otherwise.
        """
        return pn_data_get_binary(self._data)

    def get_string(self):
        """
        If the current node is a string, returns its value, returns ""
        otherwise. The stored bytes are decoded as UTF-8.
        """
        return pn_data_get_string(self._data).decode("utf8")

    def get_symbol(self):
        """
        If the current node is a symbol, returns its value, returns ""
        otherwise. The result is wrapped in a L{symbol} instance.
        """
        return symbol(pn_data_get_symbol(self._data))

    def copy(self, src):
        """
        Copies the content of another L{Data} object into this one via
        pn_data_copy.

        @param src: the L{Data} object to copy from
        """
        self._check(pn_data_copy(self._data, src._data))

    def format(self):
        """
        Returns the result of pn_data_format, retrying with a doubled
        buffer size (starting at 16) while it reports PN_OVERFLOW.
        """
        sz = 16
        while True:
            err, result = pn_data_format(self._data, sz)
            if err == PN_OVERFLOW:
                sz *= 2
                continue
            else:
                self._check(err)
                return result

    def dump(self):
        """
        Calls pn_data_dump on the underlying data object.
        """
        pn_data_dump(self._data)

    def put_dict(self, d):
        """
        Puts a map whose children are the keys and values of the given
        dict, each written with L{put_object}.
        """
        self.put_map()
        self.enter()
        try:
            for k, v in d.items():
                self.put_object(k)
                self.put_object(v)
        finally:
            self.exit()

    def get_dict(self):
        """
        Enters the current node and reads its children as alternating
        keys and values into a dict. A trailing key without a value maps
        to None. Returns None if the node cannot be entered.
        """
        if self.enter():
            try:
                result = {}
                while self.next():
                    k = self.get_object()
                    if self.next():
                        v = self.get_object()
                    else:
                        v = None
                    result[k] = v
            finally:
                self.exit()
            return result
        return None

    def put_sequence(self, s):
        """
        Puts a list whose elements are the items of the given iterable,
        each written with L{put_object}.
        """
        self.put_list()
        self.enter()
        try:
            for o in s:
                self.put_object(o)
        finally:
            self.exit()

    def get_sequence(self):
        """
        Enters the current node and returns its children as a python
        list. Returns None if the node cannot be entered.
        """
        if self.enter():
            try:
                result = []
                while self.next():
                    result.append(self.get_object())
            finally:
                self.exit()
            return result
        return None

    def get_py_described(self):
        """
        Enters the current node and returns its first two children as a
        L{Described} (descriptor, value). Returns None if the node cannot
        be entered.
        """
        if self.enter():
            try:
                self.next()
                descriptor = self.get_object()
                self.next()
                value = self.get_object()
            finally:
                self.exit()
            return Described(descriptor, value)
        return None

    def put_py_described(self, d):
        """
        Puts a described value built from the descriptor and value of the
        given L{Described} object.
        """
        self.put_described()
        self.enter()
        try:
            self.put_object(d.descriptor)
            self.put_object(d.value)
        finally:
            self.exit()

    def get_py_array(self):
        """
        If the current node is an array, return an Array object
        representing the array and its contents. Otherwise return None.
        This is a convenience wrapper around get_array, enter, etc.
        """
        count, described, type = self.get_array()
        if type is None: return None
        if self.enter():
            try:
                if described:
                    self.next()
                    descriptor = self.get_object()
                else:
                    descriptor = UNDESCRIBED
                elements = []
                while self.next():
                    elements.append(self.get_object())
            finally:
                self.exit()
            return Array(descriptor, type, *elements)
        return None

    def put_py_array(self, a):
        """
        Puts an array built from the given L{Array} object. The array is
        written as described unless its descriptor is UNDESCRIBED.
        """
        described = a.descriptor != UNDESCRIBED
        self.put_array(described, a.type)
        self.enter()
        try:
            if described:
                self.put_object(a.descriptor)
            for e in a.elements:
                self.put_object(e)
        finally:
            self.exit()

    # python class -> put_* method used by put_object
    put_mappings = {
        None.__class__: lambda s, _: s.put_null(),
        bool: put_bool,
        dict: put_dict,
        list: put_sequence,
        tuple: put_sequence,
        unicode: put_string,
        bytes: put_binary,
        symbol: put_symbol,
        int: put_long,
        char: put_char,
        long: put_long,
        ulong: put_ulong,
        timestamp: put_timestamp,
        float: put_double,
        uuid.UUID: put_uuid,
        Described: put_py_described,
        Array: put_py_array
    }

    # AMQP type code -> get_* method used by get_object
    get_mappings = {
        NULL: lambda s: None,
        BOOL: get_bool,
        BYTE: get_byte,
        UBYTE: get_ubyte,
        SHORT: get_short,
        USHORT: get_ushort,
        INT: get_int,
        UINT: get_uint,
        CHAR: get_char,
        LONG: get_long,
        ULONG: get_ulong,
        TIMESTAMP: get_timestamp,
        FLOAT: get_float,
        DOUBLE: get_double,
        DECIMAL32: get_decimal32,
        DECIMAL64: get_decimal64,
        DECIMAL128: get_decimal128,
        UUID: get_uuid,
        BINARY: get_binary,
        STRING: get_string,
        SYMBOL: get_symbol,
        DESCRIBED: get_py_described,
        ARRAY: get_py_array,
        LIST: get_sequence,
        MAP: get_dict
    }

    def put_object(self, obj):
        """
        Puts an arbitrary python object using the put_* method registered
        for its class in L{put_mappings}. If the exact class is not
        registered, the first registered base class (in method resolution
        order) is used, so subclasses of supported types are accepted.

        Raises KeyError when neither the class nor any base is registered.
        """
        cls = obj.__class__
        putter = self.put_mappings.get(cls)
        if putter is None:
            # Exact class first (so bool never falls through to int), then
            # walk the bases; old-style classes have no __mro__.
            for base in getattr(cls, "__mro__", ()):
                putter = self.put_mappings.get(base)
                if putter is not None:
                    break
            else:
                raise KeyError(cls)
        putter(self, obj)

    def get_object(self):
        """
        Returns the current node converted with the get_* method
        registered for its type in L{get_mappings}. Returns None when
        there is no current type and an L{UnmappedType} when the type has
        no registered getter.
        """
        type = self.type()
        if type is None: return None
        getter = self.get_mappings.get(type)
        if getter:
            return getter(self)
        else:
            return UnmappedType(str(type))
class ConnectionException(ProtonException):
    """
    Exception type Connection._check raises for a negative error code that
    has no entry in EXCEPTIONS.
    """
class Endpoint(object):
    """
    Common base of the engine endpoint wrappers (Connection, Session, Link).

    Subclasses are expected to provide ``_children``, ``connection``,
    ``_free_resource()``, ``_released()``, ``_get_cond_impl()`` and
    ``_get_remote_cond_impl()``; this class supplies the shared release and
    condition handling built on them.
    """

    LOCAL_UNINIT = PN_LOCAL_UNINIT
    REMOTE_UNINIT = PN_REMOTE_UNINIT
    LOCAL_ACTIVE = PN_LOCAL_ACTIVE
    REMOTE_ACTIVE = PN_REMOTE_ACTIVE
    LOCAL_CLOSED = PN_LOCAL_CLOSED
    REMOTE_CLOSED = PN_REMOTE_CLOSED

    def __init__(self):
        # condition is copied into the C condition by _update_cond()
        self.condition = None
        self._release_invoked = False

    def _release(self):
        """Release the underlying C Engine resource.

        Children are released first, then this endpoint's own resource is
        freed and the owning connection is told about it. Subsequent calls
        are no-ops.
        """
        if not self._release_invoked:
            for c in self._children:
                c._release()
            self._free_resource()
            self.connection._releasing(self)
            self._release_invoked = True

    def _update_cond(self):
        """Copy self.condition into the endpoint's local C condition."""
        obj2cond(self.condition, self._get_cond_impl())

    @property
    def remote_condition(self):
        """The remote condition as a Condition object, or None if not set."""
        return cond2obj(self._get_remote_cond_impl())

    # the following must be provided by subclasses
    def _get_cond_impl(self):
        # raise rather than assert: assert statements vanish under "python -O"
        raise NotImplementedError("Subclass must override this!")

    def _get_remote_cond_impl(self):
        raise NotImplementedError("Subclass must override this!")
class Condition:
    """
    An error condition: a name plus optional description and info.

    obj2cond copies these three fields into a C condition and cond2obj
    builds a Condition back from one.
    """

    def __init__(self, name, description=None, info=None):
        self.name = name
        self.description = description
        self.info = info

    def __repr__(self):
        # falsy fields (None, "", empty info) are left out of the repr
        return "Condition(%s)" % ", ".join([repr(x) for x in
                                            (self.name, self.description, self.info)
                                            if x])

    def __eq__(self, o):
        """Equal only to another Condition with the same three fields."""
        if not isinstance(o, Condition): return False
        return self.name == o.name and \
            self.description == o.description and \
            self.info == o.info

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; keep the two consistent.
        return not self.__eq__(o)
def obj2cond(obj, cond):
    """
    Copy a Condition-like object into the C condition ``cond``.

    ``cond`` is always cleared first. A falsy ``obj`` leaves it cleared;
    otherwise the name (via str()), the description and, when truthy, the
    info are written.
    """
    pn_condition_clear(cond)
    if not obj:
        return
    pn_condition_set_name(cond, str(obj.name))
    pn_condition_set_description(cond, obj.description)
    info = Data(pn_condition_info(cond))
    payload = obj.info
    if payload:
        info.put_object(payload)
def cond2obj(cond):
    """
    Build a Condition from the C condition ``cond``; returns None when
    pn_condition_is_set reports it is not set.
    """
    if not pn_condition_is_set(cond):
        return None
    name = pn_condition_get_name(cond)
    description = pn_condition_get_description(cond)
    info = dat2obj(pn_condition_info(cond))
    return Condition(name, description, info)
def dat2obj(dimpl):
    """
    Read the first value of the data implementation ``dimpl`` as a python
    object. The data is rewound before and after reading. Returns None when
    ``dimpl`` is falsy.
    """
    if not dimpl:
        return None
    wrapped = Data(dimpl)
    wrapped.rewind()
    wrapped.next()
    value = wrapped.get_object()
    wrapped.rewind()
    return value
def obj2dat(obj, dimpl):
    """
    Write ``obj`` into the data implementation ``dimpl`` with
    Data.put_object. Nothing happens when ``obj`` is None.
    """
    if obj is None:
        return
    Data(dimpl).put_object(obj)
def secs2millis(secs):
    """Convert seconds to milliseconds, converted to a long."""
    millis = secs * 1000
    return long(millis)
def millis2secs(millis):
    """Convert milliseconds to (float) seconds."""
    as_float = float(millis)
    return as_float / 1000.0
class Connection(Endpoint):
    """
    Python wrapper for a C Engine connection.

    The wrapper stores itself in the C connection's context so that
    _wrap_connection can hand back the same python object for the same C
    object. It also tracks its Session wrappers in ``_sessions``.
    """

    @staticmethod
    def _wrap_connection(c_conn):
        """Maintain only a single instance of this class for each Connection
        object that exists in the C Engine. This is done by storing a (weak)
        reference to the python instance in the context field of the C object.
        """
        if not c_conn: return None
        py_conn = pn_void2py(pn_connection_get_context(c_conn))
        if py_conn: return py_conn
        wrapper = Connection(_conn=c_conn)
        return wrapper

    def __init__(self, _conn=None):
        """
        @param _conn: an existing C connection to wrap; when falsy a new one
          is created with pn_connection().
        """
        Endpoint.__init__(self)
        if _conn:
            self._conn = _conn
        else:
            self._conn = pn_connection()
        pn_connection_set_context(self._conn, pn_py2void(self))
        # these three are written to the C connection by open()
        self.offered_capabilities = None
        self.desired_capabilities = None
        self.properties = None
        self._sessions = set()

    def __del__(self):
        if hasattr(self, "_conn") and self._conn:
            self._release()

    def free(self):
        """Release the connection and its children (safe to call repeatedly)."""
        self._release()

    @property
    def _children(self):
        return self._sessions

    @property
    def connection(self):
        return self

    def _free_resource(self):
        pn_connection_free(self._conn)

    def _released(self):
        self._conn = None

    def _releasing(self, child):
        """
        Called by an endpoint that has just freed its C resource. If a
        collector registered through collect() is still alive the child is
        added to the collector's ``_contexts``; otherwise the child is told
        it is released right away.
        """
        coll = getattr(self, "_collector", None)
        if coll: coll = coll()  # dereference the weakref
        if coll:
            coll._contexts.add(child)
        else:
            child._released()

    def _check(self, err):
        """
        Returns err unchanged when it is non-negative; otherwise raises the
        exception registered in EXCEPTIONS for that code
        (ConnectionException by default) carrying the connection's error
        text.
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, ConnectionException)
            # pn_connection_error yields an error object (see the ``error``
            # property); format its text rather than the raw handle, as
            # Data._check does.
            raise exc("[%s]: %s" % (err, pn_error_text(pn_connection_error(self._conn))))
        else:
            return err

    def _get_cond_impl(self):
        return pn_connection_condition(self._conn)

    def _get_remote_cond_impl(self):
        return pn_connection_remote_condition(self._conn)

    def collect(self, collector):
        """
        Attach a collector to the connection, or detach with None. A weak
        reference to a non-None collector is kept for _releasing().
        """
        if collector is None:
            pn_connection_collect(self._conn, None)
        else:
            pn_connection_collect(self._conn, collector._impl)
            self._collector = weakref.ref(collector)

    def _get_container(self):
        return pn_connection_get_container(self._conn)

    def _set_container(self, name):
        return pn_connection_set_container(self._conn, name)

    container = property(_get_container, _set_container)

    def _get_hostname(self):
        return pn_connection_get_hostname(self._conn)

    def _set_hostname(self, name):
        return pn_connection_set_hostname(self._conn, name)

    hostname = property(_get_hostname, _set_hostname)

    @property
    def remote_container(self):
        return pn_connection_remote_container(self._conn)

    @property
    def remote_hostname(self):
        return pn_connection_remote_hostname(self._conn)

    @property
    def remote_offered_capabilities(self):
        return dat2obj(pn_connection_remote_offered_capabilities(self._conn))

    @property
    def remote_desired_capabilities(self):
        return dat2obj(pn_connection_remote_desired_capabilities(self._conn))

    @property
    def remote_properties(self):
        return dat2obj(pn_connection_remote_properties(self._conn))

    def open(self):
        """
        Write offered/desired capabilities and properties (those that are
        not None) to the C connection, then open it.
        """
        obj2dat(self.offered_capabilities,
                pn_connection_offered_capabilities(self._conn))
        obj2dat(self.desired_capabilities,
                pn_connection_desired_capabilities(self._conn))
        obj2dat(self.properties, pn_connection_properties(self._conn))
        pn_connection_open(self._conn)

    def close(self):
        """Copy self.condition to the C connection, then close it."""
        self._update_cond()
        pn_connection_close(self._conn)

    @property
    def state(self):
        return pn_connection_state(self._conn)

    def session(self):
        """Create a new session on this connection and return its wrapper."""
        return Session._wrap_session(pn_session(self._conn))

    def session_head(self, mask):
        return Session._wrap_session(pn_session_head(self._conn, mask))

    def link_head(self, mask):
        return Link._wrap_link(pn_link_head(self._conn, mask))

    @property
    def work_head(self):
        return Delivery._wrap_delivery(pn_work_head(self._conn))

    @property
    def error(self):
        """The error code of the connection's error object."""
        return pn_error_code(pn_connection_error(self._conn))
class SessionException(ProtonException):
    """
    ProtonException subclass for Session errors (no raise site is visible
    in this part of the file).
    """
class Session(Endpoint):
    """
    Python wrapper for a C Engine session.

    The wrapper stores itself in the C session's context, registers itself
    in its connection's ``_sessions`` set and tracks its Link wrappers in
    ``_links``.
    """

    @staticmethod
    def _wrap_session(c_ssn):
        """Maintain only a single instance of this class for each Session object that
        exists in the C Engine.
        """
        if c_ssn is None: return None
        py_ssn = pn_void2py(pn_session_get_context(c_ssn))
        if py_ssn: return py_ssn
        wrapper = Session(c_ssn)
        return wrapper

    def __init__(self, ssn):
        Endpoint.__init__(self)
        self._ssn = ssn
        pn_session_set_context(self._ssn, pn_py2void(self))
        self._links = set()
        self.connection._sessions.add(self)

    @property
    def _children(self):
        return self._links

    def _free_resource(self):
        pn_session_free(self._ssn)

    def _released(self):
        self._ssn = None

    def free(self):
        """Release the Session, freeing its resources.

        Call this when you no longer need the session. This will allow the
        session's resources to be reclaimed. Once called, you should no longer
        reference the session.

        Calling it again, or after the owning connection has already
        released this session, does nothing (mirroring Connection.free()).
        """
        if self._release_invoked:
            # The C session is already freed; looking up self.connection
            # through it would fail.
            return
        self.connection._sessions.discard(self)
        self._release()

    def _get_cond_impl(self):
        return pn_session_condition(self._ssn)

    def _get_remote_cond_impl(self):
        return pn_session_remote_condition(self._ssn)

    def _get_incoming_capacity(self):
        return pn_session_get_incoming_capacity(self._ssn)

    def _set_incoming_capacity(self, capacity):
        pn_session_set_incoming_capacity(self._ssn, capacity)

    incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

    @property
    def outgoing_bytes(self):
        return pn_session_outgoing_bytes(self._ssn)

    @property
    def incoming_bytes(self):
        return pn_session_incoming_bytes(self._ssn)

    def open(self):
        pn_session_open(self._ssn)

    def close(self):
        """Copy self.condition to the C session, then close it."""
        self._update_cond()
        pn_session_close(self._ssn)

    def next(self, mask):
        return Session._wrap_session(pn_session_next(self._ssn, mask))

    @property
    def state(self):
        return pn_session_state(self._ssn)

    @property
    def connection(self):
        """The Connection wrapper that owns this session."""
        return Connection._wrap_connection(pn_session_connection(self._ssn))

    def sender(self, name):
        """Create a sender link named ``name`` on this session."""
        return Link._wrap_link(pn_sender(self._ssn, name))

    def receiver(self, name):
        """Create a receiver link named ``name`` on this session."""
        return Link._wrap_link(pn_receiver(self._ssn, name))
class LinkException(ProtonException):
    """
    Exception type Link._check and Terminus._check raise for a negative
    error code that has no entry in EXCEPTIONS.
    """
class Link(Endpoint):
SND_UNSETTLED = PN_SND_UNSETTLED
SND_SETTLED = PN_SND_SETTLED
SND_MIXED = PN_SND_MIXED
RCV_FIRST = PN_RCV_FIRST
RCV_SECOND = PN_RCV_SECOND
@staticmethod
def _wrap_link(c_link):
"""Maintain only a single instance of this class for each Session object that
exists in the C Engine.
"""
if c_link is None: return None
py_link = pn_void2py(pn_link_get_context(c_link))
if py_link: return py_link
if pn_link_is_sender(c_link):
wrapper = Sender(c_link)
else:
wrapper = Receiver(c_link)
return wrapper
def __init__(self, c_link):
Endpoint.__init__(self)
self._link = c_link
pn_link_set_context(self._link, pn_py2void(self))
self._deliveries = set()
self.session._links.add(self)
@property
def _children(self):
return self._deliveries
def _free_resource(self):
pn_link_free(self._link)
def _released(self):
self._link = None
def free(self):
"""Release the Link, freeing its resources"""
self.session._links.remove(self)
self._release()
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, LinkException)
raise exc("[%s]: %s" % (err, pn_link_error(self._link)))
else:
return err
def _get_cond_impl(self):
return pn_link_condition(self._link)
def _get_remote_cond_impl(self):
return pn_link_remote_condition(self._link)
def open(self):
pn_link_open(self._link)
def close(self):
self._update_cond()
pn_link_close(self._link)
@property
def state(self):
return pn_link_state(self._link)
@property
def source(self):
return Terminus(pn_link_source(self._link))
@property
def target(self):
return Terminus(pn_link_target(self._link))
@property
def remote_source(self):
return Terminus(pn_link_remote_source(self._link))
@property
def remote_target(self):
return Terminus(pn_link_remote_target(self._link))
@property
def session(self):
return Session._wrap_session(pn_link_session(self._link))
@property
def connection(self):
return self.session.connection
def delivery(self, tag):
return Delivery._wrap_delivery(pn_delivery(self._link, tag))
@property
def current(self):
return Delivery._wrap_delivery(pn_link_current(self._link))
def advance(self):
return pn_link_advance(self._link)
@property
def unsettled(self):
return pn_link_unsettled(self._link)
@property
def credit(self):
return pn_link_credit(self._link)
@property
def available(self):
return pn_link_available(self._link)
@property
def queued(self):
return pn_link_queued(self._link)
def next(self, mask):
return Link._wrap_link(pn_link_next(self._link, mask))
@property
def name(self):
return pn_link_name(self._link)
@property
def is_sender(self):
  """The result of ``pn_link_is_sender`` for this link."""
  flag = pn_link_is_sender(self._link)
  return flag
@property
def is_receiver(self):
  """The result of ``pn_link_is_receiver`` for this link."""
  flag = pn_link_is_receiver(self._link)
  return flag
@property
def remote_snd_settle_mode(self):
  """The value reported by ``pn_link_remote_snd_settle_mode``."""
  mode = pn_link_remote_snd_settle_mode(self._link)
  return mode
@property
def remote_rcv_settle_mode(self):
  """The value reported by ``pn_link_remote_rcv_settle_mode``."""
  mode = pn_link_remote_rcv_settle_mode(self._link)
  return mode
def _get_snd_settle_mode(self):
  """Read the local sender settle mode from the C link."""
  mode = pn_link_snd_settle_mode(self._link)
  return mode

def _set_snd_settle_mode(self, mode):
  """Write ``mode`` as the local sender settle mode of the C link."""
  c_link = self._link
  pn_link_set_snd_settle_mode(c_link, mode)

# Read/write access to the local sender settle mode.
snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
def _get_rcv_settle_mode(self):
  """Read the local receiver settle mode from the C link."""
  mode = pn_link_rcv_settle_mode(self._link)
  return mode

def _set_rcv_settle_mode(self, mode):
  """Write ``mode`` as the local receiver settle mode of the C link."""
  c_link = self._link
  pn_link_set_rcv_settle_mode(c_link, mode)

# Read/write access to the local receiver settle mode.
rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
def drained(self):
  """Call ``pn_link_drained`` on this link and return its result."""
  result = pn_link_drained(self._link)
  return result
def detach(self):
  """Call ``pn_link_detach`` on this link and return its result."""
  result = pn_link_detach(self._link)
  return result
class Terminus(object):
  """Wrapper around a C terminus handle, as returned by a link's
  ``source``/``target`` accessors.

  Scalar settings are exposed as read/write properties; each setter
  passes the C return code through ``_check``.  The ``properties``,
  ``capabilities``, ``outcomes`` and ``filter`` properties return
  ``Data`` wrappers.
  """

  UNSPECIFIED = PN_UNSPECIFIED
  SOURCE = PN_SOURCE
  TARGET = PN_TARGET
  COORDINATOR = PN_COORDINATOR

  NONDURABLE = PN_NONDURABLE
  CONFIGURATION = PN_CONFIGURATION
  DELIVERIES = PN_DELIVERIES

  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
  DIST_MODE_COPY = PN_DIST_MODE_COPY
  DIST_MODE_MOVE = PN_DIST_MODE_MOVE

  def __init__(self, impl):
    """Store the C terminus handle ``impl``."""
    self._impl = impl

  def _check(self, err):
    """Return ``err`` unless negative; otherwise raise the mapped exception
    (``LinkException`` when the code has no entry in ``EXCEPTIONS``)."""
    if not err < 0:
      return err
    exc_type = EXCEPTIONS.get(err, LinkException)
    raise exc_type("[%s]" % err)

  def _get_type(self):
    kind = pn_terminus_get_type(self._impl)
    return kind

  def _set_type(self, type):
    rc = pn_terminus_set_type(self._impl, type)
    self._check(rc)

  type = property(_get_type, _set_type)

  def _get_address(self):
    addr = pn_terminus_get_address(self._impl)
    return addr

  def _set_address(self, address):
    rc = pn_terminus_set_address(self._impl, address)
    self._check(rc)

  address = property(_get_address, _set_address)

  def _get_durability(self):
    value = pn_terminus_get_durability(self._impl)
    return value

  def _set_durability(self, seconds):
    # NOTE(review): the parameter is called ``seconds`` but is passed
    # straight to pn_terminus_set_durability -- confirm the intended name.
    rc = pn_terminus_set_durability(self._impl, seconds)
    self._check(rc)

  durability = property(_get_durability, _set_durability)

  def _get_expiry_policy(self):
    value = pn_terminus_get_expiry_policy(self._impl)
    return value

  def _set_expiry_policy(self, seconds):
    # NOTE(review): as above, ``seconds`` is forwarded unchanged as the policy.
    rc = pn_terminus_set_expiry_policy(self._impl, seconds)
    self._check(rc)

  expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

  def _get_timeout(self):
    value = pn_terminus_get_timeout(self._impl)
    return value

  def _set_timeout(self, seconds):
    rc = pn_terminus_set_timeout(self._impl, seconds)
    self._check(rc)

  timeout = property(_get_timeout, _set_timeout)

  def _is_dynamic(self):
    flag = pn_terminus_is_dynamic(self._impl)
    return flag

  def _set_dynamic(self, dynamic):
    rc = pn_terminus_set_dynamic(self._impl, dynamic)
    self._check(rc)

  dynamic = property(_is_dynamic, _set_dynamic)

  def _get_distribution_mode(self):
    mode = pn_terminus_get_distribution_mode(self._impl)
    return mode

  def _set_distribution_mode(self, mode):
    rc = pn_terminus_set_distribution_mode(self._impl, mode)
    self._check(rc)

  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)

  @property
  def properties(self):
    """A new ``Data`` wrapper over ``pn_terminus_properties``."""
    c_data = pn_terminus_properties(self._impl)
    return Data(c_data)

  @property
  def capabilities(self):
    """A new ``Data`` wrapper over ``pn_terminus_capabilities``."""
    c_data = pn_terminus_capabilities(self._impl)
    return Data(c_data)

  @property
  def outcomes(self):
    """A new ``Data`` wrapper over ``pn_terminus_outcomes``."""
    c_data = pn_terminus_outcomes(self._impl)
    return Data(c_data)

  @property
  def filter(self):
    """A new ``Data`` wrapper over ``pn_terminus_filter``."""
    c_data = pn_terminus_filter(self._impl)
    return Data(c_data)

  def copy(self, src):
    """Copy the terminus ``src`` into this one via ``pn_terminus_copy``."""
    rc = pn_terminus_copy(self._impl, src._impl)
    self._check(rc)
class Sender(Link):
  """A Link specialised with the sending-side operations."""

  def __init__(self, c_link):
    """Wrap the C link handle ``c_link``."""
    super(Sender, self).__init__(c_link)

  def offered(self, n):
    """Pass ``n`` to ``pn_link_offered`` for this link."""
    c_link = self._link
    pn_link_offered(c_link, n)

  def send(self, bytes):
    """Send ``bytes`` with ``pn_link_send``.

    Returns the C return value when non-negative; a negative value is
    raised as an exception by ``_check``.
    """
    rc = pn_link_send(self._link, bytes)
    return self._check(rc)
class Receiver(Link):
  """A Link specialised with the receiving-side operations."""

  def __init__(self, c_link):
    """Wrap the C link handle ``c_link``."""
    super(Receiver, self).__init__(c_link)

  def flow(self, n):
    """Pass ``n`` to ``pn_link_flow`` for this link."""
    c_link = self._link
    pn_link_flow(c_link, n)

  def recv(self, limit):
    """Receive up to ``limit`` bytes with ``pn_link_recv``.

    Returns None when the C call reports ``PN_EOS``; any other negative
    code is raised by ``_check``; otherwise the received data is returned.
    """
    n, data = pn_link_recv(self._link, limit)
    if n == PN_EOS:
      return None
    self._check(n)
    return data

  def drain(self, n):
    """Pass ``n`` to ``pn_link_drain`` for this link."""
    c_link = self._link
    pn_link_drain(c_link, n)

  def draining(self):
    """Return the result of ``pn_link_draining`` for this link."""
    flag = pn_link_draining(self._link)
    return flag
class NamedInt(int):
  """An ``int`` that prints as a symbolic name.

  Every instance is recorded in the class-level ``values`` registry under
  its integer value, so ``get`` can map a plain integer back to the named
  instance.
  """

  values = {}

  def __new__(cls, i, name):
    """Create the integer ``i`` and register it in ``cls.values``."""
    instance = int.__new__(cls, i)
    cls.values[i] = instance
    return instance

  def __init__(self, i, name):
    """Remember the symbolic ``name`` used by ``repr`` and ``str``."""
    self.name = name

  def __repr__(self):
    label = self.name
    return label

  def __str__(self):
    label = self.name
    return label

  @classmethod
  def get(cls, i):
    """Return the instance registered for ``i``, or ``i`` itself if none is."""
    registry = cls.values
    return registry.get(i, i)
class DispositionType(NamedInt):
  """NamedInt subclass for disposition types, with its own registry."""

  # Separate dict so lookups do not share NamedInt.values.
  values = dict()
class Disposition(object):
  """Wrapper around a C disposition handle.

  A *local* disposition (``local`` true) keeps ``data``, ``annotations``
  and ``condition`` as Python attributes that can be assigned.  For a
  non-local disposition those three are read from the C object and
  assigning to them raises ``AttributeError``.
  """

  RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
  ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
  REJECTED = DispositionType(PN_REJECTED, "REJECTED")
  RELEASED = DispositionType(PN_RELEASED, "RELEASED")
  MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")

  def __init__(self, impl, local):
    """Store the C handle ``impl`` and whether this is the local side."""
    self._impl = impl
    self.local = local
    # Python-side values, only consulted for a local disposition.
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """The disposition type, as a DispositionType when one is registered."""
    raw = pn_disposition_type(self._impl)
    return DispositionType.get(raw)

  def _get_section_number(self):
    value = pn_disposition_get_section_number(self._impl)
    return value

  def _set_section_number(self, n):
    pn_disposition_set_section_number(self._impl, n)

  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    value = pn_disposition_get_section_offset(self._impl)
    return value

  def _set_section_offset(self, n):
    pn_disposition_set_section_offset(self._impl, n)

  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    flag = pn_disposition_is_failed(self._impl)
    return flag

  def _set_failed(self, b):
    pn_disposition_set_failed(self._impl, b)

  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    flag = pn_disposition_is_undeliverable(self._impl)
    return flag

  def _set_undeliverable(self, b):
    pn_disposition_set_undeliverable(self._impl, b)

  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data

  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj

  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations

  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj

  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition

  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj

  condition = property(_get_condition, _set_condition)
class Delivery(object):
  """Python wrapper around a C delivery handle.

  A single wrapper exists per C delivery: the wrapper stores itself as
  the C delivery's context and ``_wrap_delivery`` returns that object
  when present.  The wrapper registers itself in its link's
  ``_deliveries`` set on construction.
  """

  RECEIVED = Disposition.RECEIVED
  ACCEPTED = Disposition.ACCEPTED
  REJECTED = Disposition.REJECTED
  RELEASED = Disposition.RELEASED
  MODIFIED = Disposition.MODIFIED

  @staticmethod
  def _wrap_delivery(c_dlv):
    """Maintain only a single instance of this class for each Delivery object that
    exists in the C Engine.

    Returns None for a falsy handle, the existing wrapper when the C
    delivery already has one as its context, and a new wrapper otherwise.
    """
    if not c_dlv: return None
    py_dlv = pn_void2py(pn_delivery_get_context(c_dlv))
    if py_dlv: return py_dlv
    wrapper = Delivery(c_dlv)
    return wrapper

  def __init__(self, dlv):
    """Wrap the C delivery ``dlv`` and register with its link."""
    self._dlv = dlv
    pn_delivery_set_context(self._dlv, pn_py2void(self))
    self.local = Disposition(pn_delivery_local(self._dlv), True)
    self.remote = Disposition(pn_delivery_remote(self._dlv), False)
    self.link._deliveries.add(self)

  def __del__(self):
    self._release()

  def _release(self):
    """Release the underlying C Engine resource."""
    if self._dlv:
      # Clear the back-reference before settling the C delivery.
      pn_delivery_set_context(self._dlv, pn_py2void(None))
      pn_delivery_settle(self._dlv)
      self._dlv = None

  @property
  def released(self):
    """True once the C delivery handle has been dropped."""
    return self._dlv is None

  @property
  def tag(self):
    return pn_delivery_tag(self._dlv)

  @property
  def writable(self):
    return pn_delivery_writable(self._dlv)

  @property
  def readable(self):
    return pn_delivery_readable(self._dlv)

  @property
  def updated(self):
    return pn_delivery_updated(self._dlv)

  def update(self, state):
    """Copy the local disposition's Python-side data, annotations and
    condition into the C disposition, then call ``pn_delivery_update``
    with ``state``."""
    obj2dat(self.local._data, pn_disposition_data(self.local._impl))
    obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
    obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
    pn_delivery_update(self._dlv, state)

  @property
  def pending(self):
    return pn_delivery_pending(self._dlv)

  @property
  def partial(self):
    return pn_delivery_partial(self._dlv)

  @property
  def local_state(self):
    return DispositionType.get(pn_delivery_local_state(self._dlv))

  @property
  def remote_state(self):
    return DispositionType.get(pn_delivery_remote_state(self._dlv))

  @property
  def settled(self):
    return pn_delivery_settled(self._dlv)

  def settle(self):
    """Release the delivery.

    Does nothing when the delivery has already been released: looking up
    ``self.link`` would otherwise hand a cleared handle to the C library.
    """
    if not self._dlv:
      return
    # discard() rather than remove(): never raise if the link no longer
    # tracks this delivery.
    self.link._deliveries.discard(self)
    self._release()

  @property
  def work_next(self):
    return Delivery._wrap_delivery(pn_work_next(self._dlv))

  @property
  def link(self):
    return Link._wrap_link(pn_delivery_link(self._dlv))
class TransportException(ProtonException):
  """Exception type used by Transport; default for unmapped transport errors."""
class Transport(object):
  """Python wrapper around a C transport handle.

  A Transport either owns its C transport (created with ``pn_transport``
  and freed in ``__del__``) or shares one supplied through ``_trans``, in
  which case the C object is not freed by this wrapper.
  """

  TRACE_OFF = PN_TRACE_OFF
  TRACE_DRV = PN_TRACE_DRV
  TRACE_FRM = PN_TRACE_FRM
  TRACE_RAW = PN_TRACE_RAW

  CLIENT = 1
  SERVER = 2

  @staticmethod
  def _wrap_transport(c_trans):
    """Return a new Transport sharing ``c_trans``, or None when it is falsy."""
    if c_trans:
      return Transport(_trans=c_trans)
    return None

  def __init__(self, mode=None, _trans=None):
    """Create a transport.

    With no ``mode``, an existing C transport passed as ``_trans`` is
    shared; otherwise a new one is created.  ``mode`` may be
    ``Transport.CLIENT`` or ``Transport.SERVER``; any other truthy value
    raises ``TransportException``.
    """
    if not mode:
      if _trans:
        # Borrowed handle: the marker attribute stops __del__ freeing it.
        self._shared_trans = True
        self._trans = _trans
      else:
        self._trans = pn_transport()
    elif mode == Transport.CLIENT:
      self._trans = pn_transport()
    elif mode == Transport.SERVER:
      self._trans = pn_transport()
      pn_transport_set_server(self._trans)
    else:
      raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
    self._sasl = None
    self._ssl = None

  def __del__(self):
    if not hasattr(self, "_trans"):
      return
    if not hasattr(self, "_shared_trans"):
      pn_transport_free(self._trans)
      sasl = getattr(self, "_sasl", None)
      if sasl:
        # pn_transport_free deallocs the C sasl associated with the
        # transport, so erase the reference if a SASL object was used.
        sasl._sasl = None
        self._sasl = None
      ssl = getattr(self, "_ssl", None)
      if ssl:
        # ditto the owned c SSL object
        ssl._ssl = None
        self._ssl = None
    del self._trans

  def _check(self, err):
    """Return ``err`` unless negative; otherwise raise the mapped exception
    (``TransportException`` by default) carrying the transport error text."""
    if not err < 0:
      return err
    exc_type = EXCEPTIONS.get(err, TransportException)
    detail = pn_error_text(pn_transport_error(self._trans))
    raise exc_type("[%s]: %s" % (err, detail))

  def bind(self, connection):
    """Assign a connection to the transport"""
    rc = pn_transport_bind(self._trans, connection._conn)
    self._check(rc)
    # keep python connection from being garbage collected:
    self._connection = connection

  def unbind(self):
    """Release the connection"""
    rc = pn_transport_unbind(self._trans)
    self._check(rc)
    self._connection = None

  def trace(self, n):
    """Pass the trace setting ``n`` to ``pn_transport_trace``."""
    pn_transport_trace(self._trans, n)

  def tick(self, now):
    """Process any timed events (like heartbeat generation).
    now = seconds since epoch (float).
    """
    now_ms = secs2millis(now)
    return millis2secs(pn_transport_tick(self._trans, now_ms))

  def capacity(self):
    """Return ``pn_transport_capacity``; values below ``PN_EOS`` are raised
    through ``_check``."""
    c = pn_transport_capacity(self._trans)
    if not c >= PN_EOS:
      return self._check(c)
    return c

  def push(self, bytes):
    """Push ``bytes`` into the transport.

    Raises ``OverflowError`` when fewer than ``len(bytes)`` were accepted.
    """
    n = self._check(pn_transport_push(self._trans, bytes))
    if n != len(bytes):
      raise OverflowError("unable to process all bytes")

  def close_tail(self):
    rc = pn_transport_close_tail(self._trans)
    self._check(rc)

  def pending(self):
    """Return ``pn_transport_pending``; values below ``PN_EOS`` are raised
    through ``_check``."""
    p = pn_transport_pending(self._trans)
    if not p >= PN_EOS:
      return self._check(p)
    return p

  def peek(self, size):
    """Return up to ``size`` bytes of pending output without consuming them,
    or None when the C call reports ``PN_EOS``."""
    cd, out = pn_transport_peek(self._trans, size)
    if cd == PN_EOS:
      return None
    self._check(cd)
    return out

  def pop(self, size):
    """Pass ``size`` to ``pn_transport_pop``."""
    pn_transport_pop(self._trans, size)

  def close_head(self):
    rc = pn_transport_close_head(self._trans)
    self._check(rc)

  @property
  def closed(self):
    flag = pn_transport_closed(self._trans)
    return flag

  # AMQP 1.0 max-frame-size
  def _get_max_frame_size(self):
    size = pn_transport_get_max_frame(self._trans)
    return size

  def _set_max_frame_size(self, value):
    pn_transport_set_max_frame(self._trans, value)

  max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                            doc="""
Sets the maximum size for received frames (in bytes).
""")

  @property
  def remote_max_frame_size(self):
    size = pn_transport_get_remote_max_frame(self._trans)
    return size

  def _get_channel_max(self):
    value = pn_transport_get_channel_max(self._trans)
    return value

  def _set_channel_max(self, value):
    pn_transport_set_channel_max(self._trans, value)

  channel_max = property(_get_channel_max, _set_channel_max,
                         doc="""
Sets the maximum channel that may be used on the transport.
""")

  @property
  def remote_channel_max(self):
    value = pn_transport_remote_channel_max(self._trans)
    return value

  # AMQP 1.0 idle-time-out
  def _get_idle_timeout(self):
    millis = pn_transport_get_idle_timeout(self._trans)
    return millis2secs(millis)

  def _set_idle_timeout(self, sec):
    millis = secs2millis(sec)
    pn_transport_set_idle_timeout(self._trans, millis)

  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
                          doc="""
The idle timeout of the connection (float, in seconds).
""")

  @property
  def remote_idle_timeout(self):
    millis = pn_transport_get_remote_idle_timeout(self._trans)
    return millis2secs(millis)

  @property
  def frames_output(self):
    count = pn_transport_get_frames_output(self._trans)
    return count

  @property
  def frames_input(self):
    count = pn_transport_get_frames_input(self._trans)
    return count

  def sasl(self):
    # SASL factory (singleton for this transport)
    if not self._sasl:
      self._sasl = SASL(self)
    return self._sasl

  def ssl(self, domain=None, session_details=None):
    # SSL factory (singleton for this transport)
    if not self._ssl:
      self._ssl = SSL(self, domain, session_details)
    return self._ssl

  @property
  def condition(self):
    c_cond = pn_transport_condition(self._trans)
    return cond2obj(c_cond)

  @property
  def connection(self):
    c_conn = pn_transport_connection(self._trans)
    return Connection._wrap_connection(c_conn)
class SASLException(TransportException):
  """Exception type used by SASL; default for unmapped SASL errors."""
class SASL(object):
  """Wrapper around the C SASL object of a transport.

  Only one SASL object is created per Transport wrapper: constructing
  ``SASL(transport)`` again returns the instance already stored on it.
  """

  OK = PN_SASL_OK
  AUTH = PN_SASL_AUTH
  SKIPPED = PN_SASL_SKIPPED

  STATE_IDLE = PN_SASL_IDLE
  STATE_STEP = PN_SASL_STEP
  STATE_PASS = PN_SASL_PASS
  STATE_FAIL = PN_SASL_FAIL

  def __new__(cls, transport):
    """Enforce a singleton SASL object per Transport"""
    if transport._sasl:
      return transport._sasl
    sasl = super(SASL, cls).__new__(cls)
    sasl._sasl = pn_sasl(transport._trans)
    transport._sasl = sasl
    return sasl

  def _check(self, err):
    """Return ``err`` unless negative; otherwise raise the mapped exception
    (``SASLException`` by default)."""
    if not err < 0:
      return err
    exc_type = EXCEPTIONS.get(err, SASLException)
    raise exc_type("[%s]" % (err))

  def mechanisms(self, mechs):
    pn_sasl_mechanisms(self._sasl, mechs)

  # @deprecated
  def client(self):
    pn_sasl_client(self._sasl)

  # @deprecated
  def server(self):
    pn_sasl_server(self._sasl)

  def allow_skip(self, allow):
    pn_sasl_allow_skip(self._sasl, allow)

  def plain(self, user, password):
    pn_sasl_plain(self._sasl, user, password)

  def send(self, data):
    """Send ``data`` with ``pn_sasl_send``; negative results are raised."""
    rc = pn_sasl_send(self._sasl, data, len(data))
    self._check(rc)

  def recv(self):
    """Receive pending SASL data.

    The receive size starts at 16 and doubles each time the C call
    reports ``PN_OVERFLOW``.  Returns None on ``PN_EOS``.
    """
    size = 16
    n, data = pn_sasl_recv(self._sasl, size)
    while n == PN_OVERFLOW:
      size *= 2
      n, data = pn_sasl_recv(self._sasl, size)
    if n == PN_EOS:
      return None
    self._check(n)
    return data

  @property
  def outcome(self):
    """The SASL outcome, or None while it is ``PN_SASL_NONE``."""
    result = pn_sasl_outcome(self._sasl)
    if result == PN_SASL_NONE:
      return None
    return result

  def done(self, outcome):
    pn_sasl_done(self._sasl, outcome)

  @property
  def state(self):
    current = pn_sasl_state(self._sasl)
    return current
class SSLException(TransportException):
  """Exception type used by the SSL classes; default for unmapped SSL errors."""
class SSLUnavailable(SSLException):
  """Raised when the C library returns no SSL domain or SSL object."""
class SSLDomain(object):
  """Wrapper around a C SSL domain created with ``pn_ssl_domain(mode)``.

  The configuration methods return the C result code when it is
  non-negative and raise through ``_check`` otherwise.
  """

  MODE_CLIENT = PN_SSL_MODE_CLIENT
  MODE_SERVER = PN_SSL_MODE_SERVER

  VERIFY_PEER = PN_SSL_VERIFY_PEER
  VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
  ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

  def __init__(self, mode):
    """Create the C domain for ``mode``; raise ``SSLUnavailable`` if the
    C call returns None."""
    domain = pn_ssl_domain(mode)
    self._domain = domain
    if domain is None:
      raise SSLUnavailable()

  def _check(self, err):
    """Return ``err`` unless negative; otherwise raise the mapped exception
    (``SSLException`` by default)."""
    if not err < 0:
      return err
    exc_type = EXCEPTIONS.get(err, SSLException)
    raise exc_type("SSL failure.")

  def set_credentials(self, cert_file, key_file, password):
    rc = pn_ssl_domain_set_credentials(self._domain, cert_file, key_file,
                                       password)
    return self._check(rc)

  def set_trusted_ca_db(self, certificate_db):
    rc = pn_ssl_domain_set_trusted_ca_db(self._domain, certificate_db)
    return self._check(rc)

  def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    rc = pn_ssl_domain_set_peer_authentication(self._domain, verify_mode,
                                               trusted_CAs)
    return self._check(rc)

  def allow_unsecured_client(self):
    rc = pn_ssl_domain_allow_unsecured_client(self._domain)
    return self._check(rc)
class SSL(object):
  """Wrapper around the C SSL object of a transport.

  Only one SSL object is created per Transport wrapper; constructing
  ``SSL(transport, ...)`` again returns the stored instance, provided
  the arguments do not ask for a different configuration.
  """

  RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
  RESUME_NEW = PN_SSL_RESUME_NEW
  RESUME_REUSED = PN_SSL_RESUME_REUSED

  @staticmethod
  def present():
    """Return the result of ``pn_ssl_present()``."""
    return pn_ssl_present()

  def _check(self, err):
    """Return ``err`` unless negative; otherwise raise the mapped exception
    (``SSLException`` by default)."""
    if err < 0:
      exc = EXCEPTIONS.get(err, SSLException)
      raise exc("SSL failure.")
    else:
      return err

  def __new__(cls, transport, domain, session_details=None):
    """Enforce a singleton SSL object per Transport

    @param transport: the Transport wrapper that owns the SSL object.
    @param domain: SSLDomain used to initialise a new SSL object; may be
      None, in which case the C SSL object is obtained but
      ``pn_ssl_init`` is not called.
    @param session_details: optional SSLSessionDetails supplying the
      session id passed to ``pn_ssl_init``.
    @raise SSLException: if an SSL object already exists and a different
      domain or session_details is supplied.
    @raise SSLUnavailable: if the C library returns no SSL object.
    """
    if transport._ssl:
      # unfortunately, we've combined the allocation and the configuration in a
      # single step.  So catch any attempt by the application to provide what
      # may be a different configuration than the original (hack)
      ssl = transport._ssl
      if (domain and (ssl._domain is not domain) or
          session_details and (ssl._session_details is not session_details)):
        raise SSLException("Cannot re-configure existing SSL object!")
    else:
      obj = super(SSL, cls).__new__(cls)
      obj._domain = domain
      obj._session_details = session_details
      session_id = None
      if session_details:
        session_id = session_details.get_session_id()
      obj._ssl = pn_ssl( transport._trans )
      if obj._ssl is None:
        raise SSLUnavailable()
      # Transport.ssl() defaults domain to None; only initialise when a
      # domain was actually supplied instead of dereferencing None.
      if domain:
        pn_ssl_init( obj._ssl, domain._domain, session_id )
      transport._ssl = obj
    return transport._ssl

  def cipher_name(self):
    """Return the cipher name, or None when the C call reports no result."""
    rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
    if rc:
      return name
    return None

  def protocol_name(self):
    """Return the protocol name, or None when the C call reports no result."""
    rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
    if rc:
      return name
    return None

  def resume_status(self):
    """Return the result of ``pn_ssl_resume_status``."""
    return pn_ssl_resume_status( self._ssl )

  def _set_peer_hostname(self, hostname):
    self._check(pn_ssl_set_peer_hostname( self._ssl, hostname ))

  def _get_peer_hostname(self):
    err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
    self._check(err)
    return name

  peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
                           doc="""
Manage the expected name of the remote peer. Used to authenticate the remote.
""")
class SSLSessionDetails(object):
  """Holder for the unique identifier of an SSL session.

  The identifier is used to resume a previous session on a new SSL
  connection.
  """

  def __init__(self, session_id):
    """Remember ``session_id``."""
    self._session_id = session_id

  def get_session_id(self):
    """Return the session id given at construction."""
    session_id = self._session_id
    return session_id
# Maps the class name of an event's context (as returned by pn_class_name in
# Collector.peek) to a function that converts the raw C context into the
# corresponding Python object.
wrappers = {
"pn_void": lambda x: pn_void2py(x),
"pn_pyref": lambda x: pn_void2py(x),
"pn_connection": lambda x: Connection._wrap_connection(pn_cast_pn_connection(x)),
"pn_session": lambda x: Session._wrap_session(pn_cast_pn_session(x)),
"pn_link": lambda x: Link._wrap_link(pn_cast_pn_link(x)),
"pn_delivery": lambda x: Delivery._wrap_delivery(pn_cast_pn_delivery(x)),
"pn_transport": lambda x: Transport._wrap_transport(pn_cast_pn_transport(x))
}
class Collector:
  """Wrapper around a C event collector created with ``pn_collector()``.

  ``_contexts`` holds Python objects; ``Event._popped`` removes the
  context of a *_FINAL event from it when that event is popped.
  """

  def __init__(self):
    self._impl = pn_collector()
    self._contexts = set()

  def put(self, obj, etype):
    """Add an event of type ``etype`` whose context is the Python object ``obj``."""
    context = pn_py2void(obj)
    pn_collector_put(self._impl, PN_PYREF, context, etype.number)

  def peek(self):
    """Return the event at the head of the collector as an Event, or None
    when the collector has no event."""
    c_event = pn_collector_peek(self._impl)
    if c_event is None:
      return None
    clazz = pn_class_name(pn_event_class(c_event))
    to_python = wrappers[clazz]
    context = to_python(pn_event_context(c_event))
    etype = EventType.TYPES[pn_event_type(c_event)]
    return Event(clazz, context, etype)

  def pop(self):
    """Remove the head event, notifying its Event wrapper first if there is one."""
    head = self.peek()
    if head is not None:
      head._popped(self)
    pn_collector_pop(self._impl)

  def __del__(self):
    pn_collector_free(self._impl)
class EventType:
  """An event type number together with its name and handler method name.

  Every instance registers itself in ``TYPES`` under its number.
  """

  TYPES = {}

  def __init__(self, number, method):
    """
    @param number: the numeric event type.
    @param method: name of the handler method used when dispatching.
    """
    self.number = number
    self.method = method
    self.name = pn_event_type_name(number)
    self.TYPES[number] = self

  def __repr__(self):
    label = self.name
    return label
def dispatch(handler, method, *args):
  """Invoke ``handler.<method>(*args)`` if the handler has that attribute.

  When the attribute is missing or falsy and the handler has an
  ``on_unhandled`` attribute, ``handler.on_unhandled(method, args)`` is
  called instead.  Returns the result of whichever call was made, or
  None when neither applies.
  """
  target = getattr(handler, method, None)
  if target:
    return target(*args)
  if hasattr(handler, "on_unhandled"):
    return handler.on_unhandled(method, args)
  return None
class Event(object):
  """An event taken from a Collector.

  ``clazz`` is the class name of the event's context, ``context`` the
  Python object for that context and ``type`` the EventType.  The
  ``connection``/``session``/``link``/``sender``/``receiver``/``delivery``
  properties derive the related objects from the context, yielding None
  when they do not apply.
  """

  CONNECTION_INIT = EventType(PN_CONNECTION_INIT, "on_connection_init")
  CONNECTION_BOUND = EventType(PN_CONNECTION_BOUND, "on_connection_bound")
  CONNECTION_UNBOUND = EventType(PN_CONNECTION_UNBOUND, "on_connection_unbound")
  CONNECTION_LOCAL_OPEN = EventType(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
  CONNECTION_LOCAL_CLOSE = EventType(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
  CONNECTION_REMOTE_OPEN = EventType(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
  CONNECTION_REMOTE_CLOSE = EventType(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
  CONNECTION_FINAL = EventType(PN_CONNECTION_FINAL, "on_connection_final")

  SESSION_INIT = EventType(PN_SESSION_INIT, "on_session_init")
  SESSION_LOCAL_OPEN = EventType(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
  SESSION_LOCAL_CLOSE = EventType(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
  SESSION_REMOTE_OPEN = EventType(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
  SESSION_REMOTE_CLOSE = EventType(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
  SESSION_FINAL = EventType(PN_SESSION_FINAL, "on_session_final")

  LINK_INIT = EventType(PN_LINK_INIT, "on_link_init")
  LINK_LOCAL_OPEN = EventType(PN_LINK_LOCAL_OPEN, "on_link_local_open")
  LINK_LOCAL_CLOSE = EventType(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
  LINK_LOCAL_DETACH = EventType(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
  LINK_REMOTE_OPEN = EventType(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
  LINK_REMOTE_CLOSE = EventType(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
  LINK_REMOTE_DETACH = EventType(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
  LINK_FLOW = EventType(PN_LINK_FLOW, "on_link_flow")
  LINK_FINAL = EventType(PN_LINK_FINAL, "on_link_final")

  DELIVERY = EventType(PN_DELIVERY, "on_delivery")

  TRANSPORT = EventType(PN_TRANSPORT, "on_transport")
  TRANSPORT_ERROR = EventType(PN_TRANSPORT_ERROR, "on_transport_error")
  TRANSPORT_HEAD_CLOSED = EventType(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
  TRANSPORT_TAIL_CLOSED = EventType(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
  TRANSPORT_CLOSED = EventType(PN_TRANSPORT_CLOSED, "on_transport_closed")

  def __init__(self, clazz, context, type):
    self.clazz = clazz
    self.context = context
    self.type = type

  def _popped(self, collector):
    """Called by ``Collector.pop``.  For a *_FINAL event the context is
    removed from the collector's ``_contexts`` and told it is released."""
    final_types = (Event.LINK_FINAL, Event.SESSION_FINAL,
                   Event.CONNECTION_FINAL)
    if self.type not in final_types:
      return
    collector._contexts.remove(self.context)
    self.context._released()

  def dispatch(self, handler):
    """Dispatch this event to ``handler`` using the type's method name."""
    method_name = self.type.method
    return dispatch(handler, method_name, self)

  @property
  def connection(self):
    """The connection associated with the context, or None."""
    kind = self.clazz
    if kind == "pn_connection":
      return self.context
    if kind == "pn_session":
      return self.context.connection
    if kind == "pn_link":
      return self.context.connection
    if kind == "pn_delivery" and not self.context.released:
      return self.context.link.connection
    return None

  @property
  def session(self):
    """The session associated with the context, or None."""
    kind = self.clazz
    if kind == "pn_session":
      return self.context
    if kind == "pn_link":
      return self.context.session
    if kind == "pn_delivery" and not self.context.released:
      return self.context.link.session
    return None

  @property
  def link(self):
    """The link associated with the context, or None."""
    kind = self.clazz
    if kind == "pn_link":
      return self.context
    if kind == "pn_delivery" and not self.context.released:
      return self.context.link
    return None

  @property
  def sender(self):
    """The event's link when it is a sender, else None."""
    candidate = self.link
    if candidate and candidate.is_sender:
      return candidate
    return None

  @property
  def receiver(self):
    """The event's link when it is a receiver, else None."""
    candidate = self.link
    if candidate and candidate.is_receiver:
      return candidate
    return None

  @property
  def delivery(self):
    """The context when it is a delivery, else None."""
    if self.clazz != "pn_delivery":
      return None
    return self.context

  def __repr__(self):
    return "%s(%s)" % (self.type, self.context)
class Handler(object):
  """Base handler whose fallback for unhandled events does nothing."""

  def on_unhandled(self, method, args):
    """Fallback invoked by ``dispatch`` when ``method`` is not defined;
    ignores the event and returns None."""
    return None
###
# Driver
###
class DriverException(ProtonException):
  """
  Root of the driver exception hierarchy.
  """
class Connector(object):
  """Python wrapper around a C driver connector.

  A single wrapper exists per C connector: the wrapper stores itself as
  the C connector's context.  The owning Driver is held through a weak
  reference and tracks the wrapper in its ``_connectors`` set.
  """

  @staticmethod
  def _wrap_connector(c_cxtr, py_driver=None):
    """Maintain only a single instance of this class for each Connector object that
    exists in the C Driver.

    Returns None for a falsy handle and the existing wrapper when the C
    connector already has one; otherwise a new wrapper is created, which
    requires ``py_driver``.
    """
    if not c_cxtr: return None
    py_cxtr = pn_void2py(pn_connector_context(c_cxtr))
    if py_cxtr: return py_cxtr
    wrapper = Connector(_cxtr=c_cxtr, _py_driver=py_driver)
    return wrapper

  def __init__(self, _cxtr, _py_driver):
    self._cxtr = _cxtr
    assert(_py_driver)
    self._driver = weakref.ref(_py_driver)
    pn_connector_set_context(self._cxtr, pn_py2void(self))
    self._connection = None
    self._driver()._connectors.add(self)

  def _release(self):
    """Release the underlying C Engine resource."""
    if self._cxtr:
      pn_connector_set_context(self._cxtr, pn_py2void(None))
      pn_connector_free(self._cxtr)
      self._cxtr = None

  def free(self):
    """Release the Connector, freeing its resources.

    Call this when you no longer need the Connector.  This will allow the
    connector's resources to be reclaimed.  Once called, you should no longer
    reference this connector.

    Calling it again, or after the connector has already been released,
    is harmless.
    """
    if self._cxtr:
      self.connection = None
    else:
      # The C connector is already gone: only drop the Python reference
      # instead of handing a cleared handle to the C library.
      self._connection = None
    d = self._driver()
    if d: d._connectors.discard(self)
    self._release()

  def next(self):
    """Return the wrapper for the connector following this one, or None."""
    # Pass the owning driver along so a connector without a Python wrapper
    # can be wrapped instead of failing the constructor's assertion.
    return Connector._wrap_connector(pn_connector_next(self._cxtr),
                                     self._driver())

  def process(self):
    pn_connector_process(self._cxtr)

  def listener(self):
    return Listener._wrap_listener(pn_connector_listener(self._cxtr))

  def sasl(self):
    """Return the SASL object of this connector's transport, or None when
    the connector has no transport."""
    ## seems easier just to grab the SASL associated with the transport:
    trans = self.transport
    if trans:
      return SASL(trans)
    return None

  @property
  def transport(self):
    trans = pn_connector_transport(self._cxtr)
    return Transport._wrap_transport(trans)

  def close(self):
    return pn_connector_close(self._cxtr)

  @property
  def closed(self):
    return pn_connector_closed(self._cxtr)

  def _get_connection(self):
    return self._connection

  def _set_connection(self, conn):
    if conn:
      pn_connector_set_connection(self._cxtr, conn._conn)
    else:
      pn_connector_set_connection(self._cxtr, None)
    self._connection = conn

  connection = property(_get_connection, _set_connection,
                        doc="""
Associate a Connection with this Connector.
""")
class Listener(object):
  """Python wrapper around a C driver listener.

  A single wrapper exists per C listener: the wrapper stores itself as
  the C listener's context.  The owning Driver is held through a weak
  reference and tracks the wrapper in its ``_listeners`` set.
  """

  @staticmethod
  def _wrap_listener(c_lsnr, py_driver=None):
    """Maintain only a single instance of this class for each Listener object that
    exists in the C Driver.

    Returns None for a falsy handle and the existing wrapper when the C
    listener already has one; otherwise a new wrapper is created, which
    requires ``py_driver``.
    """
    if not c_lsnr: return None
    py_lsnr = pn_void2py(pn_listener_context(c_lsnr))
    if py_lsnr: return py_lsnr
    wrapper = Listener(_lsnr=c_lsnr, _py_driver=py_driver)
    return wrapper

  def __init__(self, _lsnr, _py_driver):
    self._lsnr = _lsnr
    assert(_py_driver)
    self._driver = weakref.ref(_py_driver)
    pn_listener_set_context(self._lsnr, pn_py2void(self))
    self._driver()._listeners.add(self)

  def _release(self):
    """Release the underlying C Engine resource."""
    if self._lsnr:
      pn_listener_set_context(self._lsnr, pn_py2void(None))
      pn_listener_free(self._lsnr)
      self._lsnr = None

  def free(self):
    """Release the Listener, freeing its resources

    Calling it more than once is harmless.
    """
    d = self._driver()
    # discard() so that a repeated free() does not raise KeyError.
    if d: d._listeners.discard(self)
    self._release()

  def next(self):
    """Return the wrapper for the listener following this one, or None."""
    # Pass the owning driver along so a listener without a Python wrapper
    # can be wrapped instead of failing the constructor's assertion.
    return Listener._wrap_listener(pn_listener_next(self._lsnr),
                                   self._driver())

  def accept(self):
    """Accept a connection and return its Connector wrapper.

    Returns None when the owning driver no longer exists.
    """
    d = self._driver()
    if d:
      cxtr = pn_listener_accept(self._lsnr)
      c = Connector._wrap_connector(cxtr, d)
      return c
    return None

  def close(self):
    pn_listener_close(self._lsnr)
class Driver(object):
  """Python wrapper around a C driver created with ``pn_driver()``.

  The driver tracks the Listener and Connector wrappers created through
  it in ``_listeners`` and ``_connectors`` so they can be released before
  the C driver is freed.
  """

  def __init__(self):
    self._driver = pn_driver()
    self._listeners = set()
    self._connectors = set()

  def __del__(self):
    # freeing the driver will release all child objects in the C Engine, so
    # clean up their references in the corresponding Python objects
    #
    # getattr() with a default: if __init__ failed part-way the tracking
    # sets may not exist, and __del__ must not raise in that case.
    for c in getattr(self, "_connectors", ()):
      c._release()
    for l in getattr(self, "_listeners", ()):
      l._release()
    if getattr(self, "_driver", None):
      pn_driver_free(self._driver)
      del self._driver

  def wait(self, timeout_sec):
    """Wait on the driver.

    @param timeout_sec: timeout in seconds; None or a negative value is
      passed to the C call as -1.
    """
    if timeout_sec is None or timeout_sec < 0.0:
      t = -1
    else:
      t = secs2millis(timeout_sec)
    return pn_driver_wait(self._driver, t)

  def wakeup(self):
    return pn_driver_wakeup(self._driver)

  def listener(self, host, port):
    """Construct a listener"""
    return Listener._wrap_listener(pn_listener(self._driver, host, port, None),
                                   self)

  # The lookups below pass ``self`` as the owning driver, like listener()
  # and connector() do, so that an object without a Python wrapper can
  # still be wrapped.

  def pending_listener(self):
    return Listener._wrap_listener(pn_driver_listener(self._driver), self)

  def head_listener(self):
    return Listener._wrap_listener(pn_listener_head(self._driver), self)

  def connector(self, host, port):
    """Construct a connector"""
    return Connector._wrap_connector(pn_connector(self._driver, host, port, None),
                                     self)

  def head_connector(self):
    return Connector._wrap_connector(pn_connector_head(self._driver), self)

  def pending_connector(self):
    return Connector._wrap_connector(pn_driver_connector(self._driver), self)
class Url(object):
  """
  Simple URL parser/constructor, handles URLs of the form:

  <scheme>://<user>:<password>@<host>:<port>/<path>

  All components can be None if not specified in the URL string.

  The port can be specified as a service name, e.g. 'amqp' in the
  URL string but Url.port always gives the integer value.

  @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
  @ivar username: Username
  @ivar password: Password
  @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
  @ivar port: Integer port.
  @ivar path: Url path
  """

  AMQPS = "amqps"
  AMQP = "amqp"

  class Port(int):
    """An integer port number that can be constructed from a service name string"""

    def __new__(cls, value):
      """@param value: integer port number or string service name."""
      port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
      # Keep the original spelling (number or service name) for str().
      setattr(port, 'name', str(value))
      return port

    def __eq__(self, x): return str(self) == x or int(self) == x
    def __ne__(self, x): return not self == x
    def __str__(self): return str(self.name)

    # Defining __eq__ disables the inherited hash on Python 3; keep hashing
    # by integer value.
    def __hash__(self): return int.__hash__(self)

    @staticmethod
    def _port_int(value):
      """Convert service, an integer or a service name, into an integer port number."""
      try:
        return int(value)
      except ValueError:
        try:
          return socket.getservbyname(value)
        except socket.error:
          # Not every system has amqp/amqps defined as a service
          if value == Url.AMQPS: return 5671
          elif value == Url.AMQP: return 5672
          else:
            raise ValueError("Not a valid port number or service name: '%s'" % value)

  def __init__(self, url=None, **kwargs):
    """
    @param url: URL string to parse.
    @param kwargs: scheme, username, password, host, port, path.
      If specified, replaces corresponding part in url string.
    @raise ValueError: if ``url`` cannot be parsed.
    @raise AttributeError: if a keyword is not an attribute of Url.
    """
    if url:
      self._url = pn_url_parse(str(url))
      if not self._url: raise ValueError("Invalid URL '%s'" % url)
    else:
      self._url = pn_url()
    for k in kwargs:                # Let kwargs override values parsed from url
      getattr(self, k)              # Check for invalid kwargs
      setattr(self, k, kwargs[k])

  class PartDescriptor(object):
    """Descriptor exposing one URL part through the matching
    ``pn_url_get_<part>`` / ``pn_url_set_<part>`` functions."""

    def __init__(self, part):
      self.getter = globals()["pn_url_get_%s" % part]
      self.setter = globals()["pn_url_set_%s" % part]

    def __get__(self, obj, type=None): return self.getter(obj._url)

    def __set__(self, obj, value):
      # None clears the part (as _set_port does) rather than storing the
      # literal string "None".
      if value is None:
        return self.setter(obj._url, None)
      return self.setter(obj._url, str(value))

  scheme = PartDescriptor('scheme')
  username = PartDescriptor('username')
  password = PartDescriptor('password')
  host = PartDescriptor('host')
  path = PartDescriptor('path')

  def _get_port(self):
    portstr = pn_url_get_port(self._url)
    return portstr and Url.Port(portstr)

  def _set_port(self, value):
    if value is None: pn_url_set_port(self._url, None)
    else: pn_url_set_port(self._url, str(Url.Port(value)))

  port = property(_get_port, _set_port)

  def __str__(self): return pn_url_str(self._url)

  def __repr__(self): return "Url(%r)" % str(self)

  def __del__(self):
    # _url is missing or None when __init__ failed; free only a real handle.
    url = getattr(self, "_url", None)
    if url is not None:
      pn_url_free(url)
    self._url = None

  def defaults(self):
    """
    Fill in missing values (scheme, host or port) with defaults
    @return: self
    """
    self.scheme = self.scheme or self.AMQP
    self.host = self.host or '0.0.0.0'
    self.port = self.port or self.Port(self.scheme)
    return self
# Names exported by "from proton import *".
__all__ = [
"API_LANGUAGE",
"IMPLEMENTATION_LANGUAGE",
"ABORTED",
"ACCEPTED",
"AUTOMATIC",
"PENDING",
"MANUAL",
"REJECTED",
"RELEASED",
"SETTLED",
"UNDESCRIBED",
"Array",
"Collector",
"Condition",
"Connection",
"Connector",
"Data",
"Delivery",
"Disposition",
"Described",
"Driver",
"DriverException",
"Endpoint",
"Event",
"Handler",
"Link",
"Listener",
"Message",
"MessageException",
"Messenger",
"MessengerException",
"ProtonException",
"VERSION_MAJOR",
"VERSION_MINOR",
"Receiver",
"SASL",
"Sender",
"Session",
"SSL",
"SSLDomain",
"SSLSessionDetails",
"SSLUnavailable",
"SSLException",
"Terminus",
"Timeout",
"Interrupt",
"Transport",
"TransportException",
"Url",
"char",
"dispatch",
"symbol",
"timestamp",
"ulong"
]