| from __future__ import absolute_import |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| import logging, os, socket, time, types |
| from heapq import heappush, heappop, nsmallest |
| from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch |
| from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message |
| from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol |
| from proton import Terminus, Timeout, Transport, TransportException, ulong, Url |
| from select import select |
| from proton.handlers import OutgoingMessageHandler |
| from proton import unicode2utf8, utf82unicode |
| |
| import traceback |
| from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable |
| from .wrapper import Wrapper, PYCTX |
| from cproton import * |
| from . import _compat |
| |
| try: |
| import Queue |
| except ImportError: |
| import queue as Queue |
| |
class Task(Wrapper):
    """Python-side handle for a task scheduled on a reactor (``pn_task``)."""

    @staticmethod
    def wrap(impl):
        """Return a ``Task`` around ``impl``; ``None`` is passed through."""
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        super(Task, self).__init__(impl, pn_task_attachments)

    def _init(self):
        # A task carries no extra Python-side state.
        # NOTE(review): presumably invoked by Wrapper on first wrap -- confirm.
        return None

    def cancel(self):
        """Cancel the underlying task via ``pn_task_cancel``."""
        pn_task_cancel(self._impl)
| |
class Acceptor(Wrapper):
    """Python-side handle for a reactor acceptor."""

    def __init__(self, impl):
        super(Acceptor, self).__init__(impl)

    def set_ssl_domain(self, ssl_domain):
        """Attach the SSL domain to the acceptor.

        ``ssl_domain`` must expose the underlying domain as ``_domain``.
        """
        domain_impl = ssl_domain._domain
        pn_acceptor_set_ssl_domain(self._impl, domain_impl)

    def close(self):
        """Close the acceptor via ``pn_acceptor_close``."""
        pn_acceptor_close(self._impl)
