blob: d7db20b418b7a55d222f2283ff5b99a72828227f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The proton module defines a suite of APIs that implement the AMQP 1.0
protocol.
The proton APIs consist of the following classes:
- L{Messenger} -- A messaging endpoint.
- L{Message} -- A class for creating and/or accessing AMQP message content.
- L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
data.
"""
from __future__ import absolute_import
from cproton import *
from .wrapper import Wrapper
from proton import _compat
import weakref, socket, sys, threading
try:
    import uuid

    def generate_uuid():
        """Return a freshly generated random (version 4) UUID."""
        return uuid.uuid4()

except ImportError:
    """
    No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
    """
    import struct

    class uuid:
        """Namespace standing in for the missing standard ``uuid`` module."""

        class UUID:
            """Minimal UUID value built from either a hex string or 16 raw bytes."""

            def __init__(self, hex=None, bytes=None):
                # Exactly one of the two representations must be supplied.
                if [hex, bytes].count(None) != 1:
                    raise TypeError("need one of hex or bytes")
                if bytes is not None:
                    self.bytes = bytes
                elif hex is not None:
                    parts = hex.split("-")
                    # Split the trailing 12 hex digits into 4 + 8 so the pieces
                    # line up with the "!LHHHHL" struct layout.
                    tail = parts[4]
                    parts[4:5] = [tail[:4], tail[4:]]
                    numbers = [int(part, 16) for part in parts]
                    self.bytes = struct.pack("!LHHHHL", *numbers)

            def __cmp__(self, other):
                # Anything that is not a UUID sorts after this object.
                if not isinstance(other, uuid.UUID):
                    return -1
                return cmp(self.bytes, other.bytes)

            def __str__(self):
                unpacked = struct.unpack("!LHHHHL", self.bytes)
                return "%08x-%04x-%04x-%04x-%04x%08x" % unpacked

            def __repr__(self):
                return "UUID(%r)" % str(self)

            def __hash__(self):
                return self.bytes.__hash__()

    import os, random, time
    rand = random.Random()
    rand.seed((os.getpid(), time.time(), socket.gethostname()))

    def random_uuid():
        """Return 16 random octets, as a string, stamped as an RFC 4122 v4 UUID."""
        octets = [rand.randint(0, 255) for _ in xrange(16)]
        # RFC 4122: the version nibble (high half of octet 6) is 0100.
        octets[6] = (octets[6] & 0x0F) | 0x40
        # RFC 4122: the variant bits (top two bits of octet 8) are 10.
        octets[8] = (octets[8] & 0x3F) | 0x80
        return "".join([chr(octet) for octet in octets])

    def uuid4():
        """Build a fallback UUID object from random octets."""
        return uuid.UUID(bytes=random_uuid())

    def generate_uuid():
        """Return a freshly generated random (version 4) UUID."""
        return uuid4()
#
# Hacks to provide Python2 <---> Python3 compatibility
#
# Each probe only looks the builtin up; when the running interpreter does not
# provide it, the closest equivalent type is bound under the missing name.
try:
    bytes
except NameError:
    bytes = str

try:
    long
except NameError:
    long = int

try:
    unicode
except NameError:
    unicode = str
# Library version as a (major, minor, point) tuple, plus the individual parts.
# NOTE(review): the PN_VERSION_* names presumably come from the cproton
# star-import at the top of the file -- confirm.
VERSION = (PN_VERSION_MAJOR, PN_VERSION_MINOR, PN_VERSION_POINT)
VERSION_MAJOR, VERSION_MINOR, VERSION_POINT = VERSION

# Both the API and its implementation are reported as "C".
API_LANGUAGE = IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
    """
    A named marker object whose C{repr()} is simply its name.

    @ivar name: the text returned by C{repr()}
    """

    def __init__(self, name):
        """
        @param name: the label to report from C{repr()}
        """
        self.name = name

    def __repr__(self):
        """Return the name given at construction time."""
        return self.name
class ProtonException(Exception):
    """
    Base class of every exception defined by proton; catch this to
    handle any proton-specific failure.
    """
class Timeout(ProtonException):
    """
    Raised when a blocking operation does not complete before its
    timeout expires.
    """
class Interrupt(ProtonException):
    """
    Raised when a blocking operation is interrupted before it completes.
    """
class MessengerException(ProtonException):
    """
    Base class of the messenger exception hierarchy. Every exception
    raised by the L{Messenger} class derives from it.
    """
class MessageException(ProtonException):
    """
    Base class of the message exception hierarchy. Every exception
    raised by the L{Message} class derives from it.
    """
# Error code -> exception class. The _check() helpers of Messenger and Message
# look a failing code up here and fall back to their own default exception
# class when the code is not listed.
EXCEPTIONS = {
PN_TIMEOUT: Timeout,
PN_INTR: Interrupt
}
# Symbolic delivery states handed back by Messenger.status().
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")
# Raw PN_STATUS_* code -> symbolic state; an unknown status maps to None.
STATUSES = {
PN_STATUS_ABORTED: ABORTED,
PN_STATUS_ACCEPTED: ACCEPTED,
PN_STATUS_REJECTED: REJECTED,
PN_STATUS_RELEASED: RELEASED,
PN_STATUS_MODIFIED: MODIFIED,
PN_STATUS_PENDING: PENDING,
PN_STATUS_SETTLED: SETTLED,
PN_STATUS_UNKNOWN: None
}
# NOTE(review): AUTOMATIC / MANUAL are not referenced in the part of the
# module visible here -- confirm their consumers before changing them.
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
class Messenger(object):
    """
    The L{Messenger} class defines a high level interface for sending
    and receiving L{Messages<Message>}. Every L{Messenger} contains a
    single logical queue of incoming messages and a single logical queue
    of outgoing messages. The messages in these queues may be destined
    for, or originate from, a variety of addresses.

    The messenger interface is single-threaded. All methods
    except one (L{interrupt}) are intended to be used from within
    the messenger thread.

    Address Syntax
    ==============

    An address has the following form::

      [ amqp[s]:// ] [user[:password]@] domain [/[name]]

    Where domain can be one of::

      host | host:port | ip | ip:port | name

    The following are valid examples of addresses:

     - example.org
     - example.org:1234
     - amqp://example.org
     - amqps://example.org
     - example.org/incoming
     - amqps://example.org/outgoing
     - amqps://fred:trustno1@example.org
     - 127.0.0.1:1234
     - amqps://127.0.0.1:1234

    Sending & Receiving Messages
    ============================

    The L{Messenger} class works in conjunction with the L{Message} class.
    The L{Message} class is a mutable holder of message content.

    The L{put} method copies its L{Message} to the outgoing queue, and may
    send queued messages if it can do so without blocking. The L{send}
    method blocks until it has sent the requested number of messages,
    or until a timeout interrupts the attempt.

      >>> message = Message()
      >>> for i in range(3):
      ...   message.address = "amqp://host/queue"
      ...   message.subject = "Hello World %i" % i
      ...   messenger.put(message)
      >>> messenger.send()

    Similarly, the L{recv} method receives messages into the incoming
    queue, and may block as it attempts to receive the requested number
    of messages, or until timeout is reached. It may receive fewer
    than the requested number. The L{get} method pops the
    eldest L{Message} off the incoming queue and copies it into the
    L{Message} object that you supply. It will not block.

      >>> message = Message()
      >>> messenger.recv(10)
      >>> while messenger.incoming > 0:
      ...   messenger.get(message)
      ...   print(message.subject)
      Hello World 0
      Hello World 1
      Hello World 2

    The blocking flag allows you to turn off blocking behavior entirely,
    in which case L{send} and L{recv} will do whatever they can without
    blocking, and then return. You can then look at the number
    of incoming and outgoing messages to see how much outstanding work
    still remains.
    """

    def __init__(self, name=None):
        """
        Construct a new L{Messenger} with the given name. The name has
        global scope. If a NULL name is supplied, a UUID based name will
        be chosen.

        @type name: string
        @param name: the name of the messenger or None
        """
        self._mng = pn_messenger(name)
        self._selectables = {}

    def __del__(self):
        """
        Destroy the L{Messenger}. This will close all connections that
        are managed by the L{Messenger}. Call the L{stop} method before
        destroying the L{Messenger}.
        """
        # Construction may have failed before _mng was ever assigned.
        if not hasattr(self, "_mng"):
            return
        pn_messenger_free(self._mng)
        del self._mng

    def _check(self, err):
        """
        Translate a messenger return code into a result or an exception.

        Non-negative codes are returned unchanged and PN_INPROGRESS yields
        None. Any other negative code raises the class registered for it
        in EXCEPTIONS, or L{MessengerException} when none is registered.
        """
        if err >= 0:
            return err
        if err == PN_INPROGRESS:
            return None
        exc_type = EXCEPTIONS.get(err, MessengerException)
        detail = pn_error_text(pn_messenger_error(self._mng))
        raise exc_type("[%s]: %s" % (err, detail))

    @property
    def name(self):
        """
        The name of the L{Messenger}.
        """
        return pn_messenger_name(self._mng)

    def _get_certificate(self):
        return pn_messenger_get_certificate(self._mng)

    def _set_certificate(self, value):
        self._check(pn_messenger_set_certificate(self._mng, value))

    certificate = property(_get_certificate, _set_certificate,
                           doc="""
    Path to a certificate file for the L{Messenger}. This certificate is
    used when the L{Messenger} accepts or establishes SSL/TLS connections.
    This property must be specified for the L{Messenger} to accept
    incoming SSL/TLS connections and to establish client authenticated
    outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
    connections do not require this property.
    """)

    def _get_private_key(self):
        return pn_messenger_get_private_key(self._mng)

    def _set_private_key(self, value):
        self._check(pn_messenger_set_private_key(self._mng, value))

    private_key = property(_get_private_key, _set_private_key,
                           doc="""
    Path to a private key file for the L{Messenger's<Messenger>}
    certificate. This property must be specified for the L{Messenger} to
    accept incoming SSL/TLS connections and to establish client
    authenticated outgoing SSL/TLS connection. Non client authenticated
    SSL/TLS connections do not require this property.
    """)

    def _get_password(self):
        return pn_messenger_get_password(self._mng)

    def _set_password(self, value):
        self._check(pn_messenger_set_password(self._mng, value))

    password = property(_get_password, _set_password,
                        doc="""
    This property contains the password for the L{Messenger.private_key}
    file, or None if the file is not encrypted.
    """)

    def _get_trusted_certificates(self):
        return pn_messenger_get_trusted_certificates(self._mng)

    def _set_trusted_certificates(self, value):
        self._check(pn_messenger_set_trusted_certificates(self._mng, value))

    trusted_certificates = property(_get_trusted_certificates,
                                    _set_trusted_certificates,
                                    doc="""
    A path to a database of trusted certificates for use in verifying the
    peer on an SSL/TLS connection. If this property is None, then the peer
    will not be verified.
    """)

    def _get_timeout(self):
        # The C layer reports "no timeout" as -1; expose that as None.
        millis = pn_messenger_get_timeout(self._mng)
        return None if millis == -1 else millis2secs(millis)

    def _set_timeout(self, value):
        millis = -1 if value is None else secs2millis(value)
        self._check(pn_messenger_set_timeout(self._mng, millis))

    timeout = property(_get_timeout, _set_timeout,
                       doc="""
    The timeout property contains the default timeout for blocking
    operations performed by the L{Messenger}.
    """)

    def _is_blocking(self):
        return pn_messenger_is_blocking(self._mng)

    def _set_blocking(self, b):
        self._check(pn_messenger_set_blocking(self._mng, b))

    blocking = property(_is_blocking, _set_blocking,
                        doc="""
    Enable or disable blocking behavior during L{Message} sending
    and receiving. This affects every blocking call, with the
    exception of L{work}. Currently, the affected calls are
    L{send}, L{recv}, and L{stop}.
    """)

    def _is_passive(self):
        return pn_messenger_is_passive(self._mng)

    def _set_passive(self, b):
        self._check(pn_messenger_set_passive(self._mng, b))

    passive = property(_is_passive, _set_passive,
                       doc="""
    When passive is set to true, Messenger will not attempt to perform I/O
    internally. In this mode it is necessary to use the selectables API to
    drive any I/O needed to perform requested actions. In this mode
    Messenger will never block.
    """)

    def _get_incoming_window(self):
        return pn_messenger_get_incoming_window(self._mng)

    def _set_incoming_window(self, window):
        self._check(pn_messenger_set_incoming_window(self._mng, window))

    incoming_window = property(_get_incoming_window, _set_incoming_window,
                               doc="""
    The incoming tracking window for the messenger. The messenger will
    track the remote status of this many incoming deliveries after they
    have been accepted or rejected. Defaults to zero.
    L{Messages<Message>} enter this window only when you take them into your application
    using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
    without explicitly accepting or rejecting the oldest message, then the
    message that passes beyond the edge of the incoming window will be assigned
    the default disposition of its link.
    """)

    def _get_outgoing_window(self):
        return pn_messenger_get_outgoing_window(self._mng)

    def _set_outgoing_window(self, window):
        self._check(pn_messenger_set_outgoing_window(self._mng, window))

    outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                               doc="""
    The outgoing tracking window for the messenger. The messenger will
    track the remote status of this many outgoing deliveries after calling
    send. Defaults to zero.
    A L{Message} enters this window when you call the put() method with the
    message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
    times, status information will no longer be available for the
    first message.
    """)

    def start(self):
        """
        Currently a no-op placeholder.
        For future compatibility, do not L{send} or L{recv} messages
        before starting the L{Messenger}.
        """
        self._check(pn_messenger_start(self._mng))

    def stop(self):
        """
        Transitions the L{Messenger} to an inactive state. An inactive
        L{Messenger} will not send or receive messages from its internal
        queues. A L{Messenger} should be stopped before being discarded to
        ensure a clean shutdown handshake occurs on any internally managed
        connections.
        """
        self._check(pn_messenger_stop(self._mng))

    @property
    def stopped(self):
        """
        Returns true iff a L{Messenger} is in the stopped state.
        This function does not block.
        """
        return pn_messenger_stopped(self._mng)

    def subscribe(self, source):
        """
        Subscribes the L{Messenger} to messages originating from the
        specified source. The source is an address as specified in the
        L{Messenger} introduction with the following addition. If the
        domain portion of the address begins with the '~' character, the
        L{Messenger} will interpret the domain as host/port, bind to it,
        and listen for incoming messages. For example "~0.0.0.0",
        "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
        local interface and listen for incoming messages with the last
        variant only permitting incoming SSL connections.

        @type source: string
        @param source: the source of messages to subscribe to
        @return: a L{Subscription} wrapping the new subscription
        """
        sub_impl = pn_messenger_subscribe(self._mng, source)
        if sub_impl:
            return Subscription(sub_impl)
        # Prefer the messenger's own recorded error; fall back to a generic
        # failure when that error code does not raise.
        self._check(pn_error_code(pn_messenger_error(self._mng)))
        raise MessengerException("Cannot subscribe to %s" % source)

    def put(self, message):
        """
        Places the content contained in the message onto the outgoing
        queue of the L{Messenger}. This method will never block, however
        it will send any unblocked L{Messages<Message>} in the outgoing
        queue immediately and leave any blocked L{Messages<Message>}
        remaining in the outgoing queue. The L{send} call may be used to
        block until the outgoing queue is empty. The L{outgoing} property
        may be used to check the depth of the outgoing queue.

        When the content in a given L{Message} object is copied to the
        outgoing message queue, you may then modify or discard the
        L{Message} object without having any impact on the content in the
        outgoing queue.

        This method returns an outgoing tracker for the L{Message}. The
        tracker can be used to determine the delivery status of the
        L{Message}.

        @type message: Message
        @param message: the message to place in the outgoing queue
        @return: a tracker
        """
        message._pre_encode()
        self._check(pn_messenger_put(self._mng, message._msg))
        return pn_messenger_outgoing_tracker(self._mng)

    def status(self, tracker):
        """
        Gets the last known remote state of the delivery associated with
        the given tracker.

        @type tracker: tracker
        @param tracker: the tracker whose status is to be retrieved
        @return: one of None, PENDING, ACCEPTED, REJECTED, RELEASED,
          MODIFIED, ABORTED or SETTLED; a status code that has no entry
          in STATUSES is returned as-is
        """
        disp = pn_messenger_status(self._mng, tracker)
        return STATUSES.get(disp, disp)

    def buffered(self, tracker):
        """
        Checks if the delivery associated with the given tracker is still
        waiting to be sent.

        @type tracker: tracker
        @param tracker: the tracker whose status is to be retrieved
        @return: true if delivery is still buffered
        """
        return pn_messenger_buffered(self._mng, tracker)

    def settle(self, tracker=None):
        """
        Frees a L{Messenger} from tracking the status associated with a
        given tracker. If you don't supply a tracker, all outgoing
        L{messages<Message>} up to the most recent will be settled.
        """
        flags = 0
        if tracker is None:
            # No tracker: settle cumulatively up to the latest outgoing one.
            flags = PN_CUMULATIVE
            tracker = pn_messenger_outgoing_tracker(self._mng)
        self._check(pn_messenger_settle(self._mng, tracker, flags))

    def send(self, n=-1):
        """
        This call will block until the indicated number of
        L{messages<Message>} have been sent, or until the operation times
        out. If n is -1 this call will block until all outgoing
        L{messages<Message>} have been sent. If n is 0 then this call will
        send whatever it can without blocking.
        """
        self._check(pn_messenger_send(self._mng, n))

    def recv(self, n=None):
        """
        Receives up to I{n} L{messages<Message>} into the incoming queue.
        If no value for I{n} is supplied, this call will receive as many
        L{messages<Message>} as it can buffer internally. If the
        L{Messenger} is in blocking mode, this call will block until at
        least one L{Message} is available in the incoming queue.
        """
        limit = -1 if n is None else n
        self._check(pn_messenger_recv(self._mng, limit))

    def work(self, timeout=None):
        """
        Sends or receives any outstanding L{messages<Message>} queued for a
        L{Messenger}. This will block for the indicated timeout.
        This method may also do I/O work other than sending and receiving
        L{messages<Message>}. For example, closing connections after
        messenger.L{stop}() has been called.

        @param timeout: seconds to block, or None to pass -1 to the
          underlying call
        @return: False if the call timed out, True otherwise
        """
        millis = -1 if timeout is None else secs2millis(timeout)
        err = pn_messenger_work(self._mng, millis)
        if err == PN_TIMEOUT:
            return False
        self._check(err)
        return True

    @property
    def receiving(self):
        """
        The value reported by pn_messenger_receiving for this messenger.
        """
        return pn_messenger_receiving(self._mng)

    def interrupt(self):
        """
        The L{Messenger} interface is single-threaded.
        This is the only L{Messenger} function intended to be called
        from outside of the L{Messenger} thread.
        Call this from a non-messenger thread to interrupt
        a L{Messenger} that is blocking.
        This will cause any in-progress blocking call to throw
        the L{Interrupt} exception. If there is no currently blocking
        call, then the next blocking call will be affected, even if it
        is within the same thread that interrupt was called from.
        """
        self._check(pn_messenger_interrupt(self._mng))

    def get(self, message=None):
        """
        Moves the message from the head of the incoming message queue into
        the supplied message object. Any content in the message will be
        overwritten.

        A tracker for the incoming L{Message} is returned. The tracker can
        later be used to communicate your acceptance or rejection of the
        L{Message}.

        If None is passed in for the L{Message} object, the L{Message}
        popped from the head of the queue is discarded.

        @type message: Message
        @param message: the destination message object
        @return: a tracker
        """
        impl = None if message is None else message._msg
        self._check(pn_messenger_get(self._mng, impl))
        if message is not None:
            message._post_decode()
        return pn_messenger_incoming_tracker(self._mng)

    def accept(self, tracker=None):
        """
        Signal the sender that you have acted on the L{Message}
        pointed to by the tracker. If no tracker is supplied,
        then all messages that have been returned by the L{get}
        method are accepted, except those that have already been
        auto-settled by passing beyond your incoming window size.

        @type tracker: tracker
        @param tracker: a tracker as returned by get
        """
        flags = 0
        if tracker is None:
            flags = PN_CUMULATIVE
            tracker = pn_messenger_incoming_tracker(self._mng)
        self._check(pn_messenger_accept(self._mng, tracker, flags))

    def reject(self, tracker=None):
        """
        Rejects the L{Message} indicated by the tracker. If no tracker
        is supplied, all messages that have been returned by the L{get}
        method are rejected, except those that have already been
        auto-settled by passing beyond your incoming window size.

        @type tracker: tracker
        @param tracker: a tracker as returned by get
        """
        flags = 0
        if tracker is None:
            flags = PN_CUMULATIVE
            tracker = pn_messenger_incoming_tracker(self._mng)
        self._check(pn_messenger_reject(self._mng, tracker, flags))

    @property
    def outgoing(self):
        """
        The outgoing queue depth.
        """
        return pn_messenger_outgoing(self._mng)

    @property
    def incoming(self):
        """
        The incoming queue depth.
        """
        return pn_messenger_incoming(self._mng)

    def route(self, pattern, address):
        """
        Adds a routing rule to a L{Messenger's<Messenger>} internal routing
        table.

        The route procedure may be used to influence how a L{Messenger} will
        internally treat a given address or class of addresses. Every call
        to the route procedure will result in L{Messenger} appending a
        routing rule to its internal routing table.

        Whenever a L{Message} is presented to a L{Messenger} for delivery,
        it will match the address of this message against the set of routing
        rules in order. The first rule to match will be triggered, and
        instead of routing based on the address presented in the message,
        the L{Messenger} will route based on the address supplied in the
        rule.

        The pattern matching syntax supports two types of matches, a '%'
        will match any character except a '/', and a '*' will match any
        character including a '/'.

        A routing address is specified as a normal AMQP address, however it
        may additionally use substitution variables from the pattern match
        that triggered the rule.

        Any message sent to "foo" will be routed to "amqp://foo.com":

          >>> messenger.route("foo", "amqp://foo.com");

        Any message sent to "foobar" will be routed to
        "amqp://foo.com/bar":

          >>> messenger.route("foobar", "amqp://foo.com/bar");

        Any message sent to bar/<path> will be routed to the corresponding
        path within the amqp://bar.com domain:

          >>> messenger.route("bar/*", "amqp://bar.com/$1");

        Route all L{messages<Message>} over TLS:

          >>> messenger.route("amqp:*", "amqps:$1")

        Supply credentials for foo.com:

          >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");

        Supply credentials for all domains:

          >>> messenger.route("amqp://*", "amqp://user:password@$1");

        Route all addresses through a single proxy while preserving the
        original destination:

          >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");

        Route any address through a single broker:

          >>> messenger.route("*", "amqp://user:password@broker/$1");
        """
        encoded_pattern = unicode2utf8(pattern)
        encoded_address = unicode2utf8(address)
        self._check(pn_messenger_route(self._mng, encoded_pattern,
                                       encoded_address))

    def rewrite(self, pattern, address):
        """
        Similar to route(), except that the destination of
        the L{Message} is determined before the message address is
        rewritten.

        The outgoing address is only rewritten after routing has been
        finalized. If a message has an outgoing address of
        "amqp://0.0.0.0:5678", and a rewriting rule that changes its
        outgoing address to "foo", it will still arrive at the peer that
        is listening on "amqp://0.0.0.0:5678", but when it arrives there,
        the receiver will see its outgoing address as "foo".

        The default rewrite rule removes username and password from
        addresses before they are transmitted.
        """
        encoded_pattern = unicode2utf8(pattern)
        encoded_address = unicode2utf8(address)
        self._check(pn_messenger_rewrite(self._mng, encoded_pattern,
                                         encoded_address))

    def selectable(self):
        """
        Return the result of pn_messenger_selectable wrapped as a
        L{Selectable}, or None when there is nothing to wrap.
        """
        return Selectable.wrap(pn_messenger_selectable(self._mng))

    @property
    def deadline(self):
        """
        The messenger's deadline converted with millis2secs, or None when
        the underlying value is zero/unset.
        """
        tstamp = pn_messenger_deadline(self._mng)
        return millis2secs(tstamp) if tstamp else None
class Message(object):
"""The L{Message} class is a mutable holder of message content.
@ivar instructions: delivery instructions for the message
@type instructions: dict
@ivar annotations: infrastructure defined message annotations
@type annotations: dict
@ivar properties: application defined message properties
@type properties: dict
@ivar body: message body
@type body: bytes | unicode | dict | list | int | long | float | UUID
"""
DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
def __init__(self, body=None, **kwargs):
"""
@param kwargs: Message property name/value pairs to initialise the Message
"""
self._msg = pn_message()
self._id = Data(pn_message_id(self._msg))
self._correlation_id = Data(pn_message_correlation_id(self._msg))
self.instructions = None
self.annotations = None
self.properties = None
self.body = body
for k,v in _compat.iteritems(kwargs):
getattr(self, k) # Raise exception if it's not a valid attribute.
setattr(self, k, v)
def __del__(self):
if hasattr(self, "_msg"):
pn_message_free(self._msg)
del self._msg
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, MessageException)
raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
else:
return err
def _pre_encode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
inst.clear()
if self.instructions is not None:
inst.put_object(self.instructions)
ann.clear()
if self.annotations is not None:
ann.put_object(self.annotations)
props.clear()
if self.properties is not None:
props.put_object(self.properties)
body.clear()
if self.body is not None:
body.put_object(self.body)
def _post_decode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
body = Data(pn_message_body(self._msg))
if inst.next():
self.instructions = inst.get_object()
else:
self.instructions = None
if ann.next():
self.annotations = ann.get_object()
else:
self.annotations = None
if props.next():
self.properties = props.get_object()
else:
self.properties = None
if body.next():
self.body = body.get_object()
else:
self.body = None
def clear(self):
"""
Clears the contents of the L{Message}. All fields will be reset to
their default values.
"""
pn_message_clear(self._msg)
self.instructions = None
self.annotations = None
self.properties = None
self.body = None
def _is_inferred(self):
return pn_message_is_inferred(self._msg)
def _set_inferred(self, value):
self._check(pn_message_set_inferred(self._msg, bool(value)))
inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")
def _is_durable(self):
return pn_message_is_durable(self._msg)
def _set_durable(self, value):
self._check(pn_message_set_durable(self._msg, bool(value)))
durable = property(_is_durable, _set_durable,
doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
return pn_message_get_priority(self._msg)
def _set_priority(self, value):
self._check(pn_message_set_priority(self._msg, value))
priority = property(_get_priority, _set_priority,
doc="""
The priority of the message.
""")
def _get_ttl(self):
return millis2secs(pn_message_get_ttl(self._msg))
def _set_ttl(self, value):
self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
ttl = property(_get_ttl, _set_ttl,
doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")
def _is_first_acquirer(self):
return pn_message_is_first_acquirer(self._msg)
def _set_first_acquirer(self, value):
self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
return pn_message_get_delivery_count(self._msg)
def _set_delivery_count(self, value):
self._check(pn_message_set_delivery_count(self._msg, value))
delivery_count = property(_get_delivery_count, _set_delivery_count,
doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
return self._id.get_object()
def _set_id(self, value):
if type(value) in _compat.INT_TYPES:
value = ulong(value)
self._id.rewind()
self._id.put_object(value)
id = property(_get_id, _set_id,
doc="""
The id of the message.
""")
def _get_user_id(self):
return pn_message_get_user_id(self._msg)
def _set_user_id(self, value):
self._check(pn_message_set_user_id(self._msg, value))
user_id = property(_get_user_id, _set_user_id,
doc="""
The user id of the message creator.
""")
def _get_address(self):
return utf82unicode(pn_message_get_address(self._msg))
def _set_address(self, value):
self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
address = property(_get_address, _set_address,
doc="""
The address of the message.
""")
def _get_subject(self):
return utf82unicode(pn_message_get_subject(self._msg))
def _set_subject(self, value):
self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
subject = property(_get_subject, _set_subject,
doc="""
The subject of the message.
""")
def _get_reply_to(self):
return utf82unicode(pn_message_get_reply_to(self._msg))
def _set_reply_to(self, value):
self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
reply_to = property(_get_reply_to, _set_reply_to,
doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
return self._correlation_id.get_object()
def _set_correlation_id(self, value):
if type(value) in _compat.INT_TYPES:
value = ulong(value)
self._correlation_id.rewind()
self._correlation_id.put_object(value)
correlation_id = property(_get_correlation_id, _set_correlation_id,
doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
def _set_content_type(self, value):
self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
content_type = property(_get_content_type, _set_content_type,
doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
def _set_content_encoding(self, value):
self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
content_encoding = property(_get_content_encoding, _set_content_encoding,
doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
return millis2secs(pn_message_get_expiry_time(self._msg))
def _set_expiry_time(self, value):
self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
expiry_time = property(_get_expiry_time, _set_expiry_time,
doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
return millis2secs(pn_message_get_creation_time(self._msg))
def _set_creation_time(self, value):
self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
creation_time = property(_get_creation_time, _set_creation_time,
doc="""
The creation time of the message.
""")
def _get_group_id(self):
return utf82unicode(pn_message_get_group_id(self._msg))
def _set_group_id(self, value):
self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
group_id = property(_get_group_id, _set_group_id,
doc="""
The group id of the message.
""")
def _get_group_sequence(self):
return pn_message_get_group_sequence(self._msg)
def _set_group_sequence(self, value):
self._check(pn_message_set_group_sequence(self._msg, value))
group_sequence = property(_get_group_sequence, _set_group_sequence,
doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
def _set_reply_to_group_id(self, value):
self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
doc="""
The group-id for any replies.
""")
def encode(self):
self._pre_encode()
sz = 16
while True:
err, data = pn_message_encode(self._msg, sz)
if err == PN_OVERFLOW:
sz *= 2
continue
else:
self._check(err)
return data
def decode(self, data):
self._check(pn_message_decode(self._msg, data))
self._post_decode()
def send(self, sender, tag=None):
dlv = sender.delivery(tag or sender.delivery_tag())
encoded = self.encode()
sender.stream(encoded)
sender.advance()
if sender.snd_settle_mode == Link.SND_SETTLED:
dlv.settle()
return dlv
def recv(self, link):
  """
  Receives and decodes the message content for the current delivery
  from the link. Upon success it will return the current delivery
  for the link. If there is no current delivery, or if the current
  delivery is incomplete, or if the link is not a receiver, it will
  return None.

  @type link: Link
  @param link: the link to receive a message from
  @return: the delivery associated with the decoded message (or None)
  """
  if link.is_sender:
    return None
  delivery = link.current
  if not delivery:
    return None
  if delivery.partial:
    return None
  delivery.encoded = link.recv(delivery.pending)
  link.advance()
  # A sender that settles before sending keeps no state for the
  # delivery, so there is nothing to gain from keeping ours.
  if link.remote_snd_settle_mode == Link.SND_SETTLED:
    delivery.settle()
  self.decode(delivery.encoded)
  return delivery
def __repr2__(self):
  """
  Build a "Message(attr=value, ...)" string that lists, in a fixed
  order, each of the named attributes whose value is truthy.
  """
  names = ("inferred", "address", "reply_to", "durable", "ttl",
           "priority", "first_acquirer", "delivery_count", "id",
           "correlation_id", "user_id", "group_id", "group_sequence",
           "reply_to_group_id", "instructions", "annotations",
           "properties", "body")
  # A generator keeps each attribute read immediately before its own
  # truth test and formatting.
  pairs = ((name, getattr(self, name)) for name in names)
  shown = ["%s=%r" % pair for pair in pairs if pair[1]]
  return "Message(%s)" % ", ".join(shown)
def __repr__(self):
  """
  Return the text pn_inspect produces for the underlying message.

  The temporary string is freed before the inspect status is checked.
  """
  scratch = pn_string(None)
  status = pn_inspect(self._msg, scratch)
  text = pn_string_get(scratch)
  pn_free(scratch)
  self._check(status)
  return text
class Subscription(object):
  """Wrapper around a subscription implementation object."""

  def __init__(self, impl):
    """
    @param impl: the implementation object to wrap
    """
    self._impl = impl

  def _get_address(self):
    return pn_subscription_address(self._impl)

  address = property(_get_address,
                     doc="The value of pn_subscription_address for this subscription.")
# Unique sentinel used as a default argument so that "argument omitted"
# can be told apart from an explicit None.
_DEFAULT = object()
class Selectable(Wrapper):
  """Wrapper exposing the pn_selectable_* functions of a selectable object."""

  @staticmethod
  def wrap(impl):
    """Wrap impl in a Selectable; None is passed through unchanged."""
    return None if impl is None else Selectable(impl)

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_selectable_attachments)

  def _init(self):
    # No per-object state to set up.
    pass

  def fileno(self, fd = _DEFAULT):
    """
    With no argument, return the file descriptor of the selectable.
    With an argument, set it; None is stored as PN_INVALID_SOCKET.
    """
    if fd is _DEFAULT:
      return pn_selectable_get_fd(self._impl)
    if fd is None:
      fd = PN_INVALID_SOCKET
    pn_selectable_set_fd(self._impl, fd)

  def _is_reading(self):
    return pn_selectable_is_reading(self._impl)

  def _set_reading(self, val):
    flag = bool(val)
    pn_selectable_set_reading(self._impl, flag)

  reading = property(_is_reading, _set_reading)

  def _is_writing(self):
    return pn_selectable_is_writing(self._impl)

  def _set_writing(self, val):
    flag = bool(val)
    pn_selectable_set_writing(self._impl, flag)

  writing = property(_is_writing, _set_writing)

  def _get_deadline(self):
    # A falsy timestamp is reported as None, anything else in seconds.
    millis = pn_selectable_get_deadline(self._impl)
    return millis2secs(millis) if millis else None

  def _set_deadline(self, deadline):
    millis = secs2millis(deadline)
    pn_selectable_set_deadline(self._impl, millis)

  deadline = property(_get_deadline, _set_deadline)

  def readable(self):
    """Invoke pn_selectable_readable on the wrapped object."""
    pn_selectable_readable(self._impl)

  def writable(self):
    """Invoke pn_selectable_writable on the wrapped object."""
    pn_selectable_writable(self._impl)

  def expired(self):
    """Invoke pn_selectable_expired on the wrapped object."""
    pn_selectable_expired(self._impl)

  def _is_registered(self):
    return pn_selectable_is_registered(self._impl)

  def _set_registered(self, registered):
    pn_selectable_set_registered(self._impl, registered)

  registered = property(_is_registered, _set_registered,
                        doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")

  @property
  def is_terminal(self):
    return pn_selectable_is_terminal(self._impl)

  def terminate(self):
    """Invoke pn_selectable_terminate on the wrapped object."""
    pn_selectable_terminate(self._impl)

  def release(self):
    """Invoke pn_selectable_release on the wrapped object."""
    pn_selectable_release(self._impl)
class DataException(ProtonException):
  """
  Root of the Data exception hierarchy: every exception raised by the
  Data class extends this one.
  """
class UnmappedType:
  """Holder for a message describing a value that has no mapping."""

  def __init__(self, msg):
    """
    @param msg: the message to keep; it is shown by repr()
    """
    self.msg = msg

  def __repr__(self):
    template = "UnmappedType(%s)"
    return template % self.msg
class ulong(long):
  """A long tagged as an unsigned long; repr() shows the tag."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "ulong(%s)" % plain
class timestamp(long):
  """A long tagged as a timestamp; repr() shows the tag."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "timestamp(%s)" % plain
class symbol(unicode):
  """A unicode string tagged as a symbol; repr() shows the tag."""

  def __repr__(self):
    plain = unicode.__repr__(self)
    return "symbol(%s)" % plain
class char(unicode):
  """A unicode string tagged as a char; repr() shows the tag."""

  def __repr__(self):
    plain = unicode.__repr__(self)
    return "char(%s)" % plain
class byte(int):
  """An int tagged as a signed byte; repr() shows the tag."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "byte(%s)" % plain
class short(int):
  """An int tagged as a signed short; repr() shows the tag."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "short(%s)" % plain
class int32(int):
  """An int tagged as a signed 32-bit int; repr() shows the tag."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "int32(%s)" % plain
class ubyte(int):
  """An int tagged as an unsigned byte; repr() shows the tag."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "ubyte(%s)" % plain
class ushort(int):
  """An int tagged as an unsigned short; repr() shows the tag."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "ushort(%s)" % plain
class uint(long):
  """A long tagged as an unsigned int; repr() shows the tag."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "uint(%s)" % plain
class float32(float):
  """A float tagged as a 32-bit float; repr() shows the tag."""

  def __repr__(self):
    plain = float.__repr__(self)
    return "float32(%s)" % plain
class decimal32(int):
  """An int tagged as a decimal32 value; repr() shows the tag."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "decimal32(%s)" % plain
class decimal64(long):
  """A long tagged as a decimal64 value; repr() shows the tag."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "decimal64(%s)" % plain
class decimal128(bytes):
  """A bytes value tagged as a decimal128 value; repr() shows the tag."""

  def __repr__(self):
    plain = bytes.__repr__(self)
    return "decimal128(%s)" % plain
class Described(object):
  """A value paired with the descriptor that describes it."""

  def __init__(self, descriptor, value):
    """
    @param descriptor: the descriptor of the value
    @param value: the described value
    """
    self.descriptor = descriptor
    self.value = value

  def __repr__(self):
    return "Described(%r, %r)" % (self.descriptor, self.value)

  def __eq__(self, o):
    """Equal only to another Described with equal descriptor and value."""
    if isinstance(o, Described):
      return self.descriptor == o.descriptor and self.value == o.value
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from ==; without this, two equal
    # instances would still compare as "not equal".
    return not self.__eq__(o)
# Descriptor value used for an Array that has no descriptor.
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
  """An array value: a descriptor, an element type and the elements."""

  def __init__(self, descriptor, type, *elements):
    """
    @param descriptor: the descriptor of the array
    @param type: the type of the array elements
    @param elements: the elements, kept as a tuple
    """
    self.descriptor = descriptor
    self.type = type
    self.elements = elements

  def __iter__(self):
    """Iterate over the elements."""
    return iter(self.elements)

  def __repr__(self):
    if self.elements:
      els = ", %s" % (", ".join(map(repr, self.elements)))
    else:
      els = ""
    return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

  def __eq__(self, o):
    """Equal only to another Array with equal descriptor, type and elements."""
    if isinstance(o, Array):
      return self.descriptor == o.descriptor and \
          self.type == o.type and self.elements == o.elements
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from ==; without this, two equal
    # instances would still compare as "not equal".
    return not self.__eq__(o)
class Data:
  """
  The L{Data} class provides an interface for decoding, extracting,
  creating, and encoding arbitrary AMQP data. A L{Data} object
  contains a tree of AMQP values. Leaf nodes in this tree correspond
  to scalars in the AMQP type system such as L{ints<INT>} or
  L{strings<STRING>}. Non-leaf nodes in this tree correspond to
  compound values in the AMQP type system such as L{lists<LIST>},
  L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
  The root node of the tree is the L{Data} object itself and can have
  an arbitrary number of children.

  A L{Data} object maintains the notion of the current sibling node
  and a current parent node. Siblings are ordered within their parent.
  Values are accessed and/or added by using the L{next}, L{prev},
  L{enter}, and L{exit} methods to navigate to the desired location in
  the tree and using the supplied variety of put_*/get_* methods to
  access or add a value of the desired type.

  The put_* methods will always add a value I{after} the current node
  in the tree. If the current node has a next sibling the put_* method
  will overwrite the value on this node. If there is no current node
  or the current node has no next sibling then one will be added. The
  put_* methods always set the added/modified node to the current
  node. The get_* methods read the value of the current node and do
  not change which node is current.

  The following types of scalar values are supported:

   - L{NULL}
   - L{BOOL}
   - L{UBYTE}
   - L{USHORT}
   - L{SHORT}
   - L{UINT}
   - L{INT}
   - L{ULONG}
   - L{LONG}
   - L{FLOAT}
   - L{DOUBLE}
   - L{BINARY}
   - L{STRING}
   - L{SYMBOL}

  The following types of compound values are supported:

   - L{DESCRIBED}
   - L{ARRAY}
   - L{LIST}
   - L{MAP}
  """

  NULL = PN_NULL; "A null value."
  BOOL = PN_BOOL; "A boolean value."
  UBYTE = PN_UBYTE; "An unsigned byte value."
  BYTE = PN_BYTE; "A signed byte value."
  USHORT = PN_USHORT; "An unsigned short value."
  SHORT = PN_SHORT; "A short value."
  UINT = PN_UINT; "An unsigned int value."
  INT = PN_INT; "A signed int value."
  CHAR = PN_CHAR; "A character value."
  ULONG = PN_ULONG; "An unsigned long value."
  LONG = PN_LONG; "A signed long value."
  TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
  FLOAT = PN_FLOAT; "A float value."
  DOUBLE = PN_DOUBLE; "A double value."
  DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
  DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
  DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
  UUID = PN_UUID; "A UUID value."
  BINARY = PN_BINARY; "A binary string."
  STRING = PN_STRING; "A unicode string."
  SYMBOL = PN_SYMBOL; "A symbolic string."
  DESCRIBED = PN_DESCRIBED; "A described value."
  ARRAY = PN_ARRAY; "An array value."
  LIST = PN_LIST; "A list value."
  MAP = PN_MAP; "A map value."

  # Human readable name for each type code.
  type_names = {
    NULL: "null",
    BOOL: "bool",
    BYTE: "byte",
    UBYTE: "ubyte",
    SHORT: "short",
    USHORT: "ushort",
    INT: "int",
    UINT: "uint",
    CHAR: "char",
    LONG: "long",
    ULONG: "ulong",
    TIMESTAMP: "timestamp",
    FLOAT: "float",
    DOUBLE: "double",
    DECIMAL32: "decimal32",
    DECIMAL64: "decimal64",
    DECIMAL128: "decimal128",
    UUID: "uuid",
    BINARY: "binary",
    STRING: "string",
    SYMBOL: "symbol",
    DESCRIBED: "described",
    ARRAY: "array",
    LIST: "list",
    MAP: "map"
    }

  @classmethod
  def type_name(cls, type):
    """
    Returns the name of a type code.

    @type type: int
    @param type: one of the type constants defined on this class
    @return: the matching entry of L{type_names}
    @raise KeyError: if the code is not present in L{type_names}
    """
    # A classmethod always receives the class first; the explicit cls
    # parameter is what lets the type code be passed at all.
    return cls.type_names[type]

  def __init__(self, capacity=16):
    """
    Creates a Data object.

    @param capacity: when an integer, the capacity handed to pn_data for
      a newly allocated object, which is freed when this instance is
      deleted; any other value is taken to be an existing pn_data
      object, which is wrapped and never freed by this instance
    """
    if type(capacity) in _compat.INT_TYPES:
      self._data = pn_data(capacity)
      self._free = True
    else:
      self._data = capacity
      self._free = False

  def __del__(self):
    # __init__ may have raised before either attribute was assigned, so
    # neither of them can be assumed to exist here.
    if getattr(self, "_free", False) and hasattr(self, "_data"):
      pn_data_free(self._data)
      del self._data

  def _check(self, err):
    """
    Returns err unchanged when it is not negative; otherwise raises the
    exception registered for it in EXCEPTIONS (DataException when none
    is registered), carrying the error text of the data object.
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, DataException)
      raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
    else:
      return err

  def clear(self):
    """
    Clears the data object.
    """
    pn_data_clear(self._data)

  def rewind(self):
    """
    Clears current node and sets the parent to the root node.  Clearing the
    current node sets it _before_ the first node, calling next() will advance to
    the first node.
    """
    assert self._data is not None
    pn_data_rewind(self._data)

  def next(self):
    """
    Advances the current node to its next sibling and returns its
    type. If there is no next sibling the current node remains
    unchanged and None is returned.
    """
    found = pn_data_next(self._data)
    if found:
      return self.type()
    else:
      return None

  def prev(self):
    """
    Moves the current node back to its previous sibling and returns its
    type. If there is no previous sibling the current node remains
    unchanged and None is returned.
    """
    found = pn_data_prev(self._data)
    if found:
      return self.type()
    else:
      return None

  def enter(self):
    """
    Sets the parent node to the current node and clears the current node.
    Clearing the current node sets it _before_ the first child,
    calling next() advances to the first child.
    """
    return pn_data_enter(self._data)

  def exit(self):
    """
    Sets the current node to the parent node and the parent node to
    its own parent.
    """
    return pn_data_exit(self._data)

  def lookup(self, name):
    """
    Returns the result of pn_data_lookup for the given name.
    """
    return pn_data_lookup(self._data, name)

  def narrow(self):
    """
    Calls pn_data_narrow on the underlying data object.
    """
    pn_data_narrow(self._data)

  def widen(self):
    """
    Calls pn_data_widen on the underlying data object.
    """
    pn_data_widen(self._data)

  def type(self):
    """
    Returns the type of the current node, or None when the underlying
    call reports -1.
    """
    dtype = pn_data_type(self._data)
    if dtype == -1:
      return None
    else:
      return dtype

  def encoded_size(self):
    """
    Returns the size in bytes needed to encode the data in AMQP format.
    """
    return pn_data_encoded_size(self._data)

  def encode(self):
    """
    Returns a representation of the data encoded in AMQP format.
    """
    # Start with a 1024 byte buffer and double it until the encoder no
    # longer reports an overflow.
    size = 1024
    while True:
      cd, enc = pn_data_encode(self._data, size)
      if cd == PN_OVERFLOW:
        size *= 2
      elif cd >= 0:
        return enc
      else:
        self._check(cd)

  def decode(self, encoded):
    """
    Decodes the first value from supplied AMQP data and returns the
    number of bytes consumed.

    @type encoded: binary
    @param encoded: AMQP encoded binary data
    """
    return self._check(pn_data_decode(self._data, encoded))

  def put_list(self):
    """
    Puts a list value. Elements may be filled by entering the list
    node and putting element values.

      >>> data = Data()
      >>> data.put_list()
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
    """
    self._check(pn_data_put_list(self._data))

  def put_map(self):
    """
    Puts a map value. Elements may be filled by entering the map node
    and putting alternating key value pairs.

      >>> data = Data()
      >>> data.put_map()
      >>> data.enter()
      >>> data.put_string("key")
      >>> data.put_string("value")
      >>> data.exit()
    """
    self._check(pn_data_put_map(self._data))

  def put_array(self, described, element_type):
    """
    Puts an array value. Elements may be filled by entering the array
    node and putting the element values. The values must all be of the
    specified array element type. If an array is described then the
    first child value of the array is the descriptor and may be of any
    type.

      >>> data = Data()
      >>>
      >>> data.put_array(False, Data.INT)
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
      >>>
      >>> data.put_array(True, Data.DOUBLE)
      >>> data.enter()
      >>> data.put_symbol("array-descriptor")
      >>> data.put_double(1.1)
      >>> data.put_double(1.2)
      >>> data.put_double(1.3)
      >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    self._check(pn_data_put_array(self._data, described, element_type))

  def put_described(self):
    """
    Puts a described value. A described node has two children, the
    descriptor and the value. These are specified by entering the node
    and putting the desired values.

      >>> data = Data()
      >>> data.put_described()
      >>> data.enter()
      >>> data.put_symbol("value-descriptor")
      >>> data.put_string("the value")
      >>> data.exit()
    """
    self._check(pn_data_put_described(self._data))

  def put_null(self):
    """
    Puts a null value.
    """
    self._check(pn_data_put_null(self._data))

  def put_bool(self, b):
    """
    Puts a boolean value.

    @param b: a boolean value
    """
    self._check(pn_data_put_bool(self._data, b))

  def put_ubyte(self, ub):
    """
    Puts an unsigned byte value.

    @param ub: an integral value
    """
    self._check(pn_data_put_ubyte(self._data, ub))

  def put_byte(self, b):
    """
    Puts a signed byte value.

    @param b: an integral value
    """
    self._check(pn_data_put_byte(self._data, b))

  def put_ushort(self, us):
    """
    Puts an unsigned short value.

    @param us: an integral value.
    """
    self._check(pn_data_put_ushort(self._data, us))

  def put_short(self, s):
    """
    Puts a signed short value.

    @param s: an integral value
    """
    self._check(pn_data_put_short(self._data, s))

  def put_uint(self, ui):
    """
    Puts an unsigned int value.

    @param ui: an integral value
    """
    self._check(pn_data_put_uint(self._data, ui))

  def put_int(self, i):
    """
    Puts a signed int value.

    @param i: an integral value
    """
    self._check(pn_data_put_int(self._data, i))

  def put_char(self, c):
    """
    Puts a char value.

    @param c: a single character
    """
    self._check(pn_data_put_char(self._data, ord(c)))

  def put_ulong(self, ul):
    """
    Puts an unsigned long value.

    @param ul: an integral value
    """
    self._check(pn_data_put_ulong(self._data, ul))

  def put_long(self, l):
    """
    Puts a signed long value.

    @param l: an integral value
    """
    self._check(pn_data_put_long(self._data, l))

  def put_timestamp(self, t):
    """
    Puts a timestamp value.

    @param t: an integral value
    """
    self._check(pn_data_put_timestamp(self._data, t))

  def put_float(self, f):
    """
    Puts a float value.

    @param f: a floating point value
    """
    self._check(pn_data_put_float(self._data, f))

  def put_double(self, d):
    """
    Puts a double value.

    @param d: a floating point value.
    """
    self._check(pn_data_put_double(self._data, d))

  def put_decimal32(self, d):
    """
    Puts a decimal32 value.

    @param d: a decimal32 value
    """
    self._check(pn_data_put_decimal32(self._data, d))

  def put_decimal64(self, d):
    """
    Puts a decimal64 value.

    @param d: a decimal64 value
    """
    self._check(pn_data_put_decimal64(self._data, d))

  def put_decimal128(self, d):
    """
    Puts a decimal128 value.

    @param d: a decimal128 value
    """
    self._check(pn_data_put_decimal128(self._data, d))

  def put_uuid(self, u):
    """
    Puts a UUID value.

    @param u: a uuid value
    """
    self._check(pn_data_put_uuid(self._data, u.bytes))

  def put_binary(self, b):
    """
    Puts a binary value.

    @type b: binary
    @param b: a binary value
    """
    self._check(pn_data_put_binary(self._data, b))

  def put_string(self, s):
    """
    Puts a unicode value.

    @type s: unicode
    @param s: a unicode value
    """
    self._check(pn_data_put_string(self._data, s.encode("utf8")))

  def put_symbol(self, s):
    """
    Puts a symbolic value.

    @type s: string
    @param s: the symbol name
    """
    self._check(pn_data_put_symbol(self._data, s.encode('ascii')))

  def get_list(self):
    """
    If the current node is a list, return the number of elements,
    otherwise return zero. List elements can be accessed by entering
    the list.

      >>> count = data.get_list()
      >>> data.enter()
      >>> for i in range(count):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print(data.get_string())
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    return pn_data_get_list(self._data)

  def get_map(self):
    """
    If the current node is a map, return the number of child elements,
    otherwise return zero. Key value pairs can be accessed by entering
    the map.

      >>> count = data.get_map()
      >>> data.enter()
      >>> for i in range(count//2):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print(data.get_string())
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    return pn_data_get_map(self._data)

  def get_array(self):
    """
    If the current node is an array, return a tuple of the element
    count, a boolean indicating whether the array is described, and
    the type of each element, otherwise return (0, False, None). Array
    data can be accessed by entering the array.

      >>> # read an array of strings with a symbolic descriptor
      >>> count, described, type = data.get_array()
      >>> data.enter()
      >>> data.next()
      >>> print("Descriptor: %s" % data.get_symbol())
      >>> for i in range(count):
      ...    data.next()
      ...    print("Element: %s" % data.get_string())
      >>> data.exit()
    """
    count = pn_data_get_array(self._data)
    described = pn_data_is_array_described(self._data)
    type = pn_data_get_array_type(self._data)
    if type == -1:
      type = None
    return count, described, type

  def is_described(self):
    """
    Checks if the current node is a described value. The descriptor
    and value may be accessed by entering the described value.

      >>> # read a symbolically described string
      >>> assert data.is_described() # will error if the current node is not described
      >>> data.enter()
      >>> data.next()
      >>> print(data.get_symbol())
      >>> data.next()
      >>> print(data.get_string())
      >>> data.exit()
    """
    return pn_data_is_described(self._data)

  def is_null(self):
    """
    Checks if the current node is a null.
    """
    return pn_data_is_null(self._data)

  def get_bool(self):
    """
    If the current node is a boolean, returns its value, returns False
    otherwise.
    """
    return pn_data_get_bool(self._data)

  def get_ubyte(self):
    """
    If the current node is an unsigned byte, returns its value,
    returns 0 otherwise.
    """
    return ubyte(pn_data_get_ubyte(self._data))

  def get_byte(self):
    """
    If the current node is a signed byte, returns its value, returns 0
    otherwise.
    """
    return byte(pn_data_get_byte(self._data))

  def get_ushort(self):
    """
    If the current node is an unsigned short, returns its value,
    returns 0 otherwise.
    """
    return ushort(pn_data_get_ushort(self._data))

  def get_short(self):
    """
    If the current node is a signed short, returns its value, returns
    0 otherwise.
    """
    return short(pn_data_get_short(self._data))

  def get_uint(self):
    """
    If the current node is an unsigned int, returns its value, returns
    0 otherwise.
    """
    return uint(pn_data_get_uint(self._data))

  def get_int(self):
    """
    If the current node is a signed int, returns its value, returns 0
    otherwise.
    """
    return int32(pn_data_get_int(self._data))

  def get_char(self):
    """
    If the current node is a char, returns its value, returns 0
    otherwise.
    """
    return char(_compat.unichar(pn_data_get_char(self._data)))

  def get_ulong(self):
    """
    If the current node is an unsigned long, returns its value,
    returns 0 otherwise.
    """
    return ulong(pn_data_get_ulong(self._data))

  def get_long(self):
    """
    If the current node is a signed long, returns its value, returns
    0 otherwise.
    """
    return long(pn_data_get_long(self._data))

  def get_timestamp(self):
    """
    If the current node is a timestamp, returns its value, returns 0
    otherwise.
    """
    return timestamp(pn_data_get_timestamp(self._data))

  def get_float(self):
    """
    If the current node is a float, returns its value, returns 0
    otherwise.
    """
    return float32(pn_data_get_float(self._data))

  def get_double(self):
    """
    If the current node is a double, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_double(self._data)

  # XXX: need to convert
  def get_decimal32(self):
    """
    If the current node is a decimal32, returns its value, returns 0
    otherwise.
    """
    return decimal32(pn_data_get_decimal32(self._data))

  # XXX: need to convert
  def get_decimal64(self):
    """
    If the current node is a decimal64, returns its value, returns 0
    otherwise.
    """
    return decimal64(pn_data_get_decimal64(self._data))

  # XXX: need to convert
  def get_decimal128(self):
    """
    If the current node is a decimal128, returns its value, returns 0
    otherwise.
    """
    return decimal128(pn_data_get_decimal128(self._data))

  def get_uuid(self):
    """
    If the current node is a UUID, returns its value, returns None
    otherwise.
    """
    if pn_data_type(self._data) == Data.UUID:
      return uuid.UUID(bytes=pn_data_get_uuid(self._data))
    else:
      return None

  def get_binary(self):
    """
    If the current node is binary, returns its value, returns ""
    otherwise.
    """
    return pn_data_get_binary(self._data)

  def get_string(self):
    """
    If the current node is a string, returns its value, returns ""
    otherwise.
    """
    return pn_data_get_string(self._data).decode("utf8")

  def get_symbol(self):
    """
    If the current node is a symbol, returns its value, returns ""
    otherwise.
    """
    return symbol(pn_data_get_symbol(self._data).decode('ascii'))

  def copy(self, src):
    """
    Copies the content of another Data object into this one.

    @type src: Data
    @param src: the object to copy from
    """
    self._check(pn_data_copy(self._data, src._data))

  def format(self):
    """
    Returns the result of pn_data_format for this object.
    """
    # Start with a 16 byte buffer and double it until the formatter no
    # longer reports an overflow.
    sz = 16
    while True:
      err, result = pn_data_format(self._data, sz)
      if err == PN_OVERFLOW:
        sz *= 2
        continue
      else:
        self._check(err)
        return result

  def dump(self):
    """
    Calls pn_data_dump on the underlying data object.
    """
    pn_data_dump(self._data)

  def put_dict(self, d):
    """
    Puts a map value filled with the keys and values of a dict, each
    converted with L{put_object}.

    @param d: the dict to put
    """
    self.put_map()
    self.enter()
    try:
      for k, v in d.items():
        self.put_object(k)
        self.put_object(v)
    finally:
      self.exit()

  def get_dict(self):
    """
    Enters the current node and returns its children, read pairwise as
    keys and values with L{get_object}, as a dict. A trailing key that
    has no value is mapped to None. Returns None if the current node
    cannot be entered.
    """
    if self.enter():
      try:
        result = {}
        while self.next():
          k = self.get_object()
          if self.next():
            v = self.get_object()
          else:
            v = None
          result[k] = v
      finally:
        self.exit()
      return result

  def put_sequence(self, s):
    """
    Puts a list value filled with the items of an iterable, each
    converted with L{put_object}.

    @param s: the iterable to put
    """
    self.put_list()
    self.enter()
    try:
      for o in s:
        self.put_object(o)
    finally:
      self.exit()

  def get_sequence(self):
    """
    Enters the current node and returns its children, read with
    L{get_object}, as a list. Returns None if the current node cannot
    be entered.
    """
    if self.enter():
      try:
        result = []
        while self.next():
          result.append(self.get_object())
      finally:
        self.exit()
      return result

  def get_py_described(self):
    """
    Enters the current node and returns its first two children, read
    with L{get_object}, as a L{Described} object. Returns None if the
    current node cannot be entered.
    """
    if self.enter():
      try:
        self.next()
        descriptor = self.get_object()
        self.next()
        value = self.get_object()
      finally:
        self.exit()
      return Described(descriptor, value)

  def put_py_described(self, d):
    """
    Puts a described value whose descriptor and value are taken from a
    L{Described} object and converted with L{put_object}.

    @param d: the object to put
    """
    self.put_described()
    self.enter()
    try:
      self.put_object(d.descriptor)
      self.put_object(d.value)
    finally:
      self.exit()

  def get_py_array(self):
    """
    If the current node is an array, return an Array object
    representing the array and its contents. Otherwise return None.
    This is a convenience wrapper around get_array, enter, etc.
    """
    count, described, type = self.get_array()
    if type is None: return None
    if self.enter():
      try:
        if described:
          self.next()
          descriptor = self.get_object()
        else:
          descriptor = UNDESCRIBED
        elements = []
        while self.next():
          elements.append(self.get_object())
      finally:
        self.exit()
      return Array(descriptor, type, *elements)

  def put_py_array(self, a):
    """
    Puts an array value taken from an L{Array} object. The array is
    treated as described unless its descriptor equals UNDESCRIBED.

    @param a: the object to put
    """
    described = a.descriptor != UNDESCRIBED
    self.put_array(described, a.type)
    self.enter()
    try:
      if described:
        self.put_object(a.descriptor)
      for e in a.elements:
        self.put_object(e)
    finally:
      self.exit()

  # Maps the exact class of a python object to the method that puts it.
  put_mappings = {
    None.__class__: lambda s, _: s.put_null(),
    bool: put_bool,
    ubyte: put_ubyte,
    ushort: put_ushort,
    uint: put_uint,
    ulong: put_ulong,
    byte: put_byte,
    short: put_short,
    int32: put_int,
    long: put_long,
    float32: put_float,
    float: put_double,
    decimal32: put_decimal32,
    decimal64: put_decimal64,
    decimal128: put_decimal128,
    char: put_char,
    timestamp: put_timestamp,
    uuid.UUID: put_uuid,
    bytes: put_binary,
    unicode: put_string,
    symbol: put_symbol,
    list: put_sequence,
    tuple: put_sequence,
    dict: put_dict,
    Described: put_py_described,
    Array: put_py_array
    }
  # for python 3.x, long is merely an alias for int, but for python 2.x
  # we need to add an explicit int since it is a different type
  if int not in put_mappings:
    put_mappings[int] = put_int

  # Maps the type code of a node to the method that reads it.
  get_mappings = {
    NULL: lambda s: None,
    BOOL: get_bool,
    BYTE: get_byte,
    UBYTE: get_ubyte,
    SHORT: get_short,
    USHORT: get_ushort,
    INT: get_int,
    UINT: get_uint,
    CHAR: get_char,
    LONG: get_long,
    ULONG: get_ulong,
    TIMESTAMP: get_timestamp,
    FLOAT: get_float,
    DOUBLE: get_double,
    DECIMAL32: get_decimal32,
    DECIMAL64: get_decimal64,
    DECIMAL128: get_decimal128,
    UUID: get_uuid,
    BINARY: get_binary,
    STRING: get_string,
    SYMBOL: get_symbol,
    DESCRIBED: get_py_described,
    ARRAY: get_py_array,
    LIST: get_sequence,
    MAP: get_dict
    }

  def put_object(self, obj):
    """
    Puts a python object using the method registered for its class in
    L{put_mappings}.

    @param obj: the object to put
    @raise KeyError: if the exact class of obj is not registered
    """
    putter = self.put_mappings[obj.__class__]
    putter(self, obj)

  def get_object(self):
    """
    Returns the current node as a python object using the method
    registered for its type in L{get_mappings}. Returns None when
    there is no current type and an L{UnmappedType} when no method is
    registered for the type.
    """
    type = self.type()
    if type is None: return None
    getter = self.get_mappings.get(type)
    if getter:
      return getter(self)
    else:
      return UnmappedType(str(type))
class ConnectionException(ProtonException):
  """Exception type for errors reported on a connection."""
class Endpoint(object):
  """
  Behaviour shared by endpoint wrappers: the local and remote state
  flags, the local and remote error conditions and the handler
  attached to the endpoint.
  """

  LOCAL_UNINIT = PN_LOCAL_UNINIT
  REMOTE_UNINIT = PN_REMOTE_UNINIT
  LOCAL_ACTIVE = PN_LOCAL_ACTIVE
  REMOTE_ACTIVE = PN_REMOTE_ACTIVE
  LOCAL_CLOSED = PN_LOCAL_CLOSED
  REMOTE_CLOSED = PN_REMOTE_CLOSED

  def _init(self):
    # The local condition starts out unset.
    self.condition = None

  def _update_cond(self):
    # Write the local condition into the implementation's condition.
    target = self._get_cond_impl()
    obj2cond(self.condition, target)

  @property
  def remote_condition(self):
    remote = self._get_remote_cond_impl()
    return cond2obj(remote)

  # the following must be provided by subclasses
  def _get_cond_impl(self):
    assert False, "Subclass must override this!"

  def _get_remote_cond_impl(self):
    assert False, "Subclass must override this!"

  def _get_handler(self):
    from . import reactor
    owner = reactor.Reactor.wrap(pn_object_reactor(self._impl))
    # Errors are routed to the owning reactor when there is one.
    on_error = owner.on_error if owner else None
    attachments = self._get_attachments()
    return WrappedHandler.wrap(pn_record_get_handler(attachments), on_error)

  def _set_handler(self, handler):
    from . import reactor
    owner = reactor.Reactor.wrap(pn_object_reactor(self._impl))
    # Errors are routed to the owning reactor when there is one.
    on_error = owner.on_error if owner else None
    impl = _chandler(handler, on_error)
    attachments = self._get_attachments()
    pn_record_set_handler(attachments, impl)
    pn_decref(impl)

  handler = property(_get_handler, _set_handler)

  @property
  def transport(self):
    return self.connection.transport
class Condition:
  """An error condition: a name plus an optional description and info."""

  def __init__(self, name, description=None, info=None):
    """
    @param name: the name of the condition
    @param description: an optional description of the condition
    @param info: optional additional information about the condition
    """
    self.name = name
    self.description = description
    self.info = info

  def __repr__(self):
    # Only the truthy fields are shown.
    return "Condition(%s)" % ", ".join([repr(x) for x in
                                        (self.name, self.description, self.info)
                                        if x])

  def __eq__(self, o):
    """Equal only to another Condition with equal name, description and info."""
    if not isinstance(o, Condition): return False
    return self.name == o.name and \
        self.description == o.description and \
        self.info == o.info

  def __ne__(self, o):
    # Python 2 does not derive != from ==; without this, two equal
    # instances would still compare as "not equal".
    return not self.__eq__(o)
def obj2cond(obj, cond):
  """
  Clear cond and, when obj is truthy, fill it from obj: the name (as
  str), the description and, when obj.info is truthy, the info.

  @param obj: the source condition object, or a falsy value
  @param cond: the condition implementation to write to
  """
  pn_condition_clear(cond)
  if not obj:
    return
  pn_condition_set_name(cond, str(obj.name))
  pn_condition_set_description(cond, obj.description)
  info = Data(pn_condition_info(cond))
  if obj.info:
    info.put_object(obj.info)
def cond2obj(cond):
  """
  Return a Condition built from cond, or None when cond is not set.

  @param cond: the condition implementation to read from
  """
  if not pn_condition_is_set(cond):
    return None
  name = pn_condition_get_name(cond)
  description = pn_condition_get_description(cond)
  info = dat2obj(pn_condition_info(cond))
  return Condition(name, description, info)
def dat2obj(dimpl):
  """
  Return the first value held by a data implementation as a python
  object, or None when dimpl is falsy. The data is rewound before and
  after the read.

  @param dimpl: the data implementation to read from
  """
  if not dimpl:
    return None
  reader = Data(dimpl)
  reader.rewind()
  reader.next()
  value = reader.get_object()
  reader.rewind()
  return value
def obj2dat(obj, dimpl):
  """
  Put obj into a data implementation; does nothing when obj is None.

  @param obj: the python object to write
  @param dimpl: the data implementation to write to
  """
  if obj is None:
    return
  writer = Data(dimpl)
  writer.put_object(obj)
def secs2millis(secs):
  """Convert seconds to milliseconds, returned as a long."""
  millis = secs*1000
  return long(millis)
def millis2secs(millis):
  """Convert milliseconds to seconds, returned as a float."""
  as_float = float(millis)
  return as_float/1000.0
def timeout2millis(secs):
  """
  Convert a timeout in seconds to milliseconds; None is converted to
  PN_MILLIS_MAX.
  """
  return PN_MILLIS_MAX if secs is None else secs2millis(secs)
def millis2timeout(millis):
  """
  Convert a timeout in milliseconds to seconds; PN_MILLIS_MAX is
  converted to None.
  """
  return None if millis == PN_MILLIS_MAX else millis2secs(millis)
def unicode2utf8(string):
  """Some Proton APIs expect a null terminated string. Convert python text
  types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
  This method will throw if the string cannot be converted.
  """
  if string is None:
    return None
  if _compat.IS_PY2:
    if isinstance(string, unicode):
      return string.encode('utf-8')
    if isinstance(string, str):
      return string
  else:
    # Encoding text yields bytes; the bytes are then decoded again, so
    # text goes through a UTF-8 round trip and bytes come back as text.
    raw = string.encode('utf-8') if isinstance(string, str) else string
    if isinstance(raw, bytes):
      return raw.decode('utf-8')
  raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
def utf82unicode(string):
  """Convert C strings returned from proton-c into python unicode"""
  if string is None:
    return None
  if isinstance(string, _compat.TEXT_TYPES):
    # Text needs no conversion.
    return string
  if isinstance(string, _compat.BINARY_TYPES):
    return string.decode('utf8')
  raise TypeError("Unrecognized string type")
class Connection(Wrapper, Endpoint):
"""
A representation of an AMQP connection
"""
@staticmethod
def wrap(impl):
if impl is None:
return None
else:
return Connection(impl)
def __init__(self, impl = pn_connection):
Wrapper.__init__(self, impl, pn_connection_attachments)
def _init(self):
Endpoint._init(self)
self.offered_capabilities = None
self.desired_capabilities = None
self.properties = None
def _get_attachments(self):
return pn_connection_attachments(self._impl)
@property
def connection(self):
return self
@property
def transport(self):
return Transport.wrap(pn_connection_transport(self._impl))
def _check(self, err):
if err < 0:
exc = EXCEPTIONS.get(err, ConnectionException)
raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
else:
return err
def _get_cond_impl(self):
return pn_connection_condition(self._impl)
def _get_remote_cond_impl(self):
return pn_connection_remote_condition(self._impl)
def collect(self, collector):
if collector is None:
pn_connection_collect(self._impl, None)
else:
pn_connection_collect(self._impl, collector._impl)
self._collector = weakref.ref(collector)
def _get_container(self):
return utf82unicode(pn_connection_get_container(self._impl))
def _set_container(self, name):
return pn_connection_set_container(self._impl, unicode2utf8(name))
container = property(_get_container, _set_container)
def _get_hostname(self):
return utf82unicode(pn_connection_get_hostname(self._impl))
def _set_hostname(self, name):
return pn_connection_set_hostname(self._impl, unicode2utf8(name))
hostname = property(_get_hostname, _set_hostname,
doc="""
Set the name of the host (either fully qualified or relative) to which this
connection is connecting to. This information may be used by the remote
peer to determine the correct back-end service to connect the client to.
This value will be sent in the Open performative, and will be used by SSL
and SASL layers to identify the peer.
""")
def _get_user(self):
return utf82unicode(pn_connection_get_user(self._impl))
def _set_user(self, name):
return pn_connection_set_user(self._impl, unicode2utf8(name))
user = property(_get_user, _set_user)
def _get_password(self):
return None
def _set_password(self, name):
return pn_connection_set_password(self._impl, unicode2utf8(name))
password = property(_get_password, _set_password)
@property
def remote_container(self):
"""The container identifier specified by the remote peer for this connection."""
return pn_connection_remote_container(self._impl)
@property
def remote_hostname(self):
"""The hostname specified by the remote peer for this connection."""
return pn_connection_remote_hostname(self._impl)
@property
def remote_offered_capabilities(self):
"""The capabilities offered by the remote peer for this connection."""
return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
@property
def remote_desired_capabilities(self):
"""The capabilities desired by the remote peer for this connection."""
return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
@property
def remote_properties(self):
"""The properties specified by the remote peer for this connection."""
return dat2obj(pn_connection_remote_properties(self._impl))
def open(self):
"""
Opens the connection.
In more detail, this moves the local state of the connection to
the ACTIVE state and triggers an open frame to be sent to the
peer. A connection is fully active once both peers have opened it.
"""
obj2dat(self.offered_capabilities,
pn_connection_offered_capabilities(self._impl))
obj2dat(self.desired_capabilities,
pn_connection_desired_capabilities(self._impl))
obj2dat(self.properties, pn_connection_properties(self._impl))
pn_connection_open(self._impl)
def close(self):
    """
    Close the connection.

    The local condition is refreshed via _update_cond() before the
    local state moves to CLOSED and a close frame is triggered towards
    the peer. The connection is fully closed only once both peers have
    closed it.
    """
    # The condition must be updated before the close call is issued.
    self._update_cond()
    pn_connection_close(self._impl)
@property
def state(self):
    """
    Connection state expressed as a bit field.

    The field combines a local and a remote component, each being one
    of UNINIT, ACTIVE or CLOSED. Test it by masking against
    LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
    REMOTE_ACTIVE and REMOTE_CLOSED.
    """
    state_bits = pn_connection_state(self._impl)
    return state_bits
def session(self):
    """
    Create and return a new Session on this connection.

    Raises SessionException when the underlying allocation yields None.
    """
    new_impl = pn_session(self._impl)
    if new_impl is None:
        raise SessionException("Session allocation failed.")
    return Session(new_impl)
def session_head(self, mask):
    """Return the Session obtained from pn_session_head for `mask` (None if there is none)."""
    head_impl = pn_session_head(self._impl, mask)
    return Session.wrap(head_impl)
def link_head(self, mask):
    """Return the Link obtained from pn_link_head for `mask` (None if there is none)."""
    head_impl = pn_link_head(self._impl, mask)
    return Link.wrap(head_impl)
@property
def work_head(self):
    """Delivery obtained from pn_work_head for this connection (None if there is none)."""
    head_impl = pn_work_head(self._impl)
    return Delivery.wrap(head_impl)
@property
def error(self):
    """Error code taken from the connection's error object."""
    error_obj = pn_connection_error(self._impl)
    return pn_error_code(error_obj)
def free(self):
    """Release the underlying connection via pn_connection_release."""
    impl = self._impl
    pn_connection_release(impl)
class SessionException(ProtonException):
    """Raised for session-level failures, e.g. when a session cannot be allocated."""
class Session(Wrapper, Endpoint):
    """Python wrapper around an underlying session object."""

    @staticmethod
    def wrap(impl):
        """Return a Session around `impl`, or None when `impl` is None."""
        if impl is not None:
            return Session(impl)
        return None

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_session_attachments)

    def _get_attachments(self):
        """Return the attachments record of the underlying session."""
        return pn_session_attachments(self._impl)

    def _get_cond_impl(self):
        """Return the local condition object of the underlying session."""
        return pn_session_condition(self._impl)

    def _get_remote_cond_impl(self):
        """Return the remote condition object of the underlying session."""
        return pn_session_remote_condition(self._impl)

    def _get_incoming_capacity(self):
        capacity = pn_session_get_incoming_capacity(self._impl)
        return capacity

    def _set_incoming_capacity(self, capacity):
        pn_session_set_incoming_capacity(self._impl, capacity)

    # Read/write access to the session's incoming capacity.
    incoming_capacity = property(_get_incoming_capacity,
                                 _set_incoming_capacity)

    def _get_outgoing_window(self):
        window = pn_session_get_outgoing_window(self._impl)
        return window

    def _set_outgoing_window(self, window):
        pn_session_set_outgoing_window(self._impl, window)

    # Read/write access to the session's outgoing window.
    outgoing_window = property(_get_outgoing_window, _set_outgoing_window)

    @property
    def outgoing_bytes(self):
        """Value reported by pn_session_outgoing_bytes for this session."""
        return pn_session_outgoing_bytes(self._impl)

    @property
    def incoming_bytes(self):
        """Value reported by pn_session_incoming_bytes for this session."""
        return pn_session_incoming_bytes(self._impl)

    def open(self):
        """Open the session."""
        pn_session_open(self._impl)

    def close(self):
        """Refresh the local condition, then close the session."""
        self._update_cond()
        pn_session_close(self._impl)

    def next(self, mask):
        """Return the Session obtained from pn_session_next for `mask` (or None)."""
        next_impl = pn_session_next(self._impl, mask)
        return Session.wrap(next_impl)

    @property
    def state(self):
        """Session state as reported by pn_session_state."""
        return pn_session_state(self._impl)

    @property
    def connection(self):
        """The Connection this session belongs to."""
        conn_impl = pn_session_connection(self._impl)
        return Connection.wrap(conn_impl)

    def sender(self, name):
        """Create a Sender link called `name` on this session."""
        link_impl = pn_sender(self._impl, unicode2utf8(name))
        return Sender(link_impl)

    def receiver(self, name):
        """Create a Receiver link called `name` on this session."""
        link_impl = pn_receiver(self._impl, unicode2utf8(name))
        return Receiver(link_impl)

    def free(self):
        """Free the underlying session via pn_session_free."""
        pn_session_free(self._impl)
class LinkException(ProtonException):
    """Default exception type raised when a link or terminus call reports an error code."""
class Link(Wrapper, Endpoint):
    """
    Base wrapper for an AMQP link. The two concrete kinds of link are
    Sender and Receiver.
    """

    SND_UNSETTLED = PN_SND_UNSETTLED
    SND_SETTLED = PN_SND_SETTLED
    SND_MIXED = PN_SND_MIXED

    RCV_FIRST = PN_RCV_FIRST
    RCV_SECOND = PN_RCV_SECOND

    @staticmethod
    def wrap(impl):
        """
        Wrap `impl` in a Sender or Receiver depending on what
        pn_link_is_sender reports; None is passed through unchanged.
        """
        if impl is None:
            return None
        if pn_link_is_sender(impl):
            return Sender(impl)
        return Receiver(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_link_attachments)

    def _get_attachments(self):
        """Return the attachments record of the underlying link."""
        return pn_link_attachments(self._impl)

    def _check(self, err):
        """
        Return `err` unchanged when it is not negative; otherwise raise
        the exception registered for it in EXCEPTIONS (LinkException by
        default), carrying the code and the link's error text.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, LinkException)
            error_text = pn_error_text(pn_link_error(self._impl))
            raise exc_type("[%s]: %s" % (err, error_text))
        return err

    def _get_cond_impl(self):
        """Return the local condition object of the underlying link."""
        return pn_link_condition(self._impl)

    def _get_remote_cond_impl(self):
        """Return the remote condition object of the underlying link."""
        return pn_link_remote_condition(self._impl)

    def open(self):
        """
        Open the link.

        The local state of the link moves to ACTIVE and an attach frame
        is triggered towards the peer. The link is fully active only
        once both peers have attached it.
        """
        pn_link_open(self._impl)

    def close(self):
        """
        Close the link.

        The local condition is refreshed, the local state moves to
        CLOSED and a detach frame (with the closed flag set) is
        triggered towards the peer. The link is fully closed only once
        both peers have detached it.
        """
        self._update_cond()
        pn_link_close(self._impl)

    @property
    def state(self):
        """
        Link state expressed as a bit field.

        The field combines a local and a remote component, each being
        one of UNINIT, ACTIVE or CLOSED. Test it by masking against
        LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
        REMOTE_ACTIVE and REMOTE_CLOSED.
        """
        return pn_link_state(self._impl)

    @property
    def source(self):
        """Source terminus of the link as described by the local peer."""
        terminus_impl = pn_link_source(self._impl)
        return Terminus(terminus_impl)

    @property
    def target(self):
        """Target terminus of the link as described by the local peer."""
        terminus_impl = pn_link_target(self._impl)
        return Terminus(terminus_impl)

    @property
    def remote_source(self):
        """Source terminus of the link as described by the remote peer."""
        terminus_impl = pn_link_remote_source(self._impl)
        return Terminus(terminus_impl)

    @property
    def remote_target(self):
        """Target terminus of the link as described by the remote peer."""
        terminus_impl = pn_link_remote_target(self._impl)
        return Terminus(terminus_impl)

    @property
    def session(self):
        """The Session this link belongs to."""
        session_impl = pn_link_session(self._impl)
        return Session.wrap(session_impl)

    @property
    def connection(self):
        """The connection on which this link was attached."""
        owning_session = self.session
        return owning_session.connection

    def delivery(self, tag):
        """Create a Delivery on this link identified by `tag`."""
        delivery_impl = pn_delivery(self._impl, tag)
        return Delivery(delivery_impl)

    @property
    def current(self):
        """The link's current Delivery, or None if there is none."""
        current_impl = pn_link_current(self._impl)
        return Delivery.wrap(current_impl)

    def advance(self):
        """Advance the link; returns whatever pn_link_advance returns."""
        return pn_link_advance(self._impl)

    @property
    def unsettled(self):
        """Value reported by pn_link_unsettled for this link."""
        return pn_link_unsettled(self._impl)

    @property
    def credit(self):
        """The amount of outstanding credit on this link."""
        return pn_link_credit(self._impl)

    @property
    def available(self):
        """Value reported by pn_link_available for this link."""
        return pn_link_available(self._impl)

    @property
    def queued(self):
        """Value reported by pn_link_queued for this link."""
        return pn_link_queued(self._impl)

    def next(self, mask):
        """Return the Link obtained from pn_link_next for `mask` (or None)."""
        next_impl = pn_link_next(self._impl, mask)
        return Link.wrap(next_impl)

    @property
    def name(self):
        """The name of the link."""
        raw_name = pn_link_name(self._impl)
        return utf82unicode(raw_name)

    @property
    def is_sender(self):
        """True if this link is a sender."""
        return pn_link_is_sender(self._impl)

    @property
    def is_receiver(self):
        """True if this link is a receiver."""
        return pn_link_is_receiver(self._impl)

    @property
    def remote_snd_settle_mode(self):
        """Sender settle mode reported for the remote end of the link."""
        return pn_link_remote_snd_settle_mode(self._impl)

    @property
    def remote_rcv_settle_mode(self):
        """Receiver settle mode reported for the remote end of the link."""
        return pn_link_remote_rcv_settle_mode(self._impl)

    def _get_snd_settle_mode(self):
        mode = pn_link_snd_settle_mode(self._impl)
        return mode

    def _set_snd_settle_mode(self, mode):
        pn_link_set_snd_settle_mode(self._impl, mode)

    # Local sender settle mode (see the SND_* constants above).
    snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)

    def _get_rcv_settle_mode(self):
        mode = pn_link_rcv_settle_mode(self._impl)
        return mode

    def _set_rcv_settle_mode(self, mode):
        pn_link_set_rcv_settle_mode(self._impl, mode)

    # Local receiver settle mode (see the RCV_* constants above).
    rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)

    def _get_drain(self):
        drain_flag = pn_link_get_drain(self._impl)
        return drain_flag

    def _set_drain(self, b):
        # The value is coerced to a real bool before being handed on.
        flag = bool(b)
        pn_link_set_drain(self._impl, flag)

    # Drain flag of the link; assigned values are coerced with bool().
    drain_mode = property(_get_drain, _set_drain)

    def drained(self):
        """Returns whatever pn_link_drained returns for this link."""
        return pn_link_drained(self._impl)

    def detach(self):
        """Detach the link; returns whatever pn_link_detach returns."""
        return pn_link_detach(self._impl)

    def free(self):
        """Free the underlying link via pn_link_free."""
        pn_link_free(self._impl)
