#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
#from functools import total_ordering
import heapq
import json
import logging
import re
import os
import time
import traceback
import uuid
from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE
from ._delivery import Delivery
from ._endpoints import Connection, Endpoint, Link, Session, Terminus
from ._exceptions import SSLUnavailable
from ._data import Described, symbol, ulong
from ._message import Message
from ._transport import Transport, SSL, SSLDomain
from ._url import Url
from ._common import isstring, unicode2utf8, utf82unicode
from ._events import Collector, EventType, EventBase, Handler, Event
from ._selectable import Selectable
from ._handlers import OutgoingMessageHandler, IOHandler
from ._io import IO, PN_INVALID_SOCKET
from . import _compat
from ._compat import queue
_logger = logging.getLogger("proton")
def _generate_uuid():
return uuid.uuid4()
def _now():
return time.time()
#@total_ordering
class Task(object):
def __init__(self, reactor, deadline, handler):
self._deadline = deadline
self._handler = handler
self._reactor = reactor
self._cancelled = False
def __lt__(self, rhs):
return self._deadline < rhs._deadline
def cancel(self):
self._cancelled = True
@property
def handler(self):
return self._handler
@property
def container(self):
return self._reactor
class TimerSelectable(Selectable):
def __init__(self, reactor, collector):
super(TimerSelectable, self).__init__(None, reactor)
self.collect(collector)
collector.put(self, Event.SELECTABLE_INIT)
def fileno(self):
return PN_INVALID_SOCKET
def readable(self):
pass
def writable(self):
pass
def expired(self):
self._reactor.timer_tick()
self.deadline = self._reactor.timer_deadline
self.update()
class Reactor(object):
def __init__(self, *handlers, **kwargs):
self._previous = PN_EVENT_NONE
self._timeout = 0
self.mark()
self._yield = False
self._stop = False
self._collector = Collector()
self._selectable = None
self._selectables = 0
self._global_handler = IOHandler()
self._handler = Handler()
self._timerheap = []
self._timers = 0
self.errors = []
for h in handlers:
self.handler.add(h, on_error=self.on_error)
def on_error(self, info):
self.errors.append(info)
self.yield_()
# TODO: need to make this actually return a proxy which catches exceptions and calls
# on error.
# [Or arrange another way to deal with exceptions thrown by handlers]
def _make_handler(self, handler):
"""
Return a proxy handler that dispatches to the provided handler.
If the handler raises an exception, on_error is called with the exception info.
"""
return handler
def _get_global(self):
return self._global_handler
def _set_global(self, handler):
self._global_handler = self._make_handler(handler)
global_handler = property(_get_global, _set_global)
def _get_timeout(self):
return self._timeout
def _set_timeout(self, secs):
self._timeout = secs
timeout = property(_get_timeout, _set_timeout)
def yield_(self):
self._yield = True
def mark(self):
""" This sets the reactor now instant to the current time """
self._now = _now()
return self._now
@property
def now(self):
return self._now
def _get_handler(self):
return self._handler
def _set_handler(self, handler):
self._handler = self._make_handler(handler)
handler = property(_get_handler, _set_handler)
def run(self):
# TODO: Why do we timeout like this?
self.timeout = 3.14159265359
self.start()
while self.process(): pass
self.stop()
self.process()
# TODO: This isn't correct if we ever run again
self._global_handler = None
self._handler = None
# Cross thread reactor wakeup
def wakeup(self):
# TODO: Do this with pipe and write?
#os.write(self._wakeup[1], "x", 1);
pass
def start(self):
self.push_event(self, Event.REACTOR_INIT)
self._selectable = TimerSelectable(self, self._collector)
self._selectable.deadline = self.timer_deadline
# TODO set up fd to read for wakeups - but problematic on windows
#self._selectable.fileno(self._wakeup[0])
#self._selectable.reading = True
self.update(self._selectable)
@property
def quiesced(self):
event = self._collector.peek()
if not event:
return True
if self._collector.more():
return False
return event.type is Event.REACTOR_QUIESCED
def _check_errors(self):
""" This """
if self.errors:
for exc, value, tb in self.errors[:-1]:
traceback.print_exception(exc, value, tb)
exc, value, tb = self.errors[-1]
_compat.raise_(exc, value, tb)
def process(self):
# result = pn_reactor_process(self._impl)
# self._check_errors()
# return result
self.mark()
previous = PN_EVENT_NONE
while True:
if self._yield:
self._yield = False
_logger.debug('%s Yielding', self)
return True
event = self._collector.peek()
if event:
_logger.debug('%s recvd Event: %r', self, event)
type = event.type
# regular handler
handler = event.handler or self._handler
event.dispatch(handler)
event.dispatch(self._global_handler)
previous = type
self._previous = type
self._collector.pop()
elif not self._stop and (self._timers > 0 or self._selectables > 1):
if previous is not Event.REACTOR_QUIESCED and self._previous is not Event.REACTOR_FINAL:
self.push_event(self, Event.REACTOR_QUIESCED)
self.yield_()
else:
if self._selectable:
self._selectable.terminate()
self.update(self._selectable)
self._selectable = None
else:
if self._previous is not Event.REACTOR_FINAL:
self.push_event(self, Event.REACTOR_FINAL)
_logger.debug('%s Stopping', self)
return False
def stop(self):
self._stop = True
self._check_errors()
def stop_events(self):
self._collector.release()
def schedule(self, delay, handler):
himpl = self._make_handler(handler)
task = Task(self, self._now+delay, himpl)
heapq.heappush(self._timerheap, task)
self._timers += 1
deadline = self._timerheap[0]._deadline
if self._selectable:
self._selectable.deadline = deadline
self.update(self._selectable)
return task
def timer_tick(self):
while self._timers > 0:
t = self._timerheap[0]
if t._cancelled:
heapq.heappop(self._timerheap)
self._timers -= 1
elif t._deadline > self._now:
return
else:
heapq.heappop(self._timerheap)
self._timers -= 1
self.push_event(t, Event.TIMER_TASK)
@property
def timer_deadline(self):
while self._timers > 0:
t = self._timerheap[0]
if t._cancelled:
heapq.heappop(self._timerheap)
self._timers -= 1
else:
return t._deadline
return None
def acceptor(self, host, port, handler=None):
impl = self._make_handler(handler)
a = Acceptor(self, unicode2utf8(host), int(port), impl)
if a:
return a
else:
raise IOError("%s (%s:%s)" % (str(self.errors), host, port))
def connection(self, handler=None):
"""Deprecated: use connection_to_host() instead
"""
impl = self._make_handler(handler)
result = Connection()
if impl:
result.handler = impl
result._reactor = self
result.collect(self._collector)
return result
def connection_to_host(self, host, port, handler=None):
"""Create an outgoing Connection that will be managed by the reactor.
The reactor's pn_iohandler will create a socket connection to the host
once the connection is opened.
"""
conn = self.connection(handler)
self.set_connection_host(conn, host, port)
return conn
def set_connection_host(self, connection, host, port):
"""Change the address used by the connection. The address is
used by the reactor's iohandler to create an outgoing socket
connection. This must be set prior to opening the connection.
"""
connection.set_address(host, port)
def get_connection_address(self, connection):
"""This may be used to retrieve the remote peer address.
@return: string containing the address in URL format or None if no
address is available. Use the proton.Url class to create a Url object
from the returned value.
"""
_url = connection.get_address()
return utf82unicode(_url)
def selectable(self, handler=None, delegate=None):
if delegate is None:
delegate = handler
result = Selectable(delegate, self)
result.collect(self._collector)
result.handler = handler
result.push_event(result, Event.SELECTABLE_INIT)
return result
def update(self, selectable):
selectable.update()
def push_event(self, obj, etype):
self._collector.put(obj, etype)
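# A rough usage sketch for schedule() (the Ticker handler and the 1.0 second
# delay are illustrative, not part of this module): the scheduled handler is
# dispatched a TIMER_TASK event once the deadline passes.
#
#   class Ticker(Handler):
#       def on_timer_task(self, event):
#           print("tick")
#
#   container = Container()           # Container extends Reactor (see below)
#   container.schedule(1.0, Ticker()) # on_timer_task fires roughly 1s later
#   container.run()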
class EventInjector(object):
"""
Can be added to a reactor to allow events to be triggered by an
external thread but handled on the event thread associated with
the reactor. An instance of this class can be passed to the
Reactor.selectable() method of the reactor in order to activate
it. The close() method should be called when the injector is no
longer needed, so that the event loop can terminate.
"""
def __init__(self):
self.queue = queue.Queue()
self.pipe = os.pipe()
self._transport = None
self._closed = False
def trigger(self, event):
"""
Request that the given event be dispatched on the event thread
of the reactor to which this EventInjector was added.
"""
self.queue.put(event)
os.write(self.pipe[1], b"!")
def close(self):
"""
Request that this EventInjector be closed. Existing events
will be dispatched on the reactor's event dispatch thread,
then this will be removed from the set of interest.
"""
self._closed = True
os.write(self.pipe[1], b"!")
def fileno(self):
return self.pipe[0]
def on_selectable_init(self, event):
sel = event.context
#sel.fileno(self.fileno())
sel.reading = True
sel.update()
def on_selectable_readable(self, event):
s = event.context
os.read(self.pipe[0], 512)
while not self.queue.empty():
requested = self.queue.get()
s.push_event(requested.context, requested.type)
if self._closed:
s.terminate()
s.update()
class ApplicationEvent(EventBase):
"""
Application-defined event, which can optionally be associated with
an engine object and/or an arbitrary subject
"""
def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
super(ApplicationEvent, self).__init__(EventType(typename))
self.clazz = PN_PYREF
self.connection = connection
self.session = session
self.link = link
self.delivery = delivery
if self.delivery:
self.link = self.delivery.link
if self.link:
self.session = self.link.session
if self.session:
self.connection = self.session.connection
self.subject = subject
@property
def context(self):
return self
def __repr__(self):
objects = [self.connection, self.session, self.link, self.delivery, self.subject]
return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
class Transaction(object):
"""
Class to track state of an AMQP 1.0 transaction.
"""
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
self.txn_ctrl = txn_ctrl
self.handler = handler
self.id = None
self._declare = None
self._discharge = None
self.failed = False
self._pending = []
self.settle_before_discharge = settle_before_discharge
self.declare()
def commit(self):
self.discharge(False)
def abort(self):
self.discharge(True)
def declare(self):
self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
def discharge(self, failed):
self.failed = failed
self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
def _send_ctrl(self, descriptor, value):
delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
delivery.transaction = self
return delivery
def send(self, sender, msg, tag=None):
dlv = sender.send(msg, tag=tag)
dlv.local.data = [self.id]
dlv.update(0x34)
return dlv
def accept(self, delivery):
self.update(delivery, PN_ACCEPTED)
if self.settle_before_discharge:
delivery.settle()
else:
self._pending.append(delivery)
def update(self, delivery, state=None):
if state:
delivery.local.data = [self.id, Described(ulong(state), [])]
delivery.update(0x34)
def _release_pending(self):
for d in self._pending:
d.update(Delivery.RELEASED)
d.settle()
self._clear_pending()
def _clear_pending(self):
self._pending = []
def handle_outcome(self, event):
if event.delivery == self._declare:
if event.delivery.remote.data:
self.id = event.delivery.remote.data[0]
self.handler.on_transaction_declared(event)
elif event.delivery.remote_state == Delivery.REJECTED:
self.handler.on_transaction_declare_failed(event)
else:
_logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
self.handler.on_transaction_declare_failed(event)
elif event.delivery == self._discharge:
if event.delivery.remote_state == Delivery.REJECTED:
if not self.failed:
self.handler.on_transaction_commit_failed(event)
self._release_pending() # make this optional?
else:
if self.failed:
self.handler.on_transaction_aborted(event)
self._release_pending()
else:
self.handler.on_transaction_committed(event)
self._clear_pending()
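# A rough transactional-send sketch. Transactions are normally obtained via
# Container.declare_transaction() (defined further below) rather than built
# directly; the handler passed there is notified via on_transaction_declared,
# on_transaction_committed, on_transaction_commit_failed and
# on_transaction_aborted. TxnSend, the URL, the address "examples" and the
# base classes (assumed to come from proton.handlers) are illustrative.
#
#   class TxnSend(MessagingHandler, TransactionHandler):
#       def on_start(self, event):
#           conn = event.container.connect("amqp://localhost:5672")
#           self.sender = event.container.create_sender(conn, "examples")
#           event.container.declare_transaction(conn, handler=self)
#       def on_transaction_declared(self, event):
#           event.transaction.send(self.sender, Message(body="in a txn"))
#           event.transaction.commit()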
class LinkOption(object):
"""
Abstract interface for link configuration options
"""
def apply(self, link):
"""
Subclasses will implement any configuration logic in this
method
"""
pass
def test(self, link):
"""
Subclasses can override this to selectively apply an option
e.g. based on some link criteria
"""
return True
class AtMostOnce(LinkOption):
def apply(self, link):
link.snd_settle_mode = Link.SND_SETTLED
class AtLeastOnce(LinkOption):
def apply(self, link):
link.snd_settle_mode = Link.SND_UNSETTLED
link.rcv_settle_mode = Link.RCV_FIRST
class SenderOption(LinkOption):
def apply(self, sender): pass
def test(self, link): return link.is_sender
class ReceiverOption(LinkOption):
def apply(self, receiver): pass
def test(self, link): return link.is_receiver
class DynamicNodeProperties(LinkOption):
def __init__(self, props={}):
self.properties = {}
for k in props:
if isinstance(k, symbol):
self.properties[k] = props[k]
else:
self.properties[symbol(k)] = props[k]
def apply(self, link):
if link.is_receiver:
link.source.properties.put_dict(self.properties)
else:
link.target.properties.put_dict(self.properties)
class Filter(ReceiverOption):
def __init__(self, filter_set={}):
self.filter_set = filter_set
def apply(self, receiver):
receiver.source.filter.put_dict(self.filter_set)
class Selector(Filter):
"""
Configures a link with a message selector filter
"""
def __init__(self, value, name='selector'):
super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), utf82unicode(value))})
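# A small sketch of applying a Selector when creating a receiver (the address
# "examples" and the filter string are illustrative): only messages whose
# properties match the selector expression are delivered.
#
#   container.create_receiver(conn, "examples",
#                             options=Selector(u"colour = 'green'"))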
class DurableSubscription(ReceiverOption):
def apply(self, receiver):
receiver.source.durability = Terminus.DELIVERIES
receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
class Move(ReceiverOption):
def apply(self, receiver):
receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
class Copy(ReceiverOption):
def apply(self, receiver):
receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
def _apply_link_options(options, link):
if options:
if isinstance(options, list):
for o in options:
if o.test(link): o.apply(link)
else:
if options.test(link): options.apply(link)
def _create_session(connection, handler=None):
session = connection.session()
session.open()
return session
def _get_attr(target, name):
if hasattr(target, name):
return getattr(target, name)
else:
return None
class SessionPerConnection(object):
def __init__(self):
self._default_session = None
def session(self, connection):
if not self._default_session:
self._default_session = _create_session(connection)
return self._default_session
class GlobalOverrides(Handler):
"""
Internal handler that lets a connection-scoped override handler (such as
the Connector attached via _overrides) intercept events before they reach
the base global handler.
"""
def __init__(self, base):
self.base = base
def on_unhandled(self, name, event):
if not self._override(event):
event.dispatch(self.base)
def _override(self, event):
conn = event.connection
return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
class Acceptor(Handler):
def __init__(self, reactor, host, port, handler=None):
self._ssl_domain = None
self._reactor = reactor
self._handler = handler
sock = IO.listen(host, port)
s = reactor.selectable(handler=self, delegate=sock)
s.reading = True
s._transport = None
self._selectable = s
reactor.update(s)
def set_ssl_domain(self, ssl_domain):
self._ssl_domain = ssl_domain
def close(self):
if not self._selectable.is_terminal:
IO.close(self._selectable)
self._selectable.terminate()
self._reactor.update(self._selectable)
def on_selectable_readable(self, event):
s = event.selectable
sock, name = IO.accept(self._selectable)
_logger.debug("Accepted connection from %s", name)
r = self._reactor
handler = self._handler or r.handler
c = r.connection(handler)
c._acceptor = self
c.url = Url(host=name[0], port=name[1])
t = Transport(Transport.SERVER)
if self._ssl_domain:
t.ssl(self._ssl_domain)
t.bind(c)
s = r.selectable(delegate=sock)
s._transport = t
t._selectable = s
IOHandler.update(t, s, r.now)
class Connector(Handler):
"""
Internal handler that triggers the necessary socket connect for an
opened connection.
"""
def __init__(self, connection):
self.connection = connection
self.address = None
self.heartbeat = None
self.reconnect = None
self.ssl_domain = None
self.allow_insecure_mechs = True
self.allowed_mechs = None
self.sasl_enabled = True
self.user = None
self.password = None
self.virtual_host = None
self.ssl_sni = None
self.max_frame_size = None
def _connect(self, connection):
url = self.address.next()
connection.url = url
# if virtual-host not set, use host from address as default
if self.virtual_host is None:
connection.hostname = url.host
_logger.debug("connecting to %r..." % url)
transport = Transport()
if self.sasl_enabled:
sasl = transport.sasl()
sasl.allow_insecure_mechs = self.allow_insecure_mechs
if url.username:
connection.user = url.username
elif self.user:
connection.user = self.user
if url.password:
connection.password = url.password
elif self.password:
connection.password = self.password
if self.allowed_mechs:
sasl.allowed_mechs(self.allowed_mechs)
transport.bind(connection)
if self.heartbeat:
transport.idle_timeout = self.heartbeat
if url.scheme == 'amqps':
if not self.ssl_domain:
raise SSLUnavailable("amqps: SSL libraries not found")
self.ssl = SSL(transport, self.ssl_domain)
self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
if self.max_frame_size:
transport.max_frame_size = self.max_frame_size
def on_connection_local_open(self, event):
self._connect(event.connection)
def on_connection_remote_open(self, event):
_logger.debug("connected to %s" % event.connection.hostname)
if self.reconnect:
self.reconnect.reset()
def on_transport_tail_closed(self, event):
event.transport.close_head()
def on_transport_closed(self, event):
if self.connection is None: return
if self.connection.state & Endpoint.LOCAL_ACTIVE:
if self.reconnect:
event.transport.unbind()
delay = self.reconnect.next()
if delay == 0:
_logger.info("Disconnected, reconnecting...")
self._connect(self.connection)
return
else:
_logger.info("Disconnected will try to reconnect after %s seconds" % delay)
event.reactor.schedule(delay, self)
return
else:
_logger.debug("Disconnected")
# See connector.cpp: conn.free()/pn_connection_release() here?
self.connection = None
def on_timer_task(self, event):
self._connect(self.connection)
class Backoff(object):
"""
A reconnect strategy involving an increasing delay between
retries, up to a maximum of 10 seconds.
"""
def __init__(self):
self.delay = 0
def reset(self):
self.delay = 0
def next(self):
current = self.delay
if current == 0:
self.delay = 0.1
else:
self.delay = min(10, 2 * current)
return current
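# Successive calls to next() produce the delays 0, 0.1, 0.2, 0.4, ... capped
# at 10 seconds, and reset() restarts the sequence once a connection is
# re-established. For example:
#
#   b = Backoff()
#   [b.next() for _ in range(6)]   # -> [0, 0.1, 0.2, 0.4, 0.8, 1.6]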
class Urls(object):
def __init__(self, values):
self.values = [Url(v) for v in values]
self.i = iter(self.values)
def __iter__(self):
return self
def next(self):
try:
return next(self.i)
except StopIteration:
self.i = iter(self.values)
return next(self.i)
class SSLConfig(object):
def __init__(self):
self.client = SSLDomain(SSLDomain.MODE_CLIENT)
self.server = SSLDomain(SSLDomain.MODE_SERVER)
def set_credentials(self, cert_file, key_file, password):
self.client.set_credentials(cert_file, key_file, password)
self.server.set_credentials(cert_file, key_file, password)
def set_trusted_ca_db(self, certificate_db):
self.client.set_trusted_ca_db(certificate_db)
self.server.set_trusted_ca_db(certificate_db)
def _find_config_file():
confname = 'connect.json'
confpath = ['.', '~/.config/messaging','/etc/messaging']
for d in confpath:
f = os.path.join(d, confname)
if os.path.isfile(f):
return f
return None
def _get_default_config():
conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file()
if conf and os.path.isfile(conf):
with open(conf, 'r') as f:
json_text = f.read()
json_text = _strip_json_comments(json_text)
return json.loads(json_text)
else:
return {}
def _strip_json_comments(json_text):
"""This strips c-style comments from text, taking into account '/*comments*/' and '//comments'
nested inside a string etc."""
def replacer(match):
s = match.group(0)
if s.startswith('/'):
return " " # note: a space and not an empty string
else:
return s
pattern = re.compile(r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"', re.DOTALL | re.MULTILINE)
return re.sub(pattern, replacer, json_text)
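# A hypothetical connect.json illustrating the fields consumed by
# Container.connect() below when no URL is supplied (values are placeholders;
# C-style comments are tolerated because _strip_json_comments removes them):
#
#   {
#     "scheme": "amqps",                    // or "amqp"
#     "host": "broker.example.com",
#     "port": 5671,
#     "user": "alice",
#     "password": "secret",
#     "sasl": { "enabled": true, "mechanisms": "PLAIN" },
#     "tls": { "ca": "ca.pem", "cert": "client.pem", "key": "key.pem", "verify": true }
#   }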
def _get_default_port_for_scheme(scheme):
if scheme == 'amqps':
return 5671
else:
return 5672
class Container(Reactor):
"""A representation of the AMQP concept of a 'container', which
loosely speaking is something that establishes links to or from
another container, over which messages are transferred. This is
an extension of the Reactor class that adds convenience methods
for creating connections and sender or receiver links.
"""
def __init__(self, *handlers, **kwargs):
super(Container, self).__init__(*handlers, **kwargs)
if "impl" not in kwargs:
try:
self.ssl = SSLConfig()
except SSLUnavailable:
self.ssl = None
self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
self.trigger = None
self.container_id = str(_generate_uuid())
self.allow_insecure_mechs = True
self.allowed_mechs = None
self.sasl_enabled = True
self.user = None
self.password = None
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
**kwargs):
"""
Initiates the establishment of an AMQP connection. Returns an
instance of proton.Connection.
@param url: URL string of process to connect to
@param urls: list of URL strings of process to try to connect to
Only one of url or urls should be specified.
@param reconnect: Reconnect is enabled by default. You can
pass in an instance of Backoff to control reconnect behavior.
A value of False will prevent the library from automatically
trying to reconnect if the underlying socket is disconnected
before the connection has been closed.
@param heartbeat: A value in seconds indicating the desired
frequency of heartbeats used to test that the underlying
socket is alive.
@param ssl_domain: SSL configuration in the form of an
instance of proton.SSLDomain.
@param handler: a connection scoped handler that will be
called to process any events in the scope of this connection
or its child links
@param kwargs: 'sasl_enabled', which determines whether a sasl
layer is used for the connection. 'allowed_mechs', an optional
string specifying the SASL mechanisms allowed for this
connection; the value is a space-separated list of mechanism
names; the mechanisms allowed by default are determined by
your SASL library and system configuration, with two
exceptions: GSSAPI and GSS-SPNEGO are disabled by default; to
enable them, you must explicitly add them using this option;
clients must set the allowed mechanisms before the
outgoing connection is attempted; servers must set them before
the listening connection is set up. 'allow_insecure_mechs', a
flag indicating whether insecure mechanisms, such as PLAIN
over a non-encrypted socket, are allowed. 'virtual_host', the
hostname to set in the Open performative used by the peer to
determine the correct back-end service for the client; if
'virtual_host' is not supplied the host field from the URL is
used instead. 'user', the user to authenticate. 'password',
the authentication secret.
"""
if not url and not urls and not address:
config = _get_default_config()
scheme = config.get('scheme', 'amqp')
_url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', _get_default_port_for_scheme(scheme)))
_ssl_domain = None
_kwargs = kwargs
if config.get('user'):
_kwargs['user'] = config.get('user')
if config.get('password'):
_kwargs['password'] = config.get('password')
sasl_config = config.get('sasl', {})
_kwargs['sasl_enabled'] = sasl_config.get('enabled', True)
if sasl_config.get('mechanisms'):
_kwargs['allowed_mechs'] = sasl_config.get('mechanisms')
tls_config = config.get('tls', {})
if scheme == 'amqps':
_ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ca = tls_config.get('ca')
cert = tls_config.get('cert')
key = tls_config.get('key')
if ca:
_ssl_domain.set_trusted_ca_db(str(ca))
if tls_config.get('verify', True):
_ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, str(ca))
if cert and key:
_ssl_domain.set_credentials(str(cert), str(key), None)
return self._connect(_url, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=_ssl_domain, **_kwargs)
else:
return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
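# A minimal connect() sketch (URL, credentials and MyHandler are illustrative):
#
#   container = Container(MyHandler())
#   conn = container.connect("amqp://broker.example.com:5672",
#                            user="alice", password="secret",
#                            reconnect=Backoff())
#   container.run()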
def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
conn = self.connection(handler)
conn.container = self.container_id or str(_generate_uuid())
conn.offered_capabilities = kwargs.get('offered_capabilities')
conn.desired_capabilities = kwargs.get('desired_capabilities')
conn.properties = kwargs.get('properties')
connector = Connector(conn)
connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
connector.user = kwargs.get('user', self.user)
connector.password = kwargs.get('password', self.password)
connector.virtual_host = kwargs.get('virtual_host')
if connector.virtual_host:
# only set hostname if virtual-host is a non-empty string
conn.hostname = connector.virtual_host
connector.ssl_sni = kwargs.get('sni')
connector.max_frame_size = kwargs.get('max_frame_size')
conn._overrides = connector
if url:
connector.address = Urls([url])
elif urls:
connector.address = Urls(urls)
elif address:
connector.address = address
else:
raise ValueError("One of url, urls or address required")
if heartbeat:
connector.heartbeat = heartbeat
if reconnect:
connector.reconnect = reconnect
elif reconnect is None:
connector.reconnect = Backoff()
# use container's default client domain if none specified. This is
# only necessary if the URL specifies the "amqps:" scheme
connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
conn._session_policy = SessionPerConnection() # todo: make configurable
conn.open()
return conn
def _get_id(self, container, remote, local):
if local and remote:
"%s-%s-%s" % (container, remote, local)
elif local:
return "%s-%s" % (container, local)
elif remote:
return "%s-%s" % (container, remote)
else:
return "%s-%s" % (container, str(_generate_uuid()))
def _get_session(self, context):
if isinstance(context, Url):
return self._get_session(self.connect(url=context))
elif isinstance(context, Session):
return context
elif isinstance(context, Connection):
if hasattr(context, '_session_policy'):
return context._session_policy.session(context)
else:
return _create_session(context)
else:
return context.session()
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
"""
Initiates the establishment of a link over which messages can
be sent. Returns an instance of proton.Sender.
There are two patterns of use. (1) A connection can be passed
as the first argument, in which case the link is established
on that connection. In this case the target address can be
specified as the second argument (or as a keyword
argument). The source address can also be specified if
desired. (2) Alternatively a URL can be passed as the first
argument. In this case a new connection will be established on
which the link will be attached. If a path is specified and
the target is not, then the path of the URL is used as the
target address.
The name of the link may be specified if desired, otherwise a
unique name will be generated.
Various LinkOptions can be specified to further control the
attachment.
"""
if isstring(context):
context = Url(context)
if isinstance(context, Url) and not target:
target = context.path
session = self._get_session(context)
snd = session.sender(name or self._get_id(session.connection.container, target, source))
if source:
snd.source.address = source
if target:
snd.target.address = target
if handler is not None:
snd.handler = handler
if tags:
snd.tag_generator = tags
_apply_link_options(options, snd)
snd.open()
return snd
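# The two patterns described above, sketched (addresses are illustrative):
#
#   snd = container.create_sender(conn, "examples")             # pattern (1)
#   snd = container.create_sender("amqp://host:5672/examples")  # pattern (2):
#   # the URL path "examples" becomes the target address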
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
"""
Initiates the establishment of a link over which messages can
be received (aka a subscription). Returns an instance of
proton.Receiver.
There are two patterns of use. (1) A connection can be passed
as the first argument, in which case the link is established
on that connection. In this case the source address can be
specified as the second argument (or as a keyword
argument). The target address can also be specified if
desired. (2) Alternatively a URL can be passed as the first
argument. In this case a new connection will be established on
which the link will be attached. If a path is specified and
the source is not, then the path of the URL is used as the
source address.
The name of the link may be specified if desired, otherwise a
unique name will be generated.
Various LinkOptions can be specified to further control the
attachment.
"""
if isstring(context):
context = Url(context)
if isinstance(context, Url) and not source:
source = context.path
session = self._get_session(context)
rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
if source:
rcv.source.address = source
if dynamic:
rcv.source.dynamic = True
if target:
rcv.target.address = target
if handler is not None:
rcv.handler = handler
_apply_link_options(options, rcv)
rcv.open()
return rcv
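# The corresponding receiver sketch (addresses and MyHandler are illustrative);
# dynamic=True asks the peer to create a temporary, dynamically named source:
#
#   rcv = container.create_receiver(conn, "examples", handler=MyHandler())
#   rcv = container.create_receiver(conn, dynamic=True, handler=MyHandler())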
def declare_transaction(self, context, handler=None, settle_before_discharge=False):
if not _get_attr(context, '_txn_ctrl'):
class InternalTransactionHandler(OutgoingMessageHandler):
def __init__(self):
super(InternalTransactionHandler, self).__init__(auto_settle=True)
def on_settled(self, event):
if hasattr(event.delivery, "transaction"):
event.transaction = event.delivery.transaction
event.delivery.transaction.handle_outcome(event)
def on_unhandled(self, method, event):
if handler:
event.dispatch(handler)
context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
context._txn_ctrl.target.type = Terminus.COORDINATOR
context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
return Transaction(context._txn_ctrl, handler, settle_before_discharge)
def listen(self, url, ssl_domain=None):
"""
Initiates a server socket, accepting incoming AMQP connections
on the interface and port specified.
"""
url = Url(url)
acceptor = self.acceptor(url.host, url.port)
ssl_config = ssl_domain
if not ssl_config and url.scheme == 'amqps':
# use container's default server domain
if self.ssl:
ssl_config = self.ssl.server
else:
raise SSLUnavailable("amqps: SSL libraries not found")
if ssl_config:
acceptor.set_ssl_domain(ssl_config)
return acceptor
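# A listen() sketch (interface and port are illustrative); incoming connections
# are dispatched to the container's handlers, and close() stops accepting:
#
#   acceptor = container.listen("amqp://0.0.0.0:5672")
#   ...
#   acceptor.close()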
def do_work(self, timeout=None):
if timeout:
self.timeout = timeout
return self.process()