| |
class Reactor(Wrapper):
    """Python wrapper around a C ``pn_reactor``.

    Exceptions reported by handlers through :meth:`on_error` are collected
    in ``self.errors`` and surfaced by :meth:`process` and :meth:`stop`.
    """

    @staticmethod
    def wrap(impl):
        """Return the Python object for an existing ``pn_reactor``.

        When the reactor's attachment record holds a ``'subclass'`` entry,
        that class is instantiated with ``impl``; otherwise a plain
        ``Reactor`` is returned. ``None`` is passed through unchanged.
        """
        if impl is None:
            return None
        record = pn_reactor_attachments(impl)
        attrs = pn_void2py(pn_record_get(record, PYCTX))
        if attrs and 'subclass' in attrs:
            return attrs['subclass'](impl=impl)
        return Reactor(impl=impl)

    def __init__(self, *handlers, **kwargs):
        """Wrap ``kwargs['impl']`` if given, then add each of ``handlers``."""
        # Without an explicit impl the ``pn_reactor`` callable itself is
        # handed to Wrapper.
        # NOTE(review): presumably Wrapper calls it to construct a new
        # reactor -- confirm against Wrapper.__init__.
        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
        for h in handlers:
            self.handler.add(h)

    def _init(self):
        # Pending error records reported via on_error().
        self.errors = []

    def on_error(self, info):
        """Record an error and yield the reactor.

        ``info`` is stored as-is; :meth:`_check_errors` unpacks each entry
        as an ``(exc, value, tb)`` triple.
        """
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)

    def _set_global(self, handler):
        impl = _chandler(handler, self.on_error)
        pn_reactor_set_global_handler(self._impl, impl)
        pn_decref(impl)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        return millis2timeout(pn_reactor_get_timeout(self._impl))

    def _set_timeout(self, secs):
        return pn_reactor_set_timeout(self._impl, timeout2millis(secs))

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        """Call ``pn_reactor_yield`` on the underlying reactor."""
        pn_reactor_yield(self._impl)

    def mark(self):
        """Return the result of ``pn_reactor_mark``."""
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)

    def _set_handler(self, handler):
        impl = _chandler(handler, self.on_error)
        pn_reactor_set_handler(self._impl, impl)
        pn_decref(impl)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """Run the reactor to completion.

        Sets the timeout, starts the reactor, calls :meth:`process` until
        it returns a false value, stops, processes once more and finally
        clears both the global and the reactor handler.
        """
        self.timeout = 3.14159265359
        self.start()
        while self.process():
            pass
        self.stop()
        self.process()
        self.global_handler = None
        self.handler = None

    def wakeup(self):
        """Wake the reactor up.

        Raises ``IOError`` carrying the reactor's error text when
        ``pn_reactor_wakeup`` returns a non-zero value.
        """
        n = pn_reactor_wakeup(self._impl)
        if n:
            raise IOError(pn_error_text(pn_reactor_error(self._impl)))

    def start(self):
        """Call ``pn_reactor_start`` on the underlying reactor."""
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        """Re-raise the most recently recorded error, if any.

        Earlier recorded errors are printed with ``traceback`` first; the
        last one is re-raised through ``_compat.raise_``.
        """
        if self.errors:
            for exc, value, tb in self.errors[:-1]:
                traceback.print_exception(exc, value, tb)
            exc, value, tb = self.errors[-1]
            _compat.raise_(exc, value, tb)

    def process(self):
        """Process pending work; return the value of ``pn_reactor_process``.

        Raises any error recorded by :meth:`on_error` during processing.
        """
        result = pn_reactor_process(self._impl)
        self._check_errors()
        return result

    def stop(self):
        """Stop the reactor, then raise any recorded error."""
        pn_reactor_stop(self._impl)
        self._check_errors()

    def schedule(self, delay, task):
        """Schedule the handler ``task`` after ``delay`` seconds.

        Returns the wrapping :class:`Task`.
        """
        impl = _chandler(task, self.on_error)
        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
        pn_decref(impl)
        return task

    def acceptor(self, host, port, handler=None):
        """Create an acceptor listening on ``host``:``port``.

        Returns an :class:`Acceptor`. Raises ``IOError`` with the reactor's
        error text and the address when no acceptor could be created.
        """
        impl = _chandler(handler, self.on_error)
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
        pn_decref(impl)
        if aimpl:
            return Acceptor(aimpl)
        # All three values must be passed to ``%`` as one tuple; otherwise the
        # formatting itself fails with TypeError before IOError is built.
        raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))

    def connection(self, handler=None):
        """Deprecated: use connection_to_host() instead
        """
        impl = _chandler(handler, self.on_error)
        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
        if impl:
            pn_decref(impl)
        return result

    def connection_to_host(self, host, port, handler=None):
        """Create an outgoing Connection that will be managed by the reactor.
        The reactor's pn_iohandler will create a socket connection to the host
        once the connection is opened.
        """
        conn = self.connection(handler)
        self.set_connection_host(conn, host, port)
        return conn

    def set_connection_host(self, connection, host, port):
        """Change the address used by the connection.  The address is
        used by the reactor's iohandler to create an outgoing socket
        connection.  This must be set prior to opening the connection.
        """
        pn_reactor_set_connection_host(self._impl,
                                       connection._impl,
                                       unicode2utf8(str(host)),
                                       unicode2utf8(str(port)))

    def get_connection_address(self, connection):
        """This may be used to retrieve the remote peer address.
        @return: string containing the address in URL format or None if no
        address is available.  Use the proton.Url class to create a Url object
        from the returned value.
        """
        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
        return utf82unicode(_url)

    def selectable(self, handler=None):
        """Create a selectable, attaching ``handler`` to it when one is given."""
        impl = _chandler(handler, self.on_error)
        result = Selectable.wrap(pn_reactor_selectable(self._impl))
        if impl:
            record = pn_selectable_attachments(result._impl)
            pn_record_set_handler(record, impl)
            pn_decref(impl)
        return result

    def update(self, sel):
        """Pass the selectable ``sel`` to ``pn_reactor_update``."""
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """Put an event of type ``etype`` for ``obj`` on the reactor's collector."""
        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
| |
# Register factories that turn a raw pn_reactor / pn_task pointer into the
# matching Python wrapper object.
# NOTE(review): presumably looked up by class name inside proton -- confirm.
from proton import wrappers as _wrappers