class Terminus(object):
    """Wrapper around a terminus (the source or target of a link)."""

    # Terminus types.
    UNSPECIFIED = PN_UNSPECIFIED
    SOURCE = PN_SOURCE
    TARGET = PN_TARGET
    COORDINATOR = PN_COORDINATOR

    # Durability values.
    NONDURABLE = PN_NONDURABLE
    CONFIGURATION = PN_CONFIGURATION
    DELIVERIES = PN_DELIVERIES

    # Distribution modes.
    DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
    DIST_MODE_COPY = PN_DIST_MODE_COPY
    DIST_MODE_MOVE = PN_DIST_MODE_MOVE

    # Expiry policies.
    EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
    EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
    EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
    EXPIRE_NEVER = PN_EXPIRE_NEVER

    def __init__(self, impl):
        self._impl = impl

    def _check(self, err):
        """
        Return `err` unchanged when it is not negative; otherwise raise
        the exception registered for it in EXCEPTIONS (LinkException by
        default) carrying the code.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, LinkException)
            raise exc_type("[%s]" % err)
        return err

    def _get_type(self):
        terminus_type = pn_terminus_get_type(self._impl)
        return terminus_type

    def _set_type(self, type):
        result = pn_terminus_set_type(self._impl, type)
        self._check(result)

    type = property(_get_type, _set_type)

    def _get_address(self):
        """The address that identifies the source or target node"""
        raw_address = pn_terminus_get_address(self._impl)
        return utf82unicode(raw_address)

    def _set_address(self, address):
        encoded = unicode2utf8(address)
        self._check(pn_terminus_set_address(self._impl, encoded))

    address = property(_get_address, _set_address)

    def _get_durability(self):
        durability = pn_terminus_get_durability(self._impl)
        return durability

    def _set_durability(self, seconds):
        # NOTE(review): despite its name, `seconds` is handed on as the
        # durability value; the parameter name is kept for compatibility.
        result = pn_terminus_set_durability(self._impl, seconds)
        self._check(result)

    durability = property(_get_durability, _set_durability)

    def _get_expiry_policy(self):
        policy = pn_terminus_get_expiry_policy(self._impl)
        return policy

    def _set_expiry_policy(self, seconds):
        # NOTE(review): despite its name, `seconds` is handed on as the
        # expiry policy; the parameter name is kept for compatibility.
        result = pn_terminus_set_expiry_policy(self._impl, seconds)
        self._check(result)

    expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

    def _get_timeout(self):
        timeout_value = pn_terminus_get_timeout(self._impl)
        return timeout_value

    def _set_timeout(self, seconds):
        result = pn_terminus_set_timeout(self._impl, seconds)
        self._check(result)

    timeout = property(_get_timeout, _set_timeout)

    def _is_dynamic(self):
        """Indicates whether the source or target node was dynamically
        created"""
        dynamic_flag = pn_terminus_is_dynamic(self._impl)
        return dynamic_flag

    def _set_dynamic(self, dynamic):
        result = pn_terminus_set_dynamic(self._impl, dynamic)
        self._check(result)

    dynamic = property(_is_dynamic, _set_dynamic)

    def _get_distribution_mode(self):
        mode = pn_terminus_get_distribution_mode(self._impl)
        return mode

    def _set_distribution_mode(self, mode):
        result = pn_terminus_set_distribution_mode(self._impl, mode)
        self._check(result)

    distribution_mode = property(_get_distribution_mode,
                                 _set_distribution_mode)

    @property
    def properties(self):
        """Properties of a dynamic source or target."""
        data_impl = pn_terminus_properties(self._impl)
        return Data(data_impl)

    @property
    def capabilities(self):
        """Capabilities of the source or target."""
        data_impl = pn_terminus_capabilities(self._impl)
        return Data(data_impl)

    @property
    def outcomes(self):
        """Data object wrapping the terminus outcomes."""
        data_impl = pn_terminus_outcomes(self._impl)
        return Data(data_impl)

    @property
    def filter(self):
        """A filter on a source allows the set of messages transferred
        over the link to be restricted"""
        data_impl = pn_terminus_filter(self._impl)
        return Data(data_impl)

    def copy(self, src):
        """Copy the terminus `src` into this one, raising on an error code."""
        result = pn_terminus_copy(self._impl, src._impl)
        self._check(result)
class Sender(Link):
    """
    The sending kind of link: messages travel out over it.
    """

    def offered(self, n):
        """Pass `n` to pn_link_offered for this link."""
        pn_link_offered(self._impl, n)

    def stream(self, data):
        """
        Write `data` into the current delivery.

        @type data: binary
        @param data: data to send
        @return: the non-negative result of the send call; a negative
        result is turned into an exception by _check.
        """
        sent = pn_link_send(self._impl, data)
        return self._check(sent)

    def send(self, obj, tag=None):
        """
        Send `obj` over this sender.

        An object exposing a send() method (such as a Message) is asked
        to send itself, receiving this sender and the optional tag; for
        a Message this creates a new delivery on the link. Any other
        object is treated as raw bytes and streamed.
        """
        if not hasattr(obj, 'send'):
            # treat object as bytes
            return self.stream(obj)
        return obj.send(self, tag=tag)

    def delivery_tag(self):
        """
        Return the next automatically generated delivery tag: the
        strings "1", "2", "3", ... in order, per sender.
        """
        if not hasattr(self, 'tag_generator'):
            # Lazily create the per-sender counter on first use.
            def simple_tags():
                counter = 0
                while True:
                    counter += 1
                    yield str(counter)
            self.tag_generator = simple_tags()
        return next(self.tag_generator)
class Receiver(Link):
    """
    The receiving kind of link: messages arrive over it.
    """

    def flow(self, n):
        """Increase the credit issued to the remote sender by `n` messages."""
        pn_link_flow(self._impl, n)

    def recv(self, limit):
        """
        Receive up to `limit` from the link.

        Returns None when the call reports PN_EOS, otherwise the
        received binary data; any other negative result raises via
        _check.
        """
        count, payload = pn_link_recv(self._impl, limit)
        if count == PN_EOS:
            return None
        self._check(count)
        return payload

    def drain(self, n):
        """Pass `n` to pn_link_drain for this link."""
        pn_link_drain(self._impl, n)

    def draining(self):
        """Returns whatever pn_link_draining returns for this link."""
        return pn_link_draining(self._impl)
class NamedInt(int):
    """
    An int that prints as a symbolic name.

    Every instance is recorded in the class-level `values` registry
    under the integer it was created with, so get() can map a plain
    integer back to its named instance.
    """

    # Registry: plain integer -> NamedInt instance.
    values = {}

    def __new__(cls, i, name):
        instance = super(NamedInt, cls).__new__(cls, i)
        # Register under the value as given (later registrations win).
        cls.values[i] = instance
        return instance

    def __init__(self, i, name):
        self.name = name

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

    @classmethod
    def get(cls, i):
        """Return the instance registered for `i`, or `i` itself if none."""
        registry = cls.values
        return registry.get(i, i)
class DispositionType(NamedInt):
    """NamedInt subclass that keeps disposition types in a registry of its own."""

    # Separate registry so these values are not mixed into NamedInt.values.
    values = dict()
class Disposition(object):
    """
    Wrapper around a disposition object.

    A disposition is flagged as either local or not. For a local one,
    `data`, `annotations` and `condition` are plain Python attributes
    that can be assigned; otherwise they are read from the underlying
    object and cannot be assigned.
    """

    RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
    ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
    REJECTED = DispositionType(PN_REJECTED, "REJECTED")
    RELEASED = DispositionType(PN_RELEASED, "RELEASED")
    MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")

    def __init__(self, impl, local):
        self._impl = impl
        self.local = local
        # Python-side values, only meaningful for a local disposition.
        self._data = None
        self._condition = None
        self._annotations = None

    @property
    def type(self):
        """Disposition type, as a DispositionType when the value is registered."""
        raw_type = pn_disposition_type(self._impl)
        return DispositionType.get(raw_type)

    def _get_section_number(self):
        number = pn_disposition_get_section_number(self._impl)
        return number

    def _set_section_number(self, n):
        pn_disposition_set_section_number(self._impl, n)

    section_number = property(_get_section_number, _set_section_number)

    def _get_section_offset(self):
        offset = pn_disposition_get_section_offset(self._impl)
        return offset

    def _set_section_offset(self, n):
        pn_disposition_set_section_offset(self._impl, n)

    section_offset = property(_get_section_offset, _set_section_offset)

    def _get_failed(self):
        failed_flag = pn_disposition_is_failed(self._impl)
        return failed_flag

    def _set_failed(self, b):
        pn_disposition_set_failed(self._impl, b)

    failed = property(_get_failed, _set_failed)

    def _get_undeliverable(self):
        undeliverable_flag = pn_disposition_is_undeliverable(self._impl)
        return undeliverable_flag

    def _set_undeliverable(self, b):
        pn_disposition_set_undeliverable(self._impl, b)

    undeliverable = property(_get_undeliverable, _set_undeliverable)

    def _get_data(self):
        if not self.local:
            return dat2obj(pn_disposition_data(self._impl))
        return self._data

    def _set_data(self, obj):
        if not self.local:
            raise AttributeError("data attribute is read-only")
        self._data = obj

    data = property(_get_data, _set_data)

    def _get_annotations(self):
        if not self.local:
            return dat2obj(pn_disposition_annotations(self._impl))
        return self._annotations

    def _set_annotations(self, obj):
        if not self.local:
            raise AttributeError("annotations attribute is read-only")
        self._annotations = obj

    annotations = property(_get_annotations, _set_annotations)

    def _get_condition(self):
        if not self.local:
            return cond2obj(pn_disposition_condition(self._impl))
        return self._condition

    def _set_condition(self, obj):
        if not self.local:
            raise AttributeError("condition attribute is read-only")
        self._condition = obj

    condition = property(_get_condition, _set_condition)
class Delivery(Wrapper):
    """
    Tracks and/or records the delivery of one message over a link.
    """

    RECEIVED = Disposition.RECEIVED
    ACCEPTED = Disposition.ACCEPTED
    REJECTED = Disposition.REJECTED
    RELEASED = Disposition.RELEASED
    MODIFIED = Disposition.MODIFIED

    @staticmethod
    def wrap(impl):
        """Return a Delivery around `impl`, or None when `impl` is None."""
        if impl is not None:
            return Delivery(impl)
        return None

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_delivery_attachments)

    def _init(self):
        # Local and remote dispositions of this delivery; only the local
        # one is created with the `local` flag set.
        local_impl = pn_delivery_local(self._impl)
        self.local = Disposition(local_impl, True)
        remote_impl = pn_delivery_remote(self._impl)
        self.remote = Disposition(remote_impl, False)

    @property
    def tag(self):
        """The identifier for the delivery."""
        return pn_delivery_tag(self._impl)

    @property
    def writable(self):
        """True for an outgoing delivery to which data can now be written."""
        return pn_delivery_writable(self._impl)

    @property
    def readable(self):
        """True for an incoming delivery that has data to read."""
        return pn_delivery_readable(self._impl)

    @property
    def updated(self):
        """True if the state of the delivery has been updated
        (e.g. it has been settled and/or accepted, rejected etc)."""
        return pn_delivery_updated(self._impl)

    def update(self, state):
        """
        Set the local state of the delivery e.g. ACCEPTED, REJECTED,
        RELEASED.

        The data, annotations and condition held on the local
        disposition are copied into the underlying disposition first.
        """
        local = self.local
        obj2dat(local._data, pn_disposition_data(local._impl))
        obj2dat(local._annotations, pn_disposition_annotations(local._impl))
        obj2cond(local._condition, pn_disposition_condition(local._impl))
        pn_delivery_update(self._impl, state)

    @property
    def pending(self):
        """Value reported by pn_delivery_pending for this delivery."""
        return pn_delivery_pending(self._impl)

    @property
    def partial(self):
        """
        True for an incoming delivery if not all the data is yet
        available.
        """
        return pn_delivery_partial(self._impl)

    @property
    def local_state(self):
        """The local state of the delivery."""
        raw_state = pn_delivery_local_state(self._impl)
        return DispositionType.get(raw_state)

    @property
    def remote_state(self):
        """
        The state of the delivery as indicated by the remote peer.
        """
        raw_state = pn_delivery_remote_state(self._impl)
        return DispositionType.get(raw_state)

    @property
    def settled(self):
        """
        True if the delivery has been settled by the remote peer.
        """
        return pn_delivery_settled(self._impl)

    def settle(self):
        """
        Settle the delivery locally. This indicates the application
        considers the delivery complete and does not wish to receive
        any further events about it. Every delivery should be settled
        locally.
        """
        pn_delivery_settle(self._impl)

    @property
    def work_next(self):
        """Delivery obtained from pn_work_next, or None if there is none."""
        next_impl = pn_work_next(self._impl)
        return Delivery.wrap(next_impl)

    @property
    def link(self):
        """
        The link on which the delivery was sent or received.
        """
        link_impl = pn_delivery_link(self._impl)
        return Link.wrap(link_impl)

    @property
    def session(self):
        """
        The session over which the delivery was sent or received.
        """
        owning_link = self.link
        return owning_link.session

    @property
    def connection(self):
        """
        The connection over which the delivery was sent or received.
        """
        owning_session = self.session
        return owning_session.connection

    @property
    def transport(self):
        """The transport of the delivery's connection."""
        owning_connection = self.connection
        return owning_connection.transport
