blob: 202820ce9d0f5b085ecd639f3f38f900633febeb [file] [log] [blame]
from __future__ import absolute_import
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging, os, socket, time, types
from heapq import heappush, heappop, nsmallest
from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
from select import select
from proton.handlers import OutgoingMessageHandler
from proton import unicode2utf8, utf82unicode
import traceback
from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
from .wrapper import Wrapper, PYCTX
from cproton import *
from . import _compat
import Queue
except ImportError:
import queue as Queue
class Task(Wrapper):
def wrap(impl):
if impl is None:
return None
return Task(impl)
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_task_attachments)
def _init(self):
def cancel(self):
class Acceptor(Wrapper):
def __init__(self, impl):
Wrapper.__init__(self, impl)
def set_ssl_domain(self, ssl_domain):
pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
def close(self):
class Reactor(Wrapper):
def wrap(impl):
if impl is None:
return None
record = pn_reactor_attachments(impl)
attrs = pn_void2py(pn_record_get(record, PYCTX))
if attrs and 'subclass' in attrs:
return attrs['subclass'](impl=impl)
return Reactor(impl=impl)
def __init__(self, *handlers, **kwargs):
Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
for h in handlers:
def _init(self):
self.errors = []
def on_error(self, info):
def _get_global(self):
return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
def _set_global(self, handler):
impl = _chandler(handler, self.on_error)
pn_reactor_set_global_handler(self._impl, impl)
global_handler = property(_get_global, _set_global)
def _get_timeout(self):
return millis2timeout(pn_reactor_get_timeout(self._impl))
def _set_timeout(self, secs):
return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
timeout = property(_get_timeout, _set_timeout)
def yield_(self):
def mark(self):
return pn_reactor_mark(self._impl)
def _get_handler(self):
return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
def _set_handler(self, handler):
impl = _chandler(handler, self.on_error)
pn_reactor_set_handler(self._impl, impl)
handler = property(_get_handler, _set_handler)
def run(self):
self.timeout = 3.14159265359
while self.process(): pass
def wakeup(self):
n = pn_reactor_wakeup(self._impl)
if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
def start(self):
def quiesced(self):
return pn_reactor_quiesced(self._impl)
def _check_errors(self):
if self.errors:
for exc, value, tb in self.errors[:-1]:
traceback.print_exception(exc, value, tb)
exc, value, tb = self.errors[-1]
_compat.raise_(exc, value, tb)
def process(self):
result = pn_reactor_process(self._impl)
return result
def stop(self):
self.global_handler = None
self.handler = None
def schedule(self, delay, task):
impl = _chandler(task, self.on_error)
task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
return task
def acceptor(self, host, port, handler=None):
impl = _chandler(handler, self.on_error)
aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
if aimpl:
return Acceptor(aimpl)
raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
def connection(self, handler=None):
"""Deprecated: use connection_to_host() instead
impl = _chandler(handler, self.on_error)
result = Connection.wrap(pn_reactor_connection(self._impl, impl))
if impl: pn_decref(impl)
return result
def connection_to_host(self, host, port, handler=None):
"""Create an outgoing Connection that will be managed by the reactor.
The reator's pn_iohandler will create a socket connection to the host
once the connection is opened.
conn = self.connection(handler)
self.set_connection_host(conn, host, port)
return conn
def set_connection_host(self, connection, host, port):
"""Change the address used by the connection. The address is
used by the reactor's iohandler to create an outgoing socket
connection. This must be set prior to opening the connection.
def get_connection_address(self, connection):
"""This may be used to retrieve the remote peer address.
@return: string containing the address in URL format or None if no
address is available. Use the proton.Url class to create a Url object
from the returned value.
_url = pn_reactor_get_connection_address(self._impl, connection._impl)
return utf82unicode(_url)
def selectable(self, handler=None):
impl = _chandler(handler, self.on_error)
result = Selectable.wrap(pn_reactor_selectable(self._impl))
if impl:
record = pn_selectable_attachments(result._impl)
pn_record_set_handler(record, impl)
return result
def update(self, sel):
pn_reactor_update(self._impl, sel._impl)
def push_event(self, obj, etype):
pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
class EventInjector(object):
Can be added to a reactor to allow events to be triggered by an
external thread but handled on the event thread associated with
the reactor. An instance of this class can be passed to the
Reactor.selectable() method of the reactor in order to activate
it. The close() method should be called when it is no longer
needed, to allow the event loop to end if needed.
def __init__(self):
self.queue = Queue.Queue()
self.pipe = os.pipe()
self._closed = False
def trigger(self, event):
Request that the given event be dispatched on the event thread
of the reactor to which this EventInjector was added.
os.write(self.pipe[1], _compat.str2bin("!"))
def close(self):
Request that this EventInjector be closed. Existing events
will be dispctahed on the reactors event dispactch thread,
then this will be removed from the set of interest.
self._closed = True
os.write(self.pipe[1], _compat.str2bin("!"))
def fileno(self):
return self.pipe[0]
def on_selectable_init(self, event):
sel = event.context
sel.reading = True
def on_selectable_readable(self, event):[0], 512)
while not self.queue.empty():
requested = self.queue.get()
event.reactor.push_event(requested.context, requested.type)
if self._closed:
s = event.context
class ApplicationEvent(EventBase):
Application defined event, which can optionally be associated with
an engine object and or an arbitrary subject
def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
self.connection = connection
self.session = session = link = delivery
if =
self.session =
if self.session:
self.connection = self.session.connection
self.subject = subject
def __repr__(self):
objects = [self.connection, self.session,,, self.subject]
return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
class Transaction(object):
Class to track state of an AMQP 1.0 transaction.
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
self.txn_ctrl = txn_ctrl
self.handler = handler = None
self._declare = None
self._discharge = None
self.failed = False
self._pending = []
self.settle_before_discharge = settle_before_discharge
def commit(self):
def abort(self):
def declare(self):
self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
def discharge(self, failed):
self.failed = failed
self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [, failed])
def _send_ctrl(self, descriptor, value):
delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
delivery.transaction = self
return delivery
def send(self, sender, msg, tag=None):
dlv = sender.send(msg, tag=tag) = []
return dlv
def accept(self, delivery):
self.update(delivery, PN_ACCEPTED)
if self.settle_before_discharge:
def update(self, delivery, state=None):
if state: = [, Described(ulong(state), [])]
def _release_pending(self):
for d in self._pending:
def _clear_pending(self):
self._pending = []
def handle_outcome(self, event):
if == self._declare:
if =[0]
elif == Delivery.REJECTED:
logging.warning("Unexpected outcome for declare: %s" %
elif == self._discharge:
if == Delivery.REJECTED:
if not self.failed:
self._release_pending() # make this optional?
if self.failed:
class LinkOption(object):
Abstract interface for link configuration options
def apply(self, link):
Subclasses will implement any configuration logic in this
def test(self, link):
Subclasses can override this to selectively apply an option
e.g. based on some link criteria
return True
class AtMostOnce(LinkOption):
def apply(self, link):
link.snd_settle_mode = Link.SND_SETTLED
class AtLeastOnce(LinkOption):
def apply(self, link):
link.snd_settle_mode = Link.SND_UNSETTLED
link.rcv_settle_mode = Link.RCV_FIRST
class SenderOption(LinkOption):
def apply(self, sender): pass
def test(self, link): return link.is_sender
class ReceiverOption(LinkOption):
def apply(self, receiver): pass
def test(self, link): return link.is_receiver
class DynamicNodeProperties(LinkOption):
def __init__(self, props={}): = {}
for k in props:
if isinstance(k, symbol):[k] = props[k]
else:[symbol(k)] = props[k]
def apply(self, link):
if link.is_receiver:
class Filter(ReceiverOption):
def __init__(self, filter_set={}):
self.filter_set = filter_set
def apply(self, receiver):
class Selector(Filter):
Configures a link with a message selector filter
def __init__(self, value, name='selector'):
super(Selector, self).__init__({symbol(name): Described(symbol(''), value)})
class DurableSubscription(ReceiverOption):
def apply(self, receiver):
receiver.source.durability = Terminus.DELIVERIES
receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
class Move(ReceiverOption):
def apply(self, receiver):
receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
class Copy(ReceiverOption):
def apply(self, receiver):
receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
def _apply_link_options(options, link):
if options:
if isinstance(options, list):
for o in options:
if o.test(link): o.apply(link)
if options.test(link): options.apply(link)
def _create_session(connection, handler=None):
session = connection.session()
return session
def _get_attr(target, name):
if hasattr(target, name):
return getattr(target, name)
return None
class SessionPerConnection(object):
def __init__(self):
self._default_session = None
def session(self, connection):
if not self._default_session:
self._default_session = _create_session(connection)
self._default_session.context = self
return self._default_session
def on_session_remote_close(self, event):
self._default_session = None
class GlobalOverrides(object):
Internal handler that triggers the necessary socket connect for an
opened connection.
def __init__(self, base):
self.base = base
def on_unhandled(self, name, event):
if not self._override(event):
def _override(self, event):
conn = event.connection
return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
class Connector(Handler):
Internal handler that triggers the necessary socket connect for an
opened connection.
def __init__(self, connection):
self.connection = connection
self.address = None
self.heartbeat = None
self.reconnect = None
self.ssl_domain = None
self.allow_insecure_mechs = True
self.allowed_mechs = None
self.sasl_enabled = True
self.user = None
self.password = None
self.virtual_host = None
def _connect(self, connection, reactor):
assert(reactor is not None)
url =
reactor.set_connection_host(connection,, str(url.port))
# if virtual-host not set, use host from address as default
if self.virtual_host is None:
connection.hostname =
logging.debug("connecting to %s..." % url)
transport = Transport()
if self.sasl_enabled:
sasl = transport.sasl()
sasl.allow_insecure_mechs = self.allow_insecure_mechs
if url.username:
connection.user = url.username
elif self.user:
connection.user = self.user
if url.password:
connection.password = url.password
elif self.password:
connection.password = self.password
if self.allowed_mechs:
if self.heartbeat:
transport.idle_timeout = self.heartbeat
if url.scheme == 'amqps':
if not self.ssl_domain:
raise SSLUnavailable("amqps: SSL libraries not found")
self.ssl = SSL(transport, self.ssl_domain)
self.ssl.peer_hostname =
def on_connection_local_open(self, event):
self._connect(event.connection, event.reactor)
def on_connection_remote_open(self, event):
logging.debug("connected to %s" % event.connection.hostname)
if self.reconnect:
self.transport = None
def on_transport_tail_closed(self, event):
def on_transport_closed(self, event):
if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE:
if self.reconnect:
delay =
if delay == 0:"Disconnected, reconnecting...")
self._connect(self.connection, event.reactor)
else:"Disconnected will try to reconnect after %s seconds" % delay)
event.reactor.schedule(delay, self)
self.connection = None
def on_timer_task(self, event):
self._connect(self.connection, event.reactor)
def on_connection_remote_close(self, event):
self.connection = None
class Backoff(object):
A reconnect strategy involving an increasing delay between
retries, up to a maximum or 10 seconds.
def __init__(self):
self.delay = 0
def reset(self):
self.delay = 0
def next(self):
current = self.delay
if current == 0:
self.delay = 0.1
self.delay = min(10, 2*current)
return current
class Urls(object):
def __init__(self, values):
self.values = [Url(v) for v in values]
self.i = iter(self.values)
def __iter__(self):
return self
def next(self):
return next(self.i)
except StopIteration:
self.i = iter(self.values)
return next(self.i)
class SSLConfig(object):
def __init__(self):
self.client = SSLDomain(SSLDomain.MODE_CLIENT)
self.server = SSLDomain(SSLDomain.MODE_SERVER)
def set_credentials(self, cert_file, key_file, password):
self.client.set_credentials(cert_file, key_file, password)
self.server.set_credentials(cert_file, key_file, password)
def set_trusted_ca_db(self, certificate_db):
class Container(Reactor):
"""A representation of the AMQP concept of a 'container', which
lossely speaking is something that establishes links to or from
another container, over which messages are transfered. This is
an extension to the Reactor class that adds convenience methods
for creating connections and sender- or receiver- links.
def __init__(self, *handlers, **kwargs):
super(Container, self).__init__(*handlers, **kwargs)
if "impl" not in kwargs:
self.ssl = SSLConfig()
except SSLUnavailable:
self.ssl = None
self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
self.trigger = None
self.container_id = str(generate_uuid())
self.allow_insecure_mechs = True
self.allowed_mechs = None
self.sasl_enabled = True
self.user = None
self.password = None
Wrapper.__setattr__(self, 'subclass', self.__class__)
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
Initiates the establishment of an AMQP connection. Returns an
instance of proton.Connection.
@param url: URL string of process to connect to
@param urls: list of URL strings of process to try to connect to
Only one of url or urls should be specified.
@param reconnect: A value of False will prevent the library
form automatically trying to reconnect if the underlying
socket is disconnected before the connection has been closed.
@param heartbeat: A value in milliseconds indicating the
desired frequency of heartbeats used to test the underlying
socket is alive.
@param ssl_domain: SSL configuration in the form of an
instance of proton.SSLdomain.
@param handler: a connection scoped handler that will be
called to process any events in the scope of this connection
or its child links
@param kwargs: sasl_enabled, which determines whether a sasl layer is
used for the connection; allowed_mechs an optional list of SASL
mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
indicating whether insecure mechanisms, such as PLAIN over a
non-encrypted socket, are allowed; 'virtual_host' the hostname to set
in the Open performative used by peer to determine the correct
back-end service for the client. If 'virtual_host' is not supplied the
host field from the URL is used instead."
conn = self.connection(handler)
conn.container = self.container_id or str(generate_uuid())
conn.offered_capabilities = kwargs.get('offered_capabilities')
conn.desired_capabilities = kwargs.get('desired_capabilities') = kwargs.get('properties')
connector = Connector(conn)
connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
connector.user = kwargs.get('user', self.user)
connector.password = kwargs.get('password', self.password)
connector.virtual_host = kwargs.get('virtual_host')
if connector.virtual_host:
# only set hostname if virtual-host is a non-empty string
conn.hostname = connector.virtual_host
conn._overrides = connector
if url: connector.address = Urls([url])
elif urls: connector.address = Urls(urls)
elif address: connector.address = address
else: raise ValueError("One of url, urls or address required")
if heartbeat:
connector.heartbeat = heartbeat
if reconnect:
connector.reconnect = reconnect
elif reconnect is None:
connector.reconnect = Backoff()
# use container's default client domain if none specified. This is
# only necessary of the URL specifies the "amqps:" scheme
connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
conn._session_policy = SessionPerConnection() #todo: make configurable
return conn
def _get_id(self, container, remote, local):
if local and remote: "%s-%s-%s" % (container, remote, local)
elif local: return "%s-%s" % (container, local)
elif remote: return "%s-%s" % (container, remote)
else: return "%s-%s" % (container, str(generate_uuid()))
def _get_session(self, context):
if isinstance(context, Url):
return self._get_session(self.connect(url=context))
elif isinstance(context, Session):
return context
elif isinstance(context, Connection):
if hasattr(context, '_session_policy'):
return context._session_policy.session(context)
return _create_session(context)
return context.session()
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
Initiates the establishment of a link over which messages can
be sent. Returns an instance of proton.Sender.
There are two patterns of use. (1) A connection can be passed
as the first argument, in which case the link is established
on that connection. In this case the target address can be
specified as the second argument (or as a keyword
argument). The source address can also be specified if
desired. (2) Alternatively a URL can be passed as the first
argument. In this case a new connection will be establised on
which the link will be attached. If a path is specified and
the target is not, then the path of the URL is used as the
target address.
The name of the link may be specified if desired, otherwise a
unique name will be generated.
Various LinkOptions can be specified to further control the
if isinstance(context, _compat.STRING_TYPES):
context = Url(context)
if isinstance(context, Url) and not target:
target = context.path
session = self._get_session(context)
snd = session.sender(name or self._get_id(session.connection.container, target, source))
if source:
snd.source.address = source
if target: = target
if handler != None:
snd.handler = handler
if tags:
snd.tag_generator = tags
_apply_link_options(options, snd)
return snd
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
Initiates the establishment of a link over which messages can
be received (aka a subscription). Returns an instance of
There are two patterns of use. (1) A connection can be passed
as the first argument, in which case the link is established
on that connection. In this case the source address can be
specified as the second argument (or as a keyword
argument). The target address can also be specified if
desired. (2) Alternatively a URL can be passed as the first
argument. In this case a new connection will be establised on
which the link will be attached. If a path is specified and
the source is not, then the path of the URL is used as the
target address.
The name of the link may be specified if desired, otherwise a
unique name will be generated.
Various LinkOptions can be specified to further control the
if isinstance(context, _compat.STRING_TYPES):
context = Url(context)
if isinstance(context, Url) and not source:
source = context.path
session = self._get_session(context)
rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
if source:
rcv.source.address = source
if dynamic:
rcv.source.dynamic = True
if target: = target
if handler != None:
rcv.handler = handler
_apply_link_options(options, rcv)
return rcv
def declare_transaction(self, context, handler=None, settle_before_discharge=False):
if not _get_attr(context, '_txn_ctrl'):
class InternalTransactionHandler(OutgoingMessageHandler):
def __init__(self):
super(InternalTransactionHandler, self).__init__(auto_settle=True)
def on_settled(self, event):
if hasattr(, "transaction"):
event.transaction =
context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) = Terminus.COORDINATOR'amqp:local-transactions'))
return Transaction(context._txn_ctrl, handler, settle_before_discharge)
def listen(self, url, ssl_domain=None):
Initiates a server socket, accepting incoming AMQP connections
on the interface and port specified.
url = Url(url)
acceptor = self.acceptor(, url.port)
ssl_config = ssl_domain
if not ssl_config and url.scheme == 'amqps':
# use container's default server domain
if self.ssl:
ssl_config = self.ssl.server
raise SSLUnavailable("amqps: SSL libraries not found")
if ssl_config:
return acceptor
def do_work(self, timeout=None):
if timeout:
self.timeout = timeout
return self.process()