| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| A candidate high level messaging API for python. |
| |
| Areas that still need work: |
| |
| - definition of the arguments for L{Session.sender} and L{Session.receiver} |
| - standard L{Message} properties |
| - L{Message} content encoding |
| - protocol negotiation/multiprotocol impl |
| """ |
| |
| from logging import getLogger |
| from math import ceil |
| from qpid.codec010 import StringCodec |
| from qpid.concurrency import synchronized, Waiter, Condition |
| from qpid.datatypes import Serial, uuid4 |
| from qpid.messaging.constants import * |
| from qpid.messaging.exceptions import * |
| from qpid.messaging.message import * |
| from qpid.ops import PRIMITIVE |
| from qpid.util import default, URL |
| from threading import Thread, RLock |
| |
# Module-wide logger; every endpoint in this file logs under "qpid.messaging".
log = getLogger("qpid.messaging")

# Short alias so methods can be decorated with @static instead of
# @staticmethod (used by Connection.establish).
static = staticmethod
| |
class Endpoint:

  """
  Mixin shared by L{Connection}, L{Session}, L{Sender} and L{Receiver}.

  Subclasses are expected to provide a C{closed} attribute together
  with C{_ewait(predicate, timeout)} and C{check_closed()}.

  NOTE: this is deliberately left as a classic (non-object) class;
  L{Receiver} lists C{object} as a second base after Endpoint.
  """

  def _ecwait(self, predicate, timeout=None):
    """
    Wait until the predicate holds or this endpoint is closed, then
    let check_closed() raise if the endpoint turned out to be closed.

    @param predicate: zero argument callable evaluated while waiting
    @param timeout: forwarded unchanged to _ewait
    @return: whatever _ewait returned
    """
    def closed_or_satisfied():
      return self.closed or predicate()

    outcome = self._ewait(closed_or_satisfied, timeout)
    self.check_closed()
    return outcome
| |
class Connection(Endpoint):

  """
  A Connection manages a group of L{Sessions<Session>} and connects
  them with a remote endpoint.
  """

  @static
  def establish(url=None, **options):
    """
    Constructs a L{Connection} with the supplied parameters and opens
    it.

    @param url: see L{Connection.__init__}
    @param options: see L{Connection.__init__}
    @rtype: Connection
    @return: an opened Connection
    """
    conn = Connection(url, **options)
    conn.open()
    return conn

  def __init__(self, url=None, **options):
    """
    Creates a connection. A newly created connection must be opened
    with the L{Connection.open} method before it can be used.

    @type url: str
    @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
    @type host: str
    @param host: the name or ip address of the remote host (overridden by url)
    @type port: int
    @param port: the port number of the remote host (overridden by url)
    @type transport: str
    @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
    @type heartbeat: int
    @param heartbeat: heartbeat interval in seconds

    @type username: str
    @param username: the username for authentication (overridden by url)
    @type password: str
    @param password: the password for authentication (overridden by url)

    @type sasl_mechanisms: str
    @param sasl_mechanisms: space separated list of permitted sasl mechanisms
    @type sasl_service: str
    @param sasl_service: ??? (defaults to "qpidd")
    @type sasl_min_ssf: ???
    @param sasl_min_ssf: ???
    @type sasl_max_ssf: ???
    @param sasl_max_ssf: ???

    @type reconnect: bool
    @param reconnect: enable/disable automatic reconnect
    @type reconnect_timeout: float
    @param reconnect_timeout: total time to attempt reconnect
    @type reconnect_interval_min: float
    @param reconnect_interval_min: minimum interval between reconnect attempts
    @type reconnect_interval_max: float
    @param reconnect_interval_max: maximum interval between reconnect attempts
    @type reconnect_interval: float
    @param reconnect_interval: set both min and max reconnect intervals
    @type reconnect_limit: int
    @param reconnect_limit: limit the total number of reconnect attempts
    @type reconnect_urls: list[str]
    @param reconnect_urls: list of backup hosts specified as urls
    @type reconnect_log: bool
    @param reconnect_log: stored on the connection, defaults to True
      (presumably toggles logging of reconnect attempts -- confirm
      against the driver)

    @type address_ttl: float
    @param address_ttl: time until cached address resolution expires
    @type tcp_nodelay: bool
    @param tcp_nodelay: stored on the connection, defaults to False

    @type ssl_keyfile: str
    @param ssl_keyfile: file with client's private key (PEM format)
    @type ssl_certfile: str
    @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format)
    @type ssl_trustfile: str
    @param ssl_trustfile: file trusted certificates to validate the server
    @type client_properties: dict
    @param client_properties: stored on the connection, defaults to {}

    @rtype: Connection
    @return: a disconnected Connection
    """
    # The url wins; fall back to the "host" option when no url is given.
    if url is None:
      url = options.get("host")
    if isinstance(url, basestring):
      url = URL(url)
    self.host = url.host

    # An explicit transport option overrides whatever the url scheme
    # implies; plain amqp and missing/unknown schemes both mean tcp.
    if "transport" in options:
      self.transport = options.get("transport")
    elif url.scheme == url.AMQPS:
      self.transport = "ssl"
    else:
      self.transport = "tcp"

    # The default port depends on whether the transport is encrypted.
    if self.transport in ("ssl", "tcp+tls"):
      self.port = default(url.port, options.get("port", AMQPS_PORT))
    else:
      self.port = default(url.port, options.get("port", AMQP_PORT))
    self.heartbeat = options.get("heartbeat")
    self.username = default(url.user, options.get("username", None))
    self.password = default(url.password, options.get("password", None))
    self.auth_username = None

    self.sasl_mechanisms = options.get("sasl_mechanisms")
    self.sasl_service = options.get("sasl_service", "qpidd")
    self.sasl_min_ssf = options.get("sasl_min_ssf")
    self.sasl_max_ssf = options.get("sasl_max_ssf")

    self.reconnect = options.get("reconnect", False)
    self.reconnect_timeout = options.get("reconnect_timeout")
    # reconnect_interval, when given, supplies the default for both the
    # min and the max interval; explicit min/max options still win.
    reconnect_interval = options.get("reconnect_interval")
    self.reconnect_interval_min = options.get("reconnect_interval_min",
                                              default(reconnect_interval, 1))
    self.reconnect_interval_max = options.get("reconnect_interval_max",
                                              default(reconnect_interval, 2*60))
    self.reconnect_limit = options.get("reconnect_limit")
    self.reconnect_urls = options.get("reconnect_urls", [])
    self.reconnect_log = options.get("reconnect_log", True)

    self.address_ttl = options.get("address_ttl", 60)
    self.tcp_nodelay = options.get("tcp_nodelay", False)

    self.ssl_keyfile = options.get("ssl_keyfile", None)
    self.ssl_certfile = options.get("ssl_certfile", None)
    self.ssl_trustfile = options.get("ssl_trustfile", None)
    self.client_properties = options.get("client_properties", {})

    self.options = options

    self.id = str(uuid4())
    self.session_counter = 0
    self.sessions = {}
    self._open = False
    self._connected = False
    self._transport_connected = False
    # One re-entrant lock is shared by the connection and (via
    # connection._lock) by its sessions, senders and receivers.
    self._lock = RLock()
    self._condition = Condition(self._lock)
    self._waiter = Waiter(self._condition)
    self._modcount = Serial(0)
    self.error = None
    # NOTE(review): imported here rather than at module level,
    # presumably to avoid a circular import with the driver -- confirm.
    from driver import Driver
    self._driver = Driver(self)

  def _wait(self, predicate, timeout=None):
    """
    Block on the connection's waiter until predicate holds or the
    timeout expires; returns whatever the waiter returns.
    """
    return self._waiter.wait(predicate, timeout=timeout)

  def _wakeup(self):
    """
    Bump the modification counter and wake the driver.
    """
    self._modcount += 1
    self._driver.wakeup()

  def check_error(self):
    """
    Raise the stored connection error, if any.
    """
    if self.error:
      self._condition.gc()
      raise self.error

  def get_error(self):
    """
    Return the stored connection error (or None) without raising.
    """
    return self.error

  def _ewait(self, predicate, timeout=None):
    """
    Like L{_wait}, but also stops waiting when an error is recorded
    and raises that error via L{check_error}.
    """
    result = self._wait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    """
    Raise L{ConnectionClosed} if the connection is not attached.
    """
    if not self._connected:
      self._condition.gc()
      raise ConnectionClosed()

  @synchronized
  def session(self, name=None, transactional=False):
    """
    Creates or retrieves the named session. If the name is omitted or
    None, then a unique name is chosen based on a randomly generated
    uuid.

    @type name: str
    @param name: the session name
    @type transactional: bool
    @param transactional: passed to the L{Session} constructor when a
      new session is created
    @rtype: Session
    @return: the named Session
    """

    # Session names are always prefixed with the connection id.
    if name is None:
      name = "%s:%s" % (self.id, self.session_counter)
      self.session_counter += 1
    else:
      name = "%s:%s" % (self.id, name)

    if name in self.sessions:
      return self.sessions[name]
    else:
      ssn = Session(self, name, transactional)
      self.sessions[name] = ssn
      self._wakeup()
      return ssn

  @synchronized
  def _remove_session(self, ssn):
    """
    Forget the given session; a session that is not registered is
    silently ignored.
    """
    self.sessions.pop(ssn.name, 0)

  @synchronized
  def open(self):
    """
    Opens a connection.

    @raise ConnectionError: if the connection is already open
    """
    if self._open:
      raise ConnectionError("already open")
    self._open = True
    self.attach()

  @synchronized
  def opened(self):
    """
    Return true if the connection is open, false otherwise.
    """
    return self._open

  @synchronized
  def attach(self):
    """
    Attach to the remote endpoint.
    """
    if not self._connected:
      self._connected = True
      self._driver.start()
      self._wakeup()
    # Block until the transport is up and every live link is linked,
    # errored or closed.
    self._ewait(lambda: self._transport_connected and not self._unlinked())

  def _unlinked(self):
    """
    Return the senders and receivers of live sessions that are
    neither linked, errored nor closed.
    """
    return [l
            for ssn in self.sessions.values()
            if not (ssn.error or ssn.closed)
            for l in ssn.senders + ssn.receivers
            if not (l.linked or l.error or l.closed)]

  @synchronized
  def detach(self, timeout=None):
    """
    Detach from the remote endpoint.

    @type timeout: float
    @param timeout: time to wait for the transport to disconnect
    @raise Timeout: if the transport is still connected after timeout
    """
    if self._connected:
      self._connected = False
      self._wakeup()
      cleanup = True
    else:
      cleanup = False
    try:
      if not self._wait(lambda: not self._transport_connected, timeout=timeout):
        raise Timeout("detach timed out")
    finally:
      # Only stop the driver if this call is the one that detached.
      if cleanup:
        self._driver.stop()
      self._condition.gc()

  @synchronized
  def attached(self):
    """
    Return true if the connection is attached, false otherwise.
    """
    return self._connected

  @synchronized
  def close(self, timeout=None):
    """
    Close the connection and all sessions.

    @type timeout: float
    @param timeout: passed to each session close and to L{detach}
    """
    try:
      # Session.close() removes the session from self.sessions, so
      # iterate over a snapshot rather than the live dictionary.
      for ssn in list(self.sessions.values()):
        ssn.close(timeout=timeout)
    finally:
      self.detach(timeout=timeout)
      self._open = False
| |
class Session(Endpoint):

  """
  Sessions provide a linear context for sending and receiving
  L{Messages<Message>}. L{Messages<Message>} are sent and received
  using the L{Sender.send} and L{Receiver.fetch} methods of the
  L{Sender} and L{Receiver} objects associated with a Session.

  Each L{Sender} and L{Receiver} is created by supplying either a
  target or source address to the L{sender} and L{receiver} methods of
  the Session. The address is supplied via a string syntax documented
  below.

  Addresses
  =========

  An address identifies a source or target for messages. In its
  simplest form this is just a name. In general a target address may
  also be used as a source address, however not all source addresses
  may be used as a target, e.g. a source might additionally have some
  filtering criteria that would not be present in a target.

  A subject may optionally be specified along with the name. When an
  address is used as a target, any subject specified in the address is
  used as the default subject of outgoing messages for that target.
  When an address is used as a source, any subject specified in the
  address is pattern matched against the subject of available messages
  as a filter for incoming messages from that source.

  The options map contains additional information about the address
  including:

    - policies for automatically creating, and deleting the node to
      which an address refers

    - policies for asserting facts about the node to which an address
      refers

    - extension points that can be used for sender/receiver
      configuration

  Mapping to AMQP 0-10
  --------------------
  The name is resolved to either an exchange or a queue by querying
  the broker.

  The subject is set as a property on the message. Additionally, if
  the name refers to an exchange, the routing key is set to the
  subject.

  Syntax
  ------
  The following regular expressions define the tokens used to parse
  addresses::
    LBRACE: \\{
    RBRACE: \\}
    LBRACK: \\[
    RBRACK: \\]
    COLON:  :
    SEMI:   ;
    SLASH:  /
    COMMA:  ,
    NUMBER: [+-]?[0-9]*\\.?[0-9]+
    ID:     [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
    STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
    ESC:    \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
    SYM:    [.#*%@$^!+-]
    WSPACE: [ \\n\\r\\t]+

  The formal grammar for addresses is given below::
    address = name [ "/" subject ] [ ";" options ]
       name = ( part | quoted )+
    subject = ( part | quoted | "/" )*
     quoted = STRING / ESC
       part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
    options = map
        map = "{" ( keyval ( "," keyval )* )? "}"
     keyval = ID ":" value
      value = NUMBER / STRING / ID / map / list
       list = "[" ( value ( "," value )* )? "]"

  This grammar results in the following informal syntax::

    <name> [ / <subject> ] [ ; <options> ]

  Where options is::

    { <key> : <value>, ... }

  And values may be:
    - numbers
    - single, double, or non quoted strings
    - maps (dictionaries)
    - lists

  Options
  -------
  The options map permits the following parameters::

    <name> [ / <subject> ] ; {
      create: always | sender | receiver | never,
      delete: always | sender | receiver | never,
      assert: always | sender | receiver | never,
      mode: browse | consume,
      node: {
        type: queue | topic,
        durable: True | False,
        x-declare: { ... <declare-overrides> ... },
        x-bindings: [<binding_1>, ... <binding_n>]
      },
      link: {
        name: <link-name>,
        durable: True | False,
        reliability: unreliable | at-most-once | at-least-once | exactly-once,
        x-declare: { ... <declare-overrides> ... },
        x-bindings: [<binding_1>, ... <binding_n>],
        x-subscribe: { ... <subscribe-overrides> ... }
      }
    }

  Bindings are specified as a map with the following options::

    {
      exchange: <exchange>,
      queue: <queue>,
      key: <key>,
      arguments: <arguments>
    }

  The create, delete, and assert policies specify who should perform
  the associated action:

    - I{always}: the action will always be performed
    - I{sender}: the action will only be performed by the sender
    - I{receiver}: the action will only be performed by the receiver
    - I{never}: the action will never be performed (this is the default)

  The node-type is one of:

    - I{topic}: a topic node will default to the topic exchange,
      x-declare may be used to specify other exchange types
    - I{queue}: this is the default node-type

  The x-declare map permits protocol specific keys and values to be
  specified when exchanges or queues are declared. These keys and
  values are passed through when creating a node or asserting facts
  about an existing node.

  Examples
  --------
  A simple name resolves to any named node, usually a queue or a
  topic::

    my-queue-or-topic

  A simple name with a subject will also resolve to a node, but the
  presence of the subject will cause a sender using this address to
  set the subject on outgoing messages, and receivers to filter based
  on the subject::

    my-queue-or-topic/my-subject

  A subject pattern can be used and will cause filtering if used by
  the receiver. If used for a sender, the literal value gets set as
  the subject::

    my-queue-or-topic/my-*

  In all the above cases, the address is resolved to an existing node.
  If you want the node to be auto-created, then you can do the
  following. By default nonexistent nodes are assumed to be queues::

    my-queue; {create: always}

  You can customize the properties of the queue::

    my-queue; {create: always, node: {durable: True}}

  You can create a topic instead if you want::

    my-queue; {create: always, node: {type: topic}}

  You can assert that the address resolves to a node with particular
  properties::

    my-transient-topic; {
      assert: always,
      node: {
        type: topic,
        durable: False
      }
    }
  """

  def __init__(self, connection, name, transactional):
    """
    Create a session owned by the given connection. Sessions are
    normally obtained through L{Connection.session}.

    @param connection: the owning L{Connection}
    @param name: the session name
    @param transactional: whether commit/rollback are permitted
    """
    self.connection = connection
    self.name = name
    self.log_id = "%x" % id(self)

    self.transactional = transactional

    # Transaction state flags; commit() and rollback() set committing
    # / aborting and wait for them to be cleared again.
    self.committing = False
    self.committed = True
    self.aborting = False
    self.aborted = False

    self.next_sender_id = 0
    self.senders = []
    self.next_receiver_id = 0
    self.receivers = []
    self.outgoing = []
    self.incoming = []
    self.unacked = []
    self.acked = []
    # XXX: I hate this name.
    self.ack_capacity = UNLIMITED

    self.error = None
    self.closing = False
    self.closed = False

    # Sessions share the connection's lock.
    self._lock = connection._lock

  def __repr__(self):
    return "<Session %s>" % self.name

  def _wait(self, predicate, timeout=None):
    """
    Delegate waiting to the owning connection.
    """
    return self.connection._wait(predicate, timeout=timeout)

  def _wakeup(self):
    """
    Delegate wakeup to the owning connection.
    """
    self.connection._wakeup()

  def check_error(self):
    """
    Raise the connection's error if there is one, otherwise the
    session's own error if there is one.
    """
    self.connection.check_error()
    if self.error:
      raise self.error

  def get_error(self):
    """
    Return the connection's error if set, otherwise the session's own
    error (which may be None).
    """
    err = self.connection.get_error()
    if err:
      return err
    else:
      return self.error

  def _ewait(self, predicate, timeout=None):
    """
    Wait for predicate, giving up early and raising if the session or
    its connection records an error.
    """
    result = self.connection._ewait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    """
    Raise L{SessionClosed} if the session has been closed.
    """
    if self.closed:
      raise SessionClosed()

  @synchronized
  def sender(self, target, **options):
    """
    Creates a L{Sender} that may be used to send L{Messages<Message>}
    to the specified target.

    @type target: str
    @param target: the target to which messages will be sent
    @rtype: Sender
    @return: a new Sender for the specified target
    @raise LinkError: if linking fails; the sender is closed before
      the error is propagated
    """
    target = _mangle(target)
    sender = Sender(self, self.next_sender_id, target, options)
    self.next_sender_id += 1
    self.senders.append(sender)
    # Only wait for the link when the session is usable right now.
    if not self.closed and self.connection._connected:
      self._wakeup()
      try:
        sender._ewait(lambda: sender.linked)
      except LinkError:
        sender.close()
        # bare raise keeps the original traceback
        raise
    return sender

  @synchronized
  def receiver(self, source, **options):
    """
    Creates a receiver that may be used to fetch L{Messages<Message>}
    from the specified source.

    @type source: str
    @param source: the source of L{Messages<Message>}
    @rtype: Receiver
    @return: a new Receiver for the specified source
    @raise LinkError: if linking fails; the receiver is closed before
      the error is propagated
    """
    source = _mangle(source)
    receiver = Receiver(self, self.next_receiver_id, source, options)
    self.next_receiver_id += 1
    self.receivers.append(receiver)
    # Only wait for the link when the session is usable right now.
    if not self.closed and self.connection._connected:
      self._wakeup()
      try:
        receiver._ewait(lambda: receiver.linked)
      except LinkError:
        receiver.close()
        # bare raise keeps the original traceback
        raise
    return receiver

  @synchronized
  def _count(self, predicate):
    """
    Count the incoming messages for which predicate(msg) is true.
    """
    result = 0
    for msg in self.incoming:
      if predicate(msg):
        result += 1
    return result

  def _peek(self, receiver):
    """
    Return the first incoming message for receiver without removing
    it, or None if there is none.
    """
    for msg in self.incoming:
      if msg._receiver == receiver:
        return msg

  def _pop(self, receiver):
    """
    Remove and return the first incoming message for receiver, or
    None if there is none.
    """
    i = 0
    while i < len(self.incoming):
      msg = self.incoming[i]
      if msg._receiver == receiver:
        del self.incoming[i]
        return msg
      else:
        i += 1

  @synchronized
  def _get(self, receiver, timeout=None):
    """
    Wait up to timeout for a message for receiver, move it from
    incoming to unacked and return it. Returns None on timeout, or
    when the wait ended because the session is closing or the
    receiver is closed and no message is queued.
    """
    if self._ewait(lambda: ((self._peek(receiver) is not None) or
                            self.closing or receiver.closed),
                   timeout):
      msg = self._pop(receiver)
      if msg is not None:
        msg._receiver.returned += 1
        self.unacked.append(msg)
        log.debug("RETR[%s]: %s", self.log_id, msg)
      return msg
    return None

  @synchronized
  def next_receiver(self, timeout=None):
    """
    Return the receiver of the oldest incoming message, waiting up to
    timeout for one to arrive.

    @raise Empty: if no message arrives within timeout
    """
    if self._ecwait(lambda: self.incoming, timeout):
      return self.incoming[0]._receiver
    else:
      raise Empty

  @synchronized
  def acknowledge(self, message=None, disposition=None, sync=True):
    """
    Acknowledge the given L{Message}. If message is None, then all
    unacknowledged messages on the session are acknowledged.

    @type message: Message
    @param message: the message to acknowledge or None
    @param disposition: stored on each acknowledged message as its
      C{_disposition}
    @type sync: boolean
    @param sync: if true then block until the message(s) are acknowledged
    @raise InsufficientCapacity: if ack_capacity is zero or negative
    """
    if message is None:
      messages = self.unacked[:]
    else:
      messages = [message]

    for m in messages:
      if self.ack_capacity is not UNLIMITED:
        if self.ack_capacity <= 0:
          # XXX: this is currently a SendError, maybe it should be a SessionError?
          raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
        # wait for room in the acked list before adding to it
        self._wakeup()
        self._ecwait(lambda: len(self.acked) < self.ack_capacity)
      m._disposition = disposition
      self.unacked.remove(m)
      self.acked.append(m)

    self._wakeup()
    if sync:
      # done once none of these messages remain in the acked list
      self._ecwait(lambda: not [m for m in messages if m in self.acked])

  @synchronized
  def commit(self):
    """
    Commit outstanding transactional work. This consists of all
    message sends and receives since the prior commit or rollback.

    @raise NontransactionalSession: if the session is not transactional
    @raise TransactionAborted: if the transaction was aborted
    """
    if not self.transactional:
      raise NontransactionalSession()
    self.committing = True
    self._wakeup()
    self._ecwait(lambda: not self.committing)
    if self.aborted:
      raise TransactionAborted()
    assert self.committed

  @synchronized
  def rollback(self):
    """
    Rollback outstanding transactional work. This consists of all
    message sends and receives since the prior commit or rollback.

    @raise NontransactionalSession: if the session is not transactional
    """
    if not self.transactional:
      raise NontransactionalSession()
    self.aborting = True
    self._wakeup()
    self._ecwait(lambda: not self.aborting)
    assert self.aborted

  @synchronized
  def sync(self, timeout=None):
    """
    Sync the session: sync every sender, then wait until there are no
    outgoing and no acked messages left.

    @raise Timeout: if the session does not drain within timeout
    """
    for snd in self.senders:
      snd.sync(timeout=timeout)
    if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
      raise Timeout("session sync timed out")

  @synchronized
  def close(self, timeout=None):
    """
    Close the session.

    @raise Timeout: if the session does not close within timeout
    """
    self.sync(timeout=timeout)

    # iterate over a fresh list: link.close() removes the link from
    # self.receivers / self.senders
    for link in self.receivers + self.senders:
      link.close(timeout=timeout)

    if not self.closing:
      self.closing = True
      self._wakeup()

    try:
      if not self._ewait(lambda: self.closed, timeout=timeout):
        raise Timeout("session close timed out")
    finally:
      self.connection._remove_session(self)
| |
| def _mangle(addr): |
| if addr and addr.startswith("#"): |
| return str(uuid4()) + addr |
| else: |
| return addr |
| |
class Sender(Endpoint):

  """
  An outgoing link. Messages handed to L{send} are appended to the
  owning session's outgoing list.
  """

  def __init__(self, session, id, target, options):
    self.session = session
    # share the session's lock
    self._lock = session._lock
    self.id = id
    self.target = target
    self.options = options
    self.capacity = options.get("capacity", UNLIMITED)
    self.durable = options.get("durable")
    self.threshold = 0.5
    # serial counters: messages queued, synced up to, and acked
    self.queued = Serial(0)
    self.synced = Serial(0)
    self.acked = Serial(0)
    self.error = None
    self.linked = False
    self.closing = False
    self.closed = False

  def _wakeup(self):
    self.session._wakeup()

  def check_error(self):
    """
    Raise the session (or connection) error first, then this sender's
    own error, if either is set.
    """
    self.session.check_error()
    if self.error:
      raise self.error

  def get_error(self):
    """
    Return the session level error when there is one, else this
    sender's own error.
    """
    return self.session.get_error() or self.error

  def _ewait(self, predicate, timeout=None):
    """
    Wait on the session until predicate holds or this sender has an
    error, then raise any pending error.
    """
    def errored_or_ready():
      return self.error or predicate()

    outcome = self.session._ewait(errored_or_ready, timeout)
    self.check_error()
    return outcome

  def check_closed(self):
    if self.closed:
      raise LinkClosed()

  @synchronized
  def unsettled(self):
    """
    Returns the number of messages awaiting acknowledgment.
    @rtype: int
    @return: the number of unacknowledged messages
    """
    return self.queued - self.acked

  @synchronized
  def available(self):
    """
    Returns the remaining capacity, or L{UNLIMITED} when the sender
    has no capacity limit.
    """
    if self.capacity is UNLIMITED:
      return UNLIMITED
    return self.capacity - self.unsettled()

  @synchronized
  def send(self, object, sync=True, timeout=None):
    """
    Send a message. An object of type L{Message} is sent as is;
    anything else (L{unicode}, L{str}, L{list}, L{dict}, ...) is first
    wrapped in a L{Message}. When the sender capacity is not
    L{UNLIMITED}, send blocks until capacity is available and throws
    L{InsufficientCapacity} if none becomes available within the
    given timeout.

    @type object: unicode, str, list, dict, Message
    @param object: the message or content to send

    @type sync: boolean
    @param sync: if true then block until the message is sent

    @type timeout: float
    @param timeout: the time to wait for available capacity
    """

    ssn = self.session
    if ssn.closing or not ssn.connection._connected:
      raise Detached()

    self._ecwait(lambda: self.linked)

    message = object if isinstance(object, Message) else Message(object)

    # the sender's durability is only a default
    if message.durable is None:
      message.durable = self.durable

    if self.capacity is not UNLIMITED:
      if self.capacity <= 0:
        raise InsufficientCapacity("capacity = %s" % self.capacity)
      has_room = self._ecwait(self.available, timeout=timeout)
      if not has_room:
        raise InsufficientCapacity("capacity = %s" % self.capacity)

    # XXX: what if we send the same message to multiple senders?
    message._sender = self
    if self.capacity is UNLIMITED:
      message._sync = sync
    else:
      # force a sync once remaining capacity drops to the threshold
      low_water = int(ceil(self.threshold*self.capacity))
      message._sync = sync or self.available() <= low_water
    ssn.outgoing.append(message)
    self.queued += 1

    if not sync:
      self._wakeup()
      return
    self.sync()
    assert message not in self.session.outgoing

  @synchronized
  def sync(self, timeout=None):
    """
    Block until everything queued so far has been acked.

    @raise Timeout: if that does not happen within timeout
    """
    target = self.queued
    if self.synced < target:
      self.synced = target
      self._wakeup()
    caught_up = self._ewait(lambda: self.acked >= target, timeout=timeout)
    if not caught_up:
      raise Timeout("sender sync timed out")

  @synchronized
  def close(self, timeout=None):
    """
    Close the Sender.
    """
    # avoid erroring out when closing a sender that was never
    # established
    if self.acked < self.queued:
      self.sync(timeout=timeout)

    if not self.closing:
      self.closing = True
      self._wakeup()

    try:
      done = self.session._ewait(lambda: self.closed, timeout=timeout)
      if not done:
        raise Timeout("sender close timed out")
    finally:
      # always drop the sender from its session, tolerating the case
      # where it has already been removed
      try:
        self.session.senders.remove(self)
      except ValueError:
        pass
| |
class Receiver(Endpoint, object):

  """
  Receives incoming messages from a remote source. Messages may be
  fetched with L{fetch}.
  """

  def __init__(self, session, id, source, options):
    """
    Create a receiver owned by the given session. Receivers are
    normally obtained through L{Session.receiver}.

    @param session: the owning L{Session}
    @param id: the receiver id assigned by the session
    @param source: the source address
    @param options: option map; "capacity" (default 0) is read here
    """
    self.session = session
    self.id = id
    self.source = source
    self.options = options

    # serial counters tracking message credit and delivery
    self.granted = Serial(0)
    self.draining = False
    self.impending = Serial(0)
    self.received = Serial(0)
    self.returned = Serial(0)

    self.error = None
    self.linked = False
    self.closing = False
    self.closed = False
    # receivers share the session's lock
    self._lock = self.session._lock
    self._capacity = 0
    # wakeup=False: set the initial capacity without waking the driver
    self._set_capacity(options.get("capacity", 0), False)
    self.threshold = 0.5

  @synchronized
  def _set_capacity(self, c, wakeup=True):
    """
    Store the capacity (L{UNLIMITED} is stored as its C{value}),
    recompute the granted credit and optionally wake the driver.
    """
    if c is UNLIMITED:
      self._capacity = c.value
    else:
      self._capacity = c
    self._grant()
    if wakeup:
      self._wakeup()

  def _get_capacity(self):
    """
    Return the capacity, mapping the stored unlimited value back to
    L{UNLIMITED}.
    """
    if self._capacity == UNLIMITED.value:
      return UNLIMITED
    else:
      return self._capacity

  capacity = property(_get_capacity, _set_capacity)

  def _wakeup(self):
    self.session._wakeup()

  def check_error(self):
    """
    Raise the session (or connection) error if set, otherwise this
    receiver's own error if set.
    """
    self.session.check_error()
    if self.error:
      raise self.error

  def get_error(self):
    """
    Return the session level error if set, otherwise this receiver's
    own error (which may be None).
    """
    err = self.session.get_error()
    if err:
      return err
    else:
      return self.error

  def _ewait(self, predicate, timeout=None):
    """
    Wait on the session until predicate holds or this receiver has an
    error, then raise any pending error.
    """
    result = self.session._ewait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    """
    Raise L{LinkClosed} if the receiver has been closed.
    """
    if self.closed:
      raise LinkClosed()

  @synchronized
  def unsettled(self):
    """
    Returns the number of acknowledged messages awaiting confirmation.

    @rtype: int
    @return: the number of this receiver's messages still in the
      session's acked list
    """
    # The acked list is kept on the session (see Session.acknowledge);
    # a Receiver has no acked attribute of its own.
    return len([m for m in self.session.acked if m._receiver is self])

  @synchronized
  def available(self):
    """
    Returns the number of messages available to be fetched by the
    application.

    @rtype: int
    @return: the number of available messages
    """
    return self.received - self.returned

  @synchronized
  def fetch(self, timeout=None):
    """
    Fetch and return a single message. A timeout of None will block
    forever waiting for a message to arrive, a timeout of zero will
    return immediately if no messages are available.

    @type timeout: float
    @param timeout: the time to wait for a message to be available
    @rtype: Message
    @return: the fetched message
    @raise Empty: if no message became available
    @raise LinkClosed: if the receiver is closed
    """

    self._ecwait(lambda: self.linked)

    # With zero capacity, grant credit for exactly one more message
    # and wait until impending has caught up with that grant.
    if self._capacity == 0:
      self.granted = self.returned + 1
      self._wakeup()
      self._ecwait(lambda: self.impending >= self.granted)
    msg = self.session._get(self, timeout=timeout)
    if msg is None:
      # Nothing arrived in time: request a drain, wait for the
      # draining flag to be cleared, then make one last non-blocking
      # attempt before re-granting credit.
      self.check_closed()
      self.draining = True
      self._wakeup()
      self._ecwait(lambda: not self.draining)
      msg = self.session._get(self, timeout=0)
      self._grant()
      self._wakeup()
      if msg is None:
        raise Empty()
    elif self._capacity not in (0, UNLIMITED.value):
      # Finite capacity: top the credit back up once the number of
      # buffered messages falls to the threshold.
      t = int(ceil(self.threshold * self._capacity))
      if self.received - self.returned <= t:
        self.granted = self.returned + self._capacity
        self._wakeup()
    return msg

  def _grant(self):
    """
    Recompute granted credit from the number of messages returned so
    far and the current capacity.
    """
    if self._capacity == UNLIMITED.value:
      self.granted = UNLIMITED
    else:
      self.granted = self.returned + self._capacity

  @synchronized
  def close(self, timeout=None):
    """
    Close the receiver.

    @raise Timeout: if the receiver does not close within timeout
    """
    if not self.closing:
      self.closing = True
      self._wakeup()

    try:
      if not self.session._ewait(lambda: self.closed, timeout=timeout):
        raise Timeout("receiver close timed out")
    finally:
      # always drop the receiver from its session, tolerating the
      # case where it has already been removed
      try:
        self.session.receivers.remove(self)
      except ValueError:
        pass
| |
# Names exported by "from <this module> import *"; Endpoint and _mangle
# are not part of the exported API.
__all__ = ["Connection", "Session", "Sender", "Receiver"]