class TransportException(ProtonException):
    """Default exception type raised when a transport call reports an error code."""
class TraceAdapter:
    """
    Callable that adapts a Python tracer callback: it is invoked with
    a raw transport and a message, and forwards the wrapped Transport
    and the message to the stored tracer.
    """

    def __init__(self, tracer):
        self.tracer = tracer

    def __call__(self, trans_impl, message):
        transport = Transport.wrap(trans_impl)
        self.tracer(transport, message)
class Transport(Wrapper):
    """Python wrapper around an underlying transport object."""

    # Trace levels accepted by trace().
    TRACE_OFF = PN_TRACE_OFF
    TRACE_DRV = PN_TRACE_DRV
    TRACE_FRM = PN_TRACE_FRM
    TRACE_RAW = PN_TRACE_RAW

    # Modes accepted by the constructor.
    CLIENT = 1
    SERVER = 2

    @staticmethod
    def wrap(impl):
        """Return a Transport around `impl`, or None when `impl` is None."""
        if impl is not None:
            return Transport(_impl=impl)
        return None

    def __init__(self, mode=None, _impl = pn_transport):
        """
        Create a transport.

        `mode` may be Transport.SERVER, Transport.CLIENT or None (None
        is handled like CLIENT); anything else raises
        TransportException.
        """
        Wrapper.__init__(self, _impl, pn_transport_attachments)
        if mode == Transport.SERVER:
            pn_transport_set_server(self._impl)
        elif not (mode is None or mode == Transport.CLIENT):
            raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))

    def _init(self):
        self._sasl = None
        self._ssl = None

    def _check(self, err):
        """
        Return `err` unchanged when it is not negative; otherwise raise
        the exception registered for it in EXCEPTIONS
        (TransportException by default), carrying the code and the
        transport's error text.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, TransportException)
            error_text = pn_error_text(pn_transport_error(self._impl))
            raise exc_type("[%s]: %s" % (err, error_text))
        return err

    def _set_tracer(self, tracer):
        adapter = TraceAdapter(tracer)
        pn_transport_set_pytracer(self._impl, adapter)

    def _get_tracer(self):
        adapter = pn_transport_get_pytracer(self._impl)
        if not adapter:
            return None
        return adapter.tracer

    tracer = property(_get_tracer, _set_tracer,
                      doc="\nA callback for trace logging. The callback is passed the transport and log message.\n")

    def log(self, message):
        """Pass `message` to pn_transport_log for this transport."""
        pn_transport_log(self._impl, message)

    def require_auth(self, bool):
        """Pass the flag to pn_transport_require_auth for this transport."""
        pn_transport_require_auth(self._impl, bool)

    @property
    def authenticated(self):
        """Value reported by pn_transport_is_authenticated."""
        return pn_transport_is_authenticated(self._impl)

    def require_encryption(self, bool):
        """Pass the flag to pn_transport_require_encryption for this transport."""
        pn_transport_require_encryption(self._impl, bool)

    @property
    def encrypted(self):
        """Value reported by pn_transport_is_encrypted."""
        return pn_transport_is_encrypted(self._impl)

    @property
    def user(self):
        """Value reported by pn_transport_get_user."""
        return pn_transport_get_user(self._impl)

    def bind(self, connection):
        """Assign a connection to the transport"""
        result = pn_transport_bind(self._impl, connection._impl)
        self._check(result)

    def unbind(self):
        """Release the connection"""
        result = pn_transport_unbind(self._impl)
        self._check(result)

    def trace(self, n):
        """Pass `n` (see the TRACE_* constants) to pn_transport_trace."""
        pn_transport_trace(self._impl, n)

    def tick(self, now):
        """Process any timed events (like heartbeat generation).

        `now` is the time in seconds since the epoch (float). It is
        converted to milliseconds for the underlying call, whose result
        is converted back to seconds and returned.
        """
        now_ms = secs2millis(now)
        result_ms = pn_transport_tick(self._impl, now_ms)
        return millis2secs(result_ms)

    def capacity(self):
        """
        Return the value of pn_transport_capacity when it is at least
        PN_EOS; smaller values are error codes and raise via _check.
        """
        cap = pn_transport_capacity(self._impl)
        if cap >= PN_EOS:
            return cap
        return self._check(cap)

    def push(self, binary):
        """
        Push `binary` into the transport.

        Raises OverflowError when fewer bytes than len(binary) were
        accepted; an error code raises via _check.
        """
        accepted = self._check(pn_transport_push(self._impl, binary))
        expected = len(binary)
        if accepted != expected:
            raise OverflowError("unable to process all bytes: %s, %s" % (accepted, len(binary)))

    def close_tail(self):
        """Call pn_transport_close_tail, raising on an error code."""
        result = pn_transport_close_tail(self._impl)
        self._check(result)

    def pending(self):
        """
        Return the value of pn_transport_pending when it is at least
        PN_EOS; smaller values are error codes and raise via _check.
        """
        amount = pn_transport_pending(self._impl)
        if amount >= PN_EOS:
            return amount
        return self._check(amount)

    def peek(self, size):
        """
        Peek at up to `size` of the transport's output.

        Returns None when the call reports PN_EOS, otherwise the
        output; any other negative code raises via _check.
        """
        code, output = pn_transport_peek(self._impl, size)
        if code == PN_EOS:
            return None
        self._check(code)
        return output

    def pop(self, size):
        """Pass `size` to pn_transport_pop for this transport."""
        pn_transport_pop(self._impl, size)

    def close_head(self):
        """Call pn_transport_close_head, raising on an error code."""
        result = pn_transport_close_head(self._impl)
        self._check(result)

    @property
    def closed(self):
        """Value reported by pn_transport_closed."""
        return pn_transport_closed(self._impl)

    # AMQP 1.0 max-frame-size
    def _get_max_frame_size(self):
        frame_size = pn_transport_get_max_frame(self._impl)
        return frame_size

    def _set_max_frame_size(self, value):
        pn_transport_set_max_frame(self._impl, value)

    max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                              doc="\nSets the maximum size for received frames (in bytes).\n")

    @property
    def remote_max_frame_size(self):
        """Value reported by pn_transport_get_remote_max_frame."""
        return pn_transport_get_remote_max_frame(self._impl)

    def _get_channel_max(self):
        channel_max = pn_transport_get_channel_max(self._impl)
        return channel_max

    def _set_channel_max(self, value):
        # A truthy result from the setter call means the change was refused.
        refused = pn_transport_set_channel_max(self._impl, value)
        if refused:
            raise SessionException("Too late to change channel max.")

    channel_max = property(_get_channel_max, _set_channel_max,
                           doc="\nSets the maximum channel that may be used on the transport.\n")

    @property
    def remote_channel_max(self):
        """Value reported by pn_transport_remote_channel_max."""
        return pn_transport_remote_channel_max(self._impl)

    # AMQP 1.0 idle-time-out
    def _get_idle_timeout(self):
        timeout_ms = pn_transport_get_idle_timeout(self._impl)
        return millis2secs(timeout_ms)

    def _set_idle_timeout(self, sec):
        timeout_ms = secs2millis(sec)
        pn_transport_set_idle_timeout(self._impl, timeout_ms)

    idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
                            doc="\nThe idle timeout of the connection (float, in seconds).\n")

    @property
    def remote_idle_timeout(self):
        """Remote idle timeout, converted from milliseconds to seconds."""
        timeout_ms = pn_transport_get_remote_idle_timeout(self._impl)
        return millis2secs(timeout_ms)

    @property
    def frames_output(self):
        """Value reported by pn_transport_get_frames_output."""
        return pn_transport_get_frames_output(self._impl)

    @property
    def frames_input(self):
        """Value reported by pn_transport_get_frames_input."""
        return pn_transport_get_frames_input(self._impl)

    def sasl(self):
        """Return a SASL object for this transport."""
        return SASL(self)

    def ssl(self, domain=None, session_details=None):
        """
        Return the SSL object of this transport, creating it from
        `domain` and `session_details` on first use.
        """
        # SSL factory (singleton for this transport)
        if not self._ssl:
            self._ssl = SSL(self, domain, session_details)
        return self._ssl

    @property
    def condition(self):
        """The transport's condition, converted via cond2obj."""
        cond_impl = pn_transport_condition(self._impl)
        return cond2obj(cond_impl)

    @property
    def connection(self):
        """The Connection reported for this transport, or None."""
        conn_impl = pn_transport_connection(self._impl)
        return Connection.wrap(conn_impl)