_wrappers["pn_reactor"] = lambda ptr: Reactor.wrap(pn_cast_pn_reactor(ptr))
_wrappers["pn_task"] = lambda ptr: Task.wrap(pn_cast_pn_task(ptr))
| |
| |
class EventInjector(object):
    """
    Lets a thread other than the reactor's event thread request that
    events be dispatched on the event thread. Pass an instance to
    Reactor.selectable() to activate it. Call close() once it is no
    longer needed so that the event loop is able to end.
    """

    def __init__(self):
        self._closed = False
        self.pipe = os.pipe()
        self.queue = Queue.Queue()

    def _notify(self):
        # One byte on the write end makes the read end readable.
        os.write(self.pipe[1], _compat.str2bin("!"))

    def trigger(self, event):
        """
        Queue the given event for dispatch on the event thread of the
        reactor this EventInjector was added to.
        """
        self.queue.put(event)
        self._notify()

    def close(self):
        """
        Mark this EventInjector as closed. Events already queued are
        still pushed to the reactor; afterwards the selectable is
        terminated.
        """
        self._closed = True
        self._notify()

    def fileno(self):
        """Return the read end of the pipe."""
        read_fd = self.pipe[0]
        return read_fd

    def on_selectable_init(self, event):
        """Bind the selectable to the pipe's read end and enable reading."""
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        """Drain the pipe, forward queued events, terminate if closed."""
        os.read(self.pipe[0], 512)
        reactor = event.reactor
        while not self.queue.empty():
            pending = self.queue.get()
            reactor.push_event(pending.context, pending.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
| |
| |
class ApplicationEvent(EventBase):
    """
    An event defined by the application. It may optionally refer to an
    engine object (connection, session, link or delivery) and/or carry
    an arbitrary subject.
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        # Fill in the enclosing objects from the most specific one given:
        # delivery -> link -> session -> connection.
        if delivery:
            link = delivery.link
        if link:
            session = link.session
        if session:
            connection = session.connection
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        self.subject = subject

    def __repr__(self):
        shown = []
        for obj in (self.connection, self.session, self.link, self.delivery, self.subject):
            if obj is not None:
                shown.append(str(obj))
        return "%s(%s)" % (self.type, ", ".join(shown))
| |
class Transaction(object):
    """
    Tracks the state of one AMQP 1.0 transaction: its declaration, the
    deliveries accepted under it and its final discharge.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        self._declare = None
        self._discharge = None
        self._pending = []
        # Declaring is started immediately on construction.
        self.declare()

    def commit(self):
        """Discharge the transaction as successful."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction as failed."""
        self.discharge(True)

    def declare(self):
        """Send the declare control message and remember its delivery."""
        descriptor = symbol(u'amqp:declare:list')
        self._declare = self._send_ctrl(descriptor, [None])

    def discharge(self, failed):
        """Send the discharge control message with the given ``failed`` flag."""
        self.failed = failed
        descriptor = symbol(u'amqp:discharge:list')
        self._discharge = self._send_ctrl(descriptor, [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        # Control messages go out on the txn_ctrl sender; the delivery is
        # tagged with this transaction so its outcome can be routed back.
        message = Message(body=Described(descriptor, value))
        delivery = self.txn_ctrl.send(message)
        delivery.transaction = self
        return delivery

    def send(self, sender, msg, tag=None):
        """Send ``msg`` on ``sender`` under this transaction; return the delivery."""
        delivery = sender.send(msg, tag=tag)
        delivery.local.data = [self.id]
        delivery.update(0x34)
        return delivery

    def accept(self, delivery):
        """Accept ``delivery`` under this transaction.

        The delivery is settled at once when ``settle_before_discharge``
        is set; otherwise it is kept pending until discharge.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        """Apply ``state`` to ``delivery`` under this transaction (no-op when falsy)."""
        if not state:
            return
        delivery.local.data = [self.id, Described(ulong(state), [])]
        delivery.update(0x34)

    def _release_pending(self):
        for delivery in self._pending:
            delivery.update(Delivery.RELEASED)
            delivery.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def _declare_outcome(self, event):
        # Outcome of the declare control message.
        if event.delivery.remote.data:
            self.id = event.delivery.remote.data[0]
            self.handler.on_transaction_declared(event)
            return
        if event.delivery.remote_state != Delivery.REJECTED:
            logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
        self.handler.on_transaction_declare_failed(event)

    def _discharge_outcome(self, event):
        # Outcome of the discharge control message.
        rejected = event.delivery.remote_state == Delivery.REJECTED
        if rejected:
            if not self.failed:
                self.handler.on_transaction_commit_failed(event)
                self._release_pending()  # make this optional?
        elif self.failed:
            self.handler.on_transaction_aborted(event)
            self._release_pending()
        else:
            self.handler.on_transaction_committed(event)
        self._clear_pending()

    def handle_outcome(self, event):
        """Route the outcome of a declare or discharge delivery to the handler."""
        if event.delivery == self._declare:
            self._declare_outcome(event)
        elif event.delivery == self._discharge:
            self._discharge_outcome(event)
| |
class LinkOption(object):
    """
    Base interface for options that configure a link.
    """

    def apply(self, link):
        """
        Configure the given link. The base implementation does
        nothing; subclasses put their configuration logic here.
        """
        return None

    def test(self, link):
        """
        Tell whether this option should be applied to the given link.
        The base implementation always returns True; subclasses may
        override it to apply the option selectively.
        """
        return True
| |
class AtMostOnce(LinkOption):
    """Link option that sets the sender settle mode to ``Link.SND_SETTLED``."""

    def apply(self, link):
        settled = Link.SND_SETTLED
        link.snd_settle_mode = settled
| |
class AtLeastOnce(LinkOption):
    """Link option selecting ``SND_UNSETTLED`` / ``RCV_FIRST`` settle modes."""

    def apply(self, link):
        # Sender mode is assigned before receiver mode.
        snd_mode, rcv_mode = Link.SND_UNSETTLED, Link.RCV_FIRST
        link.snd_settle_mode = snd_mode
        link.rcv_settle_mode = rcv_mode
| |
class SenderOption(LinkOption):
    """Base for options that are only applied to sender links."""

    def apply(self, sender):
        """Do nothing by default."""
        return None

    def test(self, link):
        """Return ``link.is_sender``."""
        applicable = link.is_sender
        return applicable
| |
class ReceiverOption(LinkOption):
    """Base for options that are only applied to receiver links."""

    def apply(self, receiver):
        """Do nothing by default."""
        return None

    def test(self, link):
        """Return ``link.is_receiver``."""
        applicable = link.is_receiver
        return applicable
| |
class DynamicNodeProperties(LinkOption):
    """Link option that writes a properties map onto the link's terminus.

    Keys of ``props`` that are not already ``symbol`` instances are
    converted to ``symbol``.
    """

    def __init__(self, props={}):
        self.properties = {}
        for key in props:
            sym = key if isinstance(key, symbol) else symbol(key)
            self.properties[sym] = props[key]

    def apply(self, link):
        # Receivers carry the properties on their source, senders on their target.
        terminus = link.source if link.is_receiver else link.target
        terminus.properties.put_dict(self.properties)
| |
class Filter(ReceiverOption):
    """Receiver option that puts a filter set on the receiver's source.

    ``filter_set`` is the dict handed to ``source.filter.put_dict``; it
    defaults to a new empty dict.
    """

    def __init__(self, filter_set=None):
        # A literal ``{}`` default would be a single dict shared by every
        # Filter created without arguments, so mutating one instance's
        # filter_set would leak into all the others.
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        receiver.source.filter.put_dict(self.filter_set)
| |
class Selector(Filter):
    """
    Filter option that installs a message selector on a link.

    ``value`` is the selector expression; ``name`` is the key under
    which the filter is registered.
    """

    def __init__(self, value, name='selector'):
        described = Described(symbol('apache.org:selector-filter:string'), value)
        filter_set = {symbol(name): described}
        super(Selector, self).__init__(filter_set)
| |
class DurableSubscription(ReceiverOption):
    """Receiver option: source durability DELIVERIES, expiry policy EXPIRE_NEVER."""

    def apply(self, receiver):
        source = receiver.source
        source.durability = Terminus.DELIVERIES
        source.expiry_policy = Terminus.EXPIRE_NEVER
| |
class Move(ReceiverOption):
    """Receiver option selecting the ``DIST_MODE_MOVE`` distribution mode."""

    def apply(self, receiver):
        mode = Terminus.DIST_MODE_MOVE
        receiver.source.distribution_mode = mode
| |
class Copy(ReceiverOption):
    """Receiver option selecting the ``DIST_MODE_COPY`` distribution mode."""

    def apply(self, receiver):
        mode = Terminus.DIST_MODE_COPY
        receiver.source.distribution_mode = mode
| |
| def _apply_link_options(options, link): |
| if options: |
| if isinstance(options, list): |
| for o in options: |
| if o.test(link): o.apply(link) |
| else: |
| if options.test(link): options.apply(link) |
| |
| def _create_session(connection, handler=None): |
| session = connection.session() |
| session.open() |
| return session |
| |
| |
| def _get_attr(target, name): |
| if hasattr(target, name): |
| return getattr(target, name) |
| else: |
| return None |
| |
class SessionPerConnection(object):
    """Session policy that lazily creates and reuses one default session."""

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the default session, creating it on ``connection`` if needed."""
        current = self._default_session
        if current:
            return current
        self._default_session = _create_session(connection)
        # Link the session back to this policy object.
        self._default_session.context = self
        return self._default_session

    def on_session_remote_close(self, event):
        """Close the event's connection and forget the default session."""
        event.connection.close()
        self._default_session = None
| |
class GlobalOverrides(object):
    """
    Internal global handler that offers each event to the ``_overrides``
    handler of the event's connection (when the connection has one) and
    dispatches to the wrapped ``base`` handler if that did not yield a
    true result.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        if self._override(event):
            return
        event.dispatch(self.base)

    def _override(self, event):
        # Mirrors ``conn and hasattr(...) and dispatch(...)``: the first
        # false operand is returned as-is.
        conn = event.connection
        if not conn:
            return conn
        if not hasattr(conn, '_overrides'):
            return False
        return event.dispatch(conn._overrides)
| |
class Connector(Handler):
    """
    Internal handler which performs the socket connect needed for an
    opened connection, and re-connects according to the configured
    reconnect strategy.
    """

    def __init__(self, connection):
        self.connection = connection
        # Connection parameters; filled in by whoever creates the connector.
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.ssl_domain = None
        self.ssl_sni = None
        self.virtual_host = None
        # SASL settings.
        self.sasl_enabled = True
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.user = None
        self.password = None

    def _connect(self, connection, reactor):
        """Point ``connection`` at the next address and bind a new transport."""
        assert(reactor is not None)
        url = self.address.next()
        reactor.set_connection_host(connection, url.host, str(url.port))
        # if virtual-host not set, use host from address as default
        if self.virtual_host is None:
            connection.hostname = url.host
        logging.debug("connecting to %s..." % url)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials in the URL take precedence over configured ones.
            user = url.username or self.user
            if user:
                connection.user = user
            password = url.password or self.password
            if password:
                connection.password = password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme != 'amqps':
            return
        if not self.ssl_domain:
            raise SSLUnavailable("amqps: SSL libraries not found")
        self.ssl = SSL(transport, self.ssl_domain)
        self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host

    def on_connection_local_open(self, event):
        self._connect(event.connection, event.reactor)

    def on_connection_remote_open(self, event):
        logging.debug("connected to %s" % event.connection.hostname)
        if not self.reconnect:
            return
        # A successful open restarts the reconnect strategy.
        self.reconnect.reset()
        self.transport = None

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        conn = self.connection
        # Only act while the connection is still locally active.
        if not (conn and conn.state & Endpoint.LOCAL_ACTIVE):
            return
        if not self.reconnect:
            logging.debug("Disconnected")
            self.connection = None
            return
        event.transport.unbind()
        delay = self.reconnect.next()
        if delay == 0:
            logging.info("Disconnected, reconnecting...")
            self._connect(self.connection, event.reactor)
        else:
            logging.info("Disconnected will try to reconnect after %s seconds" % delay)
            # This handler is scheduled as the task; see on_timer_task().
            event.reactor.schedule(delay, self)

    def on_timer_task(self, event):
        self._connect(self.connection, event.reactor)

    def on_connection_remote_close(self, event):
        self.connection = None
| |
class Backoff(object):
    """
    Reconnect strategy whose delay grows between retries: 0 first, then
    0.1 seconds, then doubling up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Go back to the initial delay of 0."""
        self.delay = 0

    def next(self):
        """Return the delay to use now and advance to the following one."""
        upcoming = self.delay
        self.delay = 0.1 if upcoming == 0 else min(10, 2 * upcoming)
        return upcoming
| |
class Urls(object):
    """Endless round-robin iterator over a list of URLs.

    Each entry of ``values`` is converted with ``Url``. ``next()`` returns
    the entries in order and starts again from the first one after the
    last has been returned.
    """

    def __init__(self, values):
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """Return the next URL, wrapping around at the end of the list."""
        try:
            return next(self.i)
        except StopIteration:
            # Exhausted: restart from the beginning of the list.
            self.i = iter(self.values)
            return next(self.i)

    # Python 3 looks up __next__ for iteration; without this alias
    # iter()/for over an instance fails although __iter__ returns self.
    __next__ = next
| |
class SSLConfig(object):
    """Holds a client-mode and a server-mode ``SSLDomain`` configured alike."""

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Set the same credentials on the client and then the server domain."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Set the same trusted CA database on the client and then the server domain."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
| |
| |
class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
    loosely speaking is something that establishes links to or from
    another container, over which messages are transferred. This is
    an extension to the Reactor class that adds convenience methods
    for creating connections and sender- or receiver- links.
    """

    def __init__(self, *handlers, **kwargs):
        super(Container, self).__init__(*handlers, **kwargs)
        # Defaults are only installed when a new reactor is created; when an
        # existing impl is being wrapped nothing is assigned here.
        if "impl" not in kwargs:
            try:
                self.ssl = SSLConfig()
            except SSLUnavailable:
                self.ssl = None
            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
            self.trigger = None
            self.container_id = str(generate_uuid())
            self.allow_insecure_mechs = True
            self.allowed_mechs = None
            self.sasl_enabled = True
            self.user = None
            self.password = None
            # Recorded under 'subclass', the key Reactor.wrap() looks up to
            # pick the class to instantiate.
            Wrapper.__setattr__(self, 'subclass', self.__class__)

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
        """
        Initiates the establishment of an AMQP connection. Returns an
        instance of proton.Connection.

        @param url: URL string of process to connect to

        @param urls: list of URL strings of process to try to connect to

        Only one of url or urls should be specified.

        @param address: object whose next() method returns the URL to
        use for each connection attempt; only consulted when neither url
        nor urls is given.

        @param reconnect: A value of False will prevent the library
        from automatically trying to reconnect if the underlying
        socket is disconnected before the connection has been closed.
        When left as None a Backoff strategy is used.

        @param heartbeat: A value in milliseconds indicating the
        desired frequency of heartbeats used to test the underlying
        socket is alive.

        @param ssl_domain: SSL configuration in the form of an
        instance of proton.SSLDomain.

        @param handler: a connection scoped handler that will be
        called to process any events in the scope of this connection
        or its child links

        @param kwargs: sasl_enabled, which determines whether a sasl layer is
        used for the connection; allowed_mechs an optional list of SASL
        mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
        indicating whether insecure mechanisms, such as PLAIN over a
        non-encrypted socket, are allowed; 'virtual_host' the hostname to set
        in the Open performative used by peer to determine the correct
        back-end service for the client. If 'virtual_host' is not supplied the
        host field from the URL is used instead. Also read from kwargs:
        'user', 'password', 'sni', 'offered_capabilities',
        'desired_capabilities' and 'properties'.

        Raises ValueError when none of url, urls or address is given.
        """
        conn = self.connection(handler)
        conn.container = self.container_id or str(generate_uuid())
        conn.offered_capabilities = kwargs.get('offered_capabilities')
        conn.desired_capabilities = kwargs.get('desired_capabilities')
        conn.properties = kwargs.get('properties')

        connector = Connector(conn)
        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
        connector.user = kwargs.get('user', self.user)
        connector.password = kwargs.get('password', self.password)
        connector.virtual_host = kwargs.get('virtual_host')
        if connector.virtual_host:
            # only set hostname if virtual-host is a non-empty string
            conn.hostname = connector.virtual_host
        connector.ssl_sni = kwargs.get('sni')

        conn._overrides = connector
        if url:
            connector.address = Urls([url])
        elif urls:
            connector.address = Urls(urls)
        elif address:
            connector.address = address
        else:
            raise ValueError("One of url, urls or address required")
        if heartbeat:
            connector.heartbeat = heartbeat
        if reconnect:
            connector.reconnect = reconnect
        elif reconnect is None:
            connector.reconnect = Backoff()
        # use container's default client domain if none specified.  This is
        # only necessary if the URL specifies the "amqps:" scheme
        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
        conn._session_policy = SessionPerConnection()  # todo: make configurable
        conn.open()
        return conn

    def _get_id(self, container, remote, local):
        """Build a link name from the container id and the given addresses.

        A generated uuid takes the place of the addresses when neither
        ``remote`` nor ``local`` is given.
        """
        if local and remote:
            # This branch used to compute the name without returning it.
            return "%s-%s-%s" % (container, remote, local)
        elif local:
            return "%s-%s" % (container, local)
        elif remote:
            return "%s-%s" % (container, remote)
        else:
            return "%s-%s" % (container, str(generate_uuid()))

    def _get_session(self, context):
        """Resolve ``context`` (Url, Session, Connection or other) to a session."""
        if isinstance(context, Url):
            return self._get_session(self.connect(url=context))
        elif isinstance(context, Session):
            return context
        elif isinstance(context, Connection):
            if hasattr(context, '_session_policy'):
                return context._session_policy.session(context)
            else:
                return _create_session(context)
        else:
            return context.session()

    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be sent. Returns an instance of proton.Sender.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the target address can be
        specified as the second argument (or as a keyword
        argument). The source address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the target is not, then the path of the URL is used as the
        target address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not target:
            target = context.path
        session = self._get_session(context)
        snd = session.sender(name or self._get_id(session.connection.container, target, source))
        if source:
            snd.source.address = source
        if target:
            snd.target.address = target
        if handler is not None:
            snd.handler = handler
        if tags:
            snd.tag_generator = tags
        _apply_link_options(options, snd)
        snd.open()
        return snd

    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be received (aka a subscription). Returns an instance of
        proton.Receiver.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the source address can be
        specified as the second argument (or as a keyword
        argument). The target address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the source is not, then the path of the URL is used as the
        source address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not source:
            source = context.path
        session = self._get_session(context)
        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
        if source:
            rcv.source.address = source
        if dynamic:
            rcv.source.dynamic = True
        if target:
            rcv.target.address = target
        if handler is not None:
            rcv.handler = handler
        _apply_link_options(options, rcv)
        rcv.open()
        return rcv

    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
        """Declare a transaction on ``context`` and return a Transaction.

        A 'txn-ctrl' sender to the transaction coordinator is created on
        first use and cached on ``context`` as ``_txn_ctrl``.
        """
        if not _get_attr(context, '_txn_ctrl'):
            class InternalTransactionHandler(OutgoingMessageHandler):
                def __init__(self):
                    super(InternalTransactionHandler, self).__init__(auto_settle=True)

                def on_settled(self, event):
                    # Route outcomes of control messages to their Transaction.
                    if hasattr(event.delivery, "transaction"):
                        event.transaction = event.delivery.transaction
                        event.delivery.transaction.handle_outcome(event)
            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
            context._txn_ctrl.target.type = Terminus.COORDINATOR
            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
        return Transaction(context._txn_ctrl, handler, settle_before_discharge)

    def listen(self, url, ssl_domain=None):
        """
        Initiates a server socket, accepting incoming AMQP connections
        on the interface and port specified.

        Raises SSLUnavailable for an 'amqps' URL when neither ssl_domain
        nor the container's default SSL configuration is available.
        """
        url = Url(url)
        acceptor = self.acceptor(url.host, url.port)
        ssl_config = ssl_domain
        if not ssl_config and url.scheme == 'amqps':
            # use container's default server domain
            if self.ssl:
                ssl_config = self.ssl.server
            else:
                raise SSLUnavailable("amqps: SSL libraries not found")
        if ssl_config:
            acceptor.set_ssl_domain(ssl_config)
        return acceptor

    def do_work(self, timeout=None):
        """Run one process() pass, first setting the timeout if one is given."""
        if timeout:
            self.timeout = timeout
        return self.process()