class SASLException(TransportException):
    """Default exception type raised when a SASL call reports an error code."""
class SASL(Wrapper):
    """Wrapper giving access to the SASL layer of a transport."""

    # SASL outcome codes.
    OK = PN_SASL_OK
    AUTH = PN_SASL_AUTH
    SYS = PN_SASL_SYS
    PERM = PN_SASL_PERM
    TEMP = PN_SASL_TEMP

    @staticmethod
    def extended():
        """Return whatever pn_sasl_extended reports."""
        return pn_sasl_extended()

    def __init__(self, transport):
        transport_impl = transport._impl
        Wrapper.__init__(self, transport_impl, pn_transport_attachments)
        self._sasl = pn_sasl(transport._impl)

    def _check(self, err):
        """
        Return `err` unchanged when it is not negative; otherwise raise
        the exception registered for it in EXCEPTIONS (SASLException by
        default) carrying the code.
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, SASLException)
            raise exc_type("[%s]" % (err))
        return err

    @property
    def user(self):
        """Value reported by pn_sasl_get_user."""
        return pn_sasl_get_user(self._sasl)

    @property
    def mech(self):
        """Value reported by pn_sasl_get_mech."""
        return pn_sasl_get_mech(self._sasl)

    @property
    def outcome(self):
        """The SASL outcome code, or None while it is PN_SASL_NONE."""
        result = pn_sasl_outcome(self._sasl)
        if result == PN_SASL_NONE:
            return None
        return result

    def allowed_mechs(self, mechs):
        """Pass `mechs` to pn_sasl_allowed_mechs."""
        pn_sasl_allowed_mechs(self._sasl, mechs)

    def _get_allow_insecure_mechs(self):
        allowed = pn_sasl_get_allow_insecure_mechs(self._sasl)
        return allowed

    def _set_allow_insecure_mechs(self, insecure):
        pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)

    allow_insecure_mechs = property(_get_allow_insecure_mechs,
                                    _set_allow_insecure_mechs,
                                    doc="\nAllow unencrypted cleartext passwords (PLAIN mech)\n")

    def done(self, outcome):
        """Pass `outcome` to pn_sasl_done."""
        pn_sasl_done(self._sasl, outcome)

    def config_name(self, name):
        """Pass `name` to pn_sasl_config_name."""
        pn_sasl_config_name(self._sasl, name)

    def config_path(self, path):
        """Pass `path` to pn_sasl_config_path."""
        pn_sasl_config_path(self._sasl, path)
class SSLException(TransportException):
    """Default exception type raised for SSL failures."""
class SSLUnavailable(SSLException):
    """Raised when an SSL domain or SSL object cannot be obtained (the allocation yields None)."""
class SSLDomain(object):
    """
    Wrapper around an SSL domain, which carries the SSL configuration
    (credentials, trusted CAs, peer authentication mode).
    """

    # Domain modes accepted by the constructor.
    MODE_CLIENT = PN_SSL_MODE_CLIENT
    MODE_SERVER = PN_SSL_MODE_SERVER

    # Verification modes accepted by set_peer_authentication().
    VERIFY_PEER = PN_SSL_VERIFY_PEER
    VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
    ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

    def __init__(self, mode):
        """
        Allocate a domain for `mode` (MODE_CLIENT or MODE_SERVER).

        Raises SSLUnavailable when the allocation yields None.
        """
        self._domain = pn_ssl_domain(mode)
        if self._domain is None:
            raise SSLUnavailable()

    def _check(self, err):
        """
        Return `err` unchanged when it is not negative; otherwise raise
        the exception registered for it in EXCEPTIONS (SSLException by
        default).
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, SSLException)
            raise exc("SSL failure.")
        else:
            return err

    def set_credentials(self, cert_file, key_file, password):
        """Set the domain's credentials; raises on an error code."""
        return self._check( pn_ssl_domain_set_credentials(self._domain,
                                                          cert_file, key_file,
                                                          password) )

    def set_trusted_ca_db(self, certificate_db):
        """Set the domain's trusted CA database; raises on an error code."""
        return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
                                                            certificate_db) )

    def set_peer_authentication(self, verify_mode, trusted_CAs=None):
        """
        Set how the peer is authenticated (see VERIFY_PEER,
        VERIFY_PEER_NAME, ANONYMOUS_PEER); raises on an error code.
        """
        return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
                                                                  verify_mode,
                                                                  trusted_CAs) )

    def allow_unsecured_client(self):
        """Call pn_ssl_domain_allow_unsecured_client; raises on an error code."""
        return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )

    def __del__(self):
        # __del__ also runs for objects whose __init__ did not complete:
        # either the allocation raised (no _domain attribute at all) or
        # it yielded None and SSLUnavailable was raised. In both cases
        # there is nothing to free, so never hand a missing/None domain
        # to the C layer.
        domain = getattr(self, "_domain", None)
        if domain is not None:
            pn_ssl_domain_free(domain)
class SSL(object):
    """
    Wrapper giving access to the SSL layer of a transport. At most one
    SSL object exists per Transport (see __new__).
    """

    @staticmethod
    def present():
        """Return whatever pn_ssl_present reports."""
        return pn_ssl_present()

    def _check(self, err):
        """
        Return `err` unchanged when it is not negative; otherwise raise
        the exception registered for it in EXCEPTIONS (SSLException by
        default).
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, SSLException)
            raise exc_type("SSL failure.")
        return err

    def __new__(cls, transport, domain, session_details=None):
        """Enforce a singleton SSL object per Transport"""
        existing = transport._ssl
        if existing:
            # unfortunately, we've combined the allocation and the configuration in a
            # single step. So catch any attempt by the application to provide what
            # may be a different configuration than the original (hack)
            domain_differs = domain and (existing._domain is not domain)
            details_differ = session_details and (
                existing._session_details is not session_details)
            if domain_differs or details_differ:
                raise SSLException("Cannot re-configure existing SSL object!")
            return transport._ssl

        # First SSL object for this transport: allocate and configure it.
        obj = super(SSL, cls).__new__(cls)
        obj._domain = domain
        obj._session_details = session_details
        if session_details:
            session_id = session_details.get_session_id()
        else:
            session_id = None
        obj._ssl = pn_ssl( transport._impl )
        if obj._ssl is None:
            raise SSLUnavailable()
        if domain:
            pn_ssl_init( obj._ssl, domain._domain, session_id )
        transport._ssl = obj
        return transport._ssl

    def cipher_name(self):
        """Return the cipher name, or None when the lookup reports failure."""
        ok, cipher = pn_ssl_get_cipher_name( self._ssl, 128 )
        if not ok:
            return None
        return cipher

    def protocol_name(self):
        """Return the protocol name, or None when the lookup reports failure."""
        ok, protocol = pn_ssl_get_protocol_name( self._ssl, 128 )
        if not ok:
            return None
        return protocol

    # Digest identifiers accepted by get_cert_fingerprint().
    SHA1 = PN_SSL_SHA1
    SHA256 = PN_SSL_SHA256
    SHA512 = PN_SSL_SHA512
    MD5 = PN_SSL_MD5

    # Subject sub-field identifiers accepted by get_cert_subject_subfield().
    CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
    CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
    CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
    CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
    CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
    CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME

    def get_cert_subject_subfield(self, subfield_name):
        """Return the requested sub-field of the remote certificate's subject."""
        return pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)

    def get_cert_subject(self):
        """Return the subject of the remote certificate."""
        return pn_ssl_get_remote_subject(self._ssl)

    def _get_cert_subject_unknown_subfield(self):
        # Pass in an unhandled enum
        return self.get_cert_subject_subfield(10)

    # Convenience functions for obtaining the subfields of the subject field.
    def get_cert_common_name(self):
        field = SSL.CERT_COMMON_NAME
        return self.get_cert_subject_subfield(field)

    def get_cert_organization(self):
        field = SSL.CERT_ORGANIZATION_NAME
        return self.get_cert_subject_subfield(field)

    def get_cert_organization_unit(self):
        field = SSL.CERT_ORGANIZATION_UNIT
        return self.get_cert_subject_subfield(field)

    def get_cert_locality_or_city(self):
        field = SSL.CERT_CITY_OR_LOCALITY
        return self.get_cert_subject_subfield(field)

    def get_cert_country(self):
        field = SSL.CERT_COUNTRY_NAME
        return self.get_cert_subject_subfield(field)

    def get_cert_state_or_province(self):
        field = SSL.CERT_STATE_OR_PROVINCE
        return self.get_cert_subject_subfield(field)

    def get_cert_fingerprint(self, fingerprint_length, digest_name):
        """
        Return the certificate fingerprint for the digest `digest_name`
        (SHA1, SHA256, SHA512 or MD5), or None unless the lookup
        reports PN_OK.
        """
        code, fingerprint = pn_ssl_get_cert_fingerprint(
            self._ssl, fingerprint_length, digest_name)
        if code != PN_OK:
            return None
        return fingerprint

    # Convenience functions for obtaining fingerprint for specific hashing algorithms
    def _get_cert_fingerprint_unknown_hash_alg(self):
        # Deliberately passes a digest value outside the known set.
        return self.get_cert_fingerprint(41, 10)

    def get_cert_fingerprint_sha1(self):
        length = 41
        return self.get_cert_fingerprint(length, SSL.SHA1)

    def get_cert_fingerprint_sha256(self):
        # sha256 produces a fingerprint that is 64 characters long
        length = 65
        return self.get_cert_fingerprint(length, SSL.SHA256)

    def get_cert_fingerprint_sha512(self):
        # sha512 produces a fingerprint that is 128 characters long
        length = 129
        return self.get_cert_fingerprint(length, SSL.SHA512)

    def get_cert_fingerprint_md5(self):
        length = 33
        return self.get_cert_fingerprint(length, SSL.MD5)

    @property
    def remote_subject(self):
        """Subject of the remote certificate."""
        return pn_ssl_get_remote_subject( self._ssl )

    # Values reported by resume_status().
    RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
    RESUME_NEW = PN_SSL_RESUME_NEW
    RESUME_REUSED = PN_SSL_RESUME_REUSED

    def resume_status(self):
        """Return the session resume status (one of the RESUME_* values)."""
        return pn_ssl_resume_status( self._ssl )

    def _set_peer_hostname(self, hostname):
        encoded = unicode2utf8(hostname)
        self._check(pn_ssl_set_peer_hostname( self._ssl, encoded ))

    def _get_peer_hostname(self):
        code, raw_name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
        self._check(code)
        return utf82unicode(raw_name)

    peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
                             doc="\nManage the expected name of the remote peer. Used to authenticate the remote.\n")
class SSLSessionDetails(object):
    """
    Holder for the unique identifier of an SSL session, used to resume
    a previous session on a new SSL connection.
    """

    def __init__(self, session_id):
        # Stored as given; handed back unchanged by get_session_id().
        self._session_id = session_id

    def get_session_id(self):
        """Return the session identifier supplied at construction."""
        identifier = self._session_id
        return identifier
# Conversion table: each entry maps a name string to a one-argument callable
# that turns a raw object into its Python-side counterpart. The "pn_void" and
# "pn_pyref" entries convert with pn_void2py; every other entry casts the
# object to the matching proton type and wraps it in the corresponding Python
# class (wrap() yields None for a None implementation).
# NOTE(review): the keys are presumably proton-c class names looked up when
# resolving an object's context -- confirm against the code that reads this table.
wrappers = {
"pn_void": lambda x: pn_void2py(x),
"pn_pyref": lambda x: pn_void2py(x),
"pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
"pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
"pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
"pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
"pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
"pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
}
class Collector:
    """Wrapper around an event collector: events are put, peeked and popped."""

    def __init__(self):
        self._impl = pn_collector()

    def put(self, obj, etype):
        """Put an event of type `etype` (an object with a `number`) about `obj`."""
        context = pn_py2void(obj)
        pn_collector_put(self._impl, PN_PYREF, context, etype.number)

    def peek(self):
        """Return the Event obtained from pn_collector_peek, without removing it."""
        head_impl = pn_collector_peek(self._impl)
        return Event.wrap(head_impl)

    def pop(self):
        """Remove the event at the head of the collector; returns None."""
        # The peeked wrapper is deliberately kept referenced until the
        # pop has happened, exactly as before.
        # NOTE(review): presumably this keeps the event alive across the
        # pop -- confirm before removing.
        head_event = self.peek()
        pn_collector_pop(self._impl)

    def __del__(self):
        pn_collector_free(self._impl)
        del self._impl
# Define TypeExtender only if a global of that name does not exist yet.
if "TypeExtender" not in globals():
    class TypeExtender:
        """Hands out consecutive integers, starting from the given number."""

        def __init__(self, number):
            self.number = number

        def next(self):
            """Return the current number and advance the counter by one."""
            current = self.number
            self.number += 1
            return current
class EventType(object):
    """
    Describes one kind of event: a name, a number and the name of the
    handler method the event is dispatched to.

    Every instance registers itself in the class-level TYPES mapping
    under its number. A type created without a number is an "extended"
    type and receives the next number from a shared counter that starts
    at 10000.
    """

    # Serialises number allocation and registration across threads.
    _lock = threading.Lock()
    # Source of numbers for extended event types.
    _extended = TypeExtender(10000)
    # Registry: event number -> EventType.
    TYPES = {}

    def __init__(self, name=None, number=None, method=None):
        """
        @param name: event name; when omitted it is looked up from
        `number` with pn_event_type_name.
        @param number: event number; when omitted the next extended
        number is allocated.
        @param method: handler method name; defaults to "on_<name>".
        @raise TypeError: if neither `name` nor `number` is given.
        """
        if name is None and number is None:
            raise TypeError("extended events require a name")
        # Acquire *before* entering the try block: if the acquire itself
        # is interrupted, the finally clause must not release a lock this
        # thread never obtained (that would raise and hide the original
        # exception).
        self._lock.acquire()
        try:
            if name is None:
                name = pn_event_type_name(number)

            if number is None:
                number = self._extended.next()

            if method is None:
                method = "on_%s" % name

            self.name = name
            self.number = number
            self.method = method

            self.TYPES[number] = self
        finally:
            self._lock.release()

    def __repr__(self):
        return self.name
def dispatch(handler, method, *args):
    """
    Invoke `method` on `handler` with `args` and return its result.

    When the handler has no (truthy) attribute of that name, its
    on_unhandled(method, *args) is used instead if present; when
    neither exists nothing is called and None is returned.
    """
    target = getattr(handler, method, None)
    if target:
        return target(*args)
    if hasattr(handler, "on_unhandled"):
        return handler.on_unhandled(method, *args)
class EventBase(object):
    """Minimal event: a class, a context and a type, plus dispatching."""

    def __init__(self, clazz, context, type):
        self.clazz, self.context, self.type = clazz, context, type

    def dispatch(self, handler):
        """Dispatch this event to `handler` using the method named by its type."""
        method_name = self.type.method
        return dispatch(handler, method_name, self)
def _none(x):
    """Ignore the argument and return None."""
    return None
# Module-level Constant named "DELEGATED".
# NOTE(review): presumably a sentinel signalling that handling was delegated
# elsewhere -- confirm against the code that produces and compares it.
DELEGATED = Constant("DELEGATED")
def _core(number, method):
    """Build the EventType for a built-in event `number` handled by `method`."""
    event_type = EventType(number=number, method=method)
    return event_type
class Event(Wrapper, EventBase):
    """Wrapper around a native event.

    The class attributes below are the EventType objects for the core events;
    each pairs a PN_* event number with the handler method it dispatches to.
    """

    REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
    REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
    REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")

    TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")

    CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
    CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
    CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
    CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
    CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
    CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
    CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
    CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")

    SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
    SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
    SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
    SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
    SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
    SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")

    LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
    LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
    LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
    LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
    LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
    LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
    LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
    LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
    LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")

    DELIVERY = _core(PN_DELIVERY, "on_delivery")

    TRANSPORT = _core(PN_TRANSPORT, "on_transport")
    TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
    TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
    TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
    TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")

    SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
    SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
    SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
    SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
    SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
    SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
    SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")

    @staticmethod
    def wrap(impl, number=None):
        """Wrap a native event.

        @param impl: the native event, or None.
        @param number: event type number; read with pn_event_type(impl) when
          not supplied.
        @return: None when impl is None; the event's context when that
          context is itself an EventBase; otherwise the new Event.
        """
        if impl is None:
            return None
        if number is None:
            number = pn_event_type(impl)
        event = Event(impl, number)
        return event.context if isinstance(event.context, EventBase) else event

    def __init__(self, impl, number):
        """
        @param impl: the native event.
        @param number: key into EventType.TYPES selecting this event's type.
        """
        Wrapper.__init__(self, impl, pn_event_attachments)
        event_type = EventType.TYPES[number]
        # Stored straight into the instance __dict__ rather than through
        # normal attribute assignment.
        self.__dict__["type"] = event_type

    def _init(self):
        """Initialisation hook; an Event has nothing to set up."""

    def copy(self):
        """Return Event.wrap() of a pn_event_copy of this event."""
        return Event.wrap(pn_event_copy(self._impl))

    @property
    def clazz(self):
        """Name of the event's class per pn_class_name, or None if it has no class."""
        cls = pn_event_class(self._impl)
        return pn_class_name(cls) if cls else None

    @property
    def root(self):
        """The event's root handler, wrapped as a WrappedHandler."""
        root_impl = pn_event_root(self._impl)
        return WrappedHandler.wrap(root_impl)

    @property
    def context(self):
        """Returns the context object associated with the event. The type of
        this depends on the type of event."""
        # The wrapper function is chosen by the event's class name.
        wrapper = wrappers[self.clazz]
        return wrapper(pn_event_context(self._impl))

    def dispatch(self, handler, type=None):
        """Deliver this event to handler and then to its child handlers.

        @param handler: a WrappedHandler (dispatched natively) or any other
          object (dispatched with the module-level dispatch()).
        @param type: event type to dispatch as; defaults to self.type.
        """
        etype = type or self.type
        if isinstance(handler, WrappedHandler):
            pn_handler_dispatch(handler._impl, self._impl, etype.number)
            return
        result = dispatch(handler, etype.method, self)
        # A handler returning DELEGATED is not followed into its children.
        if result != DELEGATED and hasattr(handler, "handlers"):
            for child in handler.handlers:
                self.dispatch(child, etype)

    @property
    def reactor(self):
        """Returns the reactor associated with the event."""
        # Yields None when no "pn_reactor" wrapper has been registered.
        wrap_reactor = wrappers.get("pn_reactor", _none)
        return wrap_reactor(pn_event_reactor(self._impl))

    def __getattr__(self, name):
        """Resolve the reactor under its subclass name, else defer to the base class.

        If the event's reactor has a C{subclass} attribute whose lower-cased
        class name equals name, the reactor is returned.
        """
        reactor = self.reactor
        if reactor and hasattr(reactor, 'subclass') and reactor.subclass.__name__.lower() == name:
            return reactor
        return super(Event, self).__getattr__(name)

    @property
    def transport(self):
        """Returns the transport associated with the event, or None if none
        is associated with it."""
        return Transport.wrap(pn_event_transport(self._impl))

    @property
    def connection(self):
        """Returns the connection associated with the event, or None if none
        is associated with it."""
        return Connection.wrap(pn_event_connection(self._impl))

    @property
    def session(self):
        """Returns the session associated with the event, or None if none is
        associated with it."""
        return Session.wrap(pn_event_session(self._impl))

    @property
    def link(self):
        """Returns the link associated with the event, or None if none is
        associated with it."""
        return Link.wrap(pn_event_link(self._impl))

    @property
    def sender(self):
        """Returns the sender link associated with the event, or None if
        none is associated with it. This is essentially an alias for
        link, with an additional check on the type of the link."""
        link = self.link
        if link and link.is_sender:
            return link
        return None

    @property
    def receiver(self):
        """Returns the receiver link associated with the event, or None if
        none is associated with it. This is essentially an alias for
        link, with an additional check on the type of the link."""
        link = self.link
        if link and link.is_receiver:
            return link
        return None

    @property
    def delivery(self):
        """Returns the delivery associated with the event, or None if none is
        associated with it."""
        return Delivery.wrap(pn_event_delivery(self._impl))

    def __repr__(self):
        """Return "<type>(<context>)"."""
        return "%s(%s)" % (self.type, self.context)
class LazyHandlers(object):
    """Descriptor that creates an instance's C{handlers} list on first access."""

    def __get__(self, obj, clazz):
        """
        @return: the descriptor itself for class-level access; for instance
          access, a new empty list that has also been stored in the
          instance's __dict__ under 'handlers'.
        """
        if obj is None:
            return self
        # Once the list sits in the instance __dict__, ordinary attribute
        # lookup finds it there and this descriptor is not consulted again.
        fresh = obj.__dict__['handlers'] = []
        return fresh
class Handler(object):
    """Base class for event handlers."""

    # Each instance gets its own list of child handlers on first access.
    handlers = LazyHandlers()

    def on_unhandled(self, method, *args):
        """Fallback for events with no matching method; does nothing.

        @param method: name of the handler method that was not found.
        @param args: the arguments that would have been passed to it.
        """
        return None
class _cadapter:
    """Adapter between a Python handler and the dispatch/exception callbacks.

    Instances are what _chandler passes to pn_pyhandler for handlers that
    are not WrappedHandlers.
    """

    def __init__(self, handler, on_error=None):
        """
        @param handler: the Python handler that events are dispatched to.
        @param on_error: optional callable that receives (exc, val, tb).
        """
        self.handler = handler
        self.on_error = on_error

    def dispatch(self, cevent, ctype):
        """Wrap the native event with the given type number and dispatch it
        to the adapted handler."""
        Event.wrap(cevent, ctype).dispatch(self.handler)

    def exception(self, exc, val, tb):
        """Report an exception: hand it to on_error if one was given,
        otherwise re-raise it."""
        if self.on_error is not None:
            self.on_error((exc, val, tb))
        else:
            _compat.raise_(exc, val, tb)
class WrappedHandlersChildSurrogate:
    """Holds a handlers list on behalf of a delegate it references only weakly."""

    def __init__(self, delegate):
        """@param delegate: object that unhandled events are forwarded to."""
        self.handlers = []
        # A weak reference, so the surrogate does not keep the delegate alive.
        self.delegate = weakref.ref(delegate)

    def on_unhandled(self, method, event):
        """Forward the event to the delegate if it is still alive (and truthy)."""
        target = self.delegate()
        if not target:
            return
        dispatch(target, method, event)
class WrappedHandlersProperty(object):
    """Data descriptor exposing the C{handlers} list of an object's surrogate."""

    def __get__(self, obj, clazz):
        """Return the surrogate's handlers, or None for class-level access."""
        return None if obj is None else self.surrogate(obj).handlers

    def __set__(self, obj, value):
        """Replace the surrogate's handlers with value."""
        self.surrogate(obj).handlers = value

    def surrogate(self, obj):
        """Return obj's surrogate, creating and registering it on first use.

        The surrogate lives in obj.__dict__ under "_surrogate"; a newly
        created one is also passed to obj.add().
        """
        attrs = obj.__dict__
        existing = attrs.get("_surrogate", None)
        if existing is not None:
            return existing
        created = WrappedHandlersChildSurrogate(obj)
        attrs["_surrogate"] = created
        obj.add(created)
        return created
class WrappedHandler(Wrapper):
    """Python wrapper around a native handler."""

    handlers = WrappedHandlersProperty()

    @classmethod
    def wrap(cls, impl, on_error=None):
        """Wrap a native handler.

        @param impl: the native handler, or None.
        @param on_error: optional error callback, stored in the wrapper's
          __dict__ under "on_error".
        @return: None when impl is None, otherwise a new instance of cls.
        """
        if impl is None:
            return None
        handler = cls(impl)
        handler.__dict__["on_error"] = on_error
        return handler

    def __init__(self, impl_or_constructor):
        """@param impl_or_constructor: passed straight to Wrapper.__init__."""
        Wrapper.__init__(self, impl_or_constructor)
        # Position of WrappedHandler in the instance's MRO: 0 for
        # WrappedHandler itself, 1 for a direct single-inheritance subclass.
        depth = list(self.__class__.__mro__).index(WrappedHandler)
        if depth > 1:
            # instantiate the surrogate
            self.handlers.extend([])

    def _on_error(self, info):
        """Pass info to the on_error callback if one is set, else re-raise.

        @param info: sequence whose first three items are given to
          _compat.raise_ when there is no callback.
        """
        on_error = getattr(self, "on_error", None)
        if on_error is not None:
            on_error(info)
        else:
            _compat.raise_(info[0], info[1], info[2])

    def add(self, handler):
        """Add a child handler; None is ignored.

        @param handler: a WrappedHandler or any other handler object.
        """
        if handler is None:
            return
        child = _chandler(handler, self._on_error)
        pn_handler_add(self._impl, child)
        # Drop the reference obtained from _chandler now that it is added.
        pn_decref(child)

    def clear(self):
        """Call pn_handler_clear on the native handler."""
        pn_handler_clear(self._impl)
def _chandler(obj, on_error=None):
    """Return a native handler for obj.

    @param obj: None, a WrappedHandler, or any other handler object.
    @param on_error: error callback given to the adapter built for a
      handler that is not a WrappedHandler.
    @return: None for None; for a WrappedHandler its own native handler
      after pn_incref on it; otherwise the result of pn_pyhandler on a
      _cadapter around obj.
    """
    if obj is None:
        return None
    if not isinstance(obj, WrappedHandler):
        return pn_pyhandler(_cadapter(obj, on_error))
    impl = obj._impl
    pn_incref(impl)
    return impl
class Url(object):
    """
    Simple URL parser/constructor, handles URLs of the form:

    <scheme>://<username>:<password>@<host>:<port>/<path>

    All components can be None if not specified in the URL string.

    The port can be specified as a service name, e.g. 'amqp' in the
    URL string but Url.port always gives the integer value.

    @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
    @ivar username: Username
    @ivar password: Password
    @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
    @ivar port: Integer port.
    @ivar path: Path part of the URL.
    """

    AMQPS = "amqps"
    AMQP = "amqp"

    class Port(int):
        """An integer port number that can be constructed from a service name string"""

        def __new__(cls, value):
            """@param value: integer port number or string service name."""
            port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
            # Remember the original spelling; __str__ and __eq__ use it.
            setattr(port, 'name', str(value))
            return port

        def __eq__(self, x): return str(self) == x or int(self) == x
        def __ne__(self, x): return not self == x
        # Defining __eq__ removes the inherited hash on Python 3; restore the
        # integer hash so a Port stays hashable as it is on Python 2.
        __hash__ = int.__hash__
        def __str__(self): return str(self.name)

        @staticmethod
        def _port_int(value):
            """Convert service, an integer or a service name, into an integer port number.

            @raise ValueError: if value is neither a number nor a known service name.
            """
            try:
                return int(value)
            except ValueError:
                try:
                    return socket.getservbyname(value)
                except socket.error:
                    # Not every system has amqp/amqps defined as a service
                    if value == Url.AMQPS: return 5671
                    elif value == Url.AMQP: return 5672
                    else:
                        raise ValueError("Not a valid port number or service name: '%s'" % value)

    def __init__(self, url=None, defaults=True, **kwargs):
        """
        @param url: URL string to parse.
        @param defaults: If true, fill in missing default values in the URL.
            If false, you can fill them in later by calling self.defaults()
        @param kwargs: scheme, username, password, host, port, path.
            If specified, replaces corresponding part in url string.
        @raise ValueError: if url cannot be parsed.
        @raise AttributeError: if kwargs contains a name that is not an
            attribute of Url.
        """
        if url:
            self._url = pn_url_parse(unicode2utf8(str(url)))
            if not self._url: raise ValueError("Invalid URL '%s'" % url)
        else:
            self._url = pn_url()
        for k in kwargs:                # Let kwargs override values parsed from url
            getattr(self, k)            # Check for invalid kwargs
            setattr(self, k, kwargs[k])
        if defaults: self.defaults()

    class PartDescriptor(object):
        """Descriptor mapping an attribute onto pn_url_get_<part>/pn_url_set_<part>."""

        def __init__(self, part):
            """@param part: URL part name, e.g. 'scheme' or 'host'."""
            self.getter = globals()["pn_url_get_%s" % part]
            self.setter = globals()["pn_url_set_%s" % part]

        def __get__(self, obj, type=None):
            # Class-level access (Url.scheme) has no _url to read from.
            if obj is None: return self
            return self.getter(obj._url)

        def __set__(self, obj, value):
            # Pass None through unchanged, as _set_port does, instead of
            # storing the literal string "None".
            if value is not None: value = str(value)
            return self.setter(obj._url, value)

    scheme = PartDescriptor('scheme')
    username = PartDescriptor('username')
    password = PartDescriptor('password')
    host = PartDescriptor('host')
    path = PartDescriptor('path')

    def _get_port(self):
        """Return the port as a Url.Port, or the falsy raw value if no port is set."""
        portstr = pn_url_get_port(self._url)
        return portstr and Url.Port(portstr)

    def _set_port(self, value):
        """Set the port from a number or service name; None clears it."""
        if value is None: pn_url_set_port(self._url, None)
        else: pn_url_set_port(self._url, str(Url.Port(value)))

    port = property(_get_port, _set_port)

    def __str__(self): return pn_url_str(self._url)

    def __repr__(self): return "Url(%r)" % str(self)

    def __eq__(self, x): return str(self) == str(x)

    def __ne__(self, x): return not self == x

    def __del__(self):
        # _url is missing when __init__ failed before assigning it and falsy
        # when parsing failed; in either case there is nothing to free.
        url = self.__dict__.pop("_url", None)
        if url:
            pn_url_free(url)

    def defaults(self):
        """
        Fill in missing values (scheme, host or port) with defaults
        @return: self
        """
        self.scheme = self.scheme or self.AMQP
        self.host = self.host or '0.0.0.0'
        self.port = self.port or self.Port(self.scheme)
        return self
# Public API of this module: the names bound by a star-import of it.
__all__ = [
    "API_LANGUAGE",
    "IMPLEMENTATION_LANGUAGE",
    "ABORTED",
    "ACCEPTED",
    "AUTOMATIC",
    "PENDING",
    "MANUAL",
    "REJECTED",
    "RELEASED",
    "MODIFIED",
    "SETTLED",
    "UNDESCRIBED",
    "Array",
    "Collector",
    "Condition",
    "Connection",
    "Data",
    "Delivery",
    "Disposition",
    "Described",
    "Endpoint",
    "Event",
    "EventType",
    "Handler",
    "Link",
    "Message",
    "MessageException",
    "Messenger",
    "MessengerException",
    "ProtonException",
    "VERSION_MAJOR",
    "VERSION_MINOR",
    "Receiver",
    "SASL",
    "Sender",
    "Session",
    "SessionException",
    "SSL",
    "SSLDomain",
    "SSLSessionDetails",
    "SSLUnavailable",
    "SSLException",
    "Terminus",
    "Timeout",
    "Interrupt",
    "Transport",
    "TransportException",
    "Url",
    "char",
    "dispatch",
    "symbol",
    "timestamp",
    "ulong",
    "byte",
    "short",
    "int32",
    "ubyte",
    "ushort",
    "uint",
    "float32",
    "decimal32",
    "decimal64",
    "decimal128"
]