PROTON-2407 Introduce more optional typing annotations to Python binding (#326)
diff --git a/python/proton/_condition.py b/python/proton/_condition.py
index 445c912..ab390af 100644
--- a/python/proton/_condition.py
+++ b/python/proton/_condition.py
@@ -17,13 +17,16 @@
# under the License.
#
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
from cproton import pn_condition_clear, pn_condition_set_name, pn_condition_set_description, pn_condition_info, \
pn_condition_is_set, pn_condition_get_name, pn_condition_get_description
from ._data import Data, dat2obj
+if TYPE_CHECKING:
+ from ._data import PythonAMQPData
+
class Condition:
"""
@@ -48,16 +51,18 @@
information relevant to the identified condition.
:ivar ~.name: The name of the condition.
- :vartype ~.name: ``str``
:ivar ~.description: A description of the condition.
- :vartype ~.description: ``str``
:ivar ~.info: A data object that holds the additional information associated
with the condition. The data object may be used both to access and to
modify the additional information associated with the condition.
- :vartype ~.info: :class:`Data`
"""
- def __init__(self, name, description=None, info=None):
+ def __init__(
+ self,
+ name: str,
+ description: Optional[str] = None,
+ info: Optional['PythonAMQPData'] = None
+ ) -> None:
self.name = name
self.description = description
self.info = info
diff --git a/python/proton/_data.py b/python/proton/_data.py
index 6e91a37..f88cec5 100644
--- a/python/proton/_data.py
+++ b/python/proton/_data.py
@@ -22,10 +22,14 @@
try:
from typing import Literal
except ImportError:
- class Literal:
- def __class_getitem__(cls, item):
+ # https://www.python.org/dev/peps/pep-0560/#class-getitem
+ class GenericMeta(type):
+ def __getitem__(self, item):
pass
+ class Literal(metaclass=GenericMeta):
+ pass
+
from cproton import PN_ARRAY, PN_BINARY, PN_BOOL, PN_BYTE, PN_CHAR, PN_DECIMAL128, PN_DECIMAL32, PN_DECIMAL64, \
PN_DESCRIBED, PN_DOUBLE, PN_FLOAT, PN_INT, PN_LIST, PN_LONG, PN_MAP, PN_NULL, PN_OVERFLOW, PN_SHORT, PN_STRING, \
PN_SYMBOL, PN_TIMESTAMP, PN_UBYTE, PN_UINT, PN_ULONG, PN_USHORT, PN_UUID, pn_data, pn_data_clear, pn_data_copy, \
diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py
index 7104753..3fe9875 100644
--- a/python/proton/_delivery.py
+++ b/python/proton/_delivery.py
@@ -40,7 +40,7 @@
class NamedInt(int):
values: Dict[int, str] = {}
- def __new__(cls, i, name):
+ def __new__(cls: Type['DispositionType'], i: int, name: str) -> 'DispositionType':
ni = super(NamedInt, cls).__new__(cls, i)
cls.values[i] = ni
return ni
@@ -55,7 +55,7 @@
return self.name
@classmethod
- def get(cls, i):
+ def get(cls, i: int) -> Union[int, 'DispositionType']:
return cls.values.get(i, i)
@@ -320,11 +320,9 @@
self.remote = Disposition(pn_delivery_remote(self._impl), False)
@property
- def tag(self):
+ def tag(self) -> str:
"""
The identifier for the delivery.
-
- :type: ``bytes``
"""
return pn_delivery_tag(self._impl)
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index 9520d68..b27be28 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -63,7 +63,8 @@
from ._exceptions import ConnectionException, EXCEPTIONS, LinkException, SessionException
from ._transport import Transport
from ._wrapper import Wrapper
-from typing import Callable, Dict, List, Optional, Union, TYPE_CHECKING
+from typing import Callable, Dict, List, Optional, Union, TYPE_CHECKING, Any
+
if TYPE_CHECKING:
from ._condition import Condition
from ._data import Array, symbol
@@ -165,7 +166,7 @@
else:
return Connection(impl)
- def __init__(self, impl=pn_connection):
+ def __init__(self, impl: Callable[[], Any] = pn_connection) -> None:
Wrapper.__init__(self, impl, pn_connection_attachments)
def _init(self) -> None:
@@ -375,15 +376,11 @@
return dat2obj(pn_connection_remote_properties(self._impl))
@property
- def connected_address(self):
- """
- The address for this connection.
-
- :type: ``str``
- """
+ def connected_address(self) -> str:
+ """The address for this connection."""
return self.url and str(self.url)
- def open(self):
+ def open(self) -> None:
"""
Opens the connection.
@@ -893,7 +890,7 @@
"""
return self.session.transport
- def delivery(self, tag):
+ def delivery(self, tag: str) -> Delivery:
"""
Create a delivery. Every delivery object within a
link must be supplied with a unique tag. Links
@@ -901,8 +898,6 @@
they are created.
:param tag: Delivery tag unique for this link.
- :type tag: ``bytes``
- :rtype: :class:`Delivery`
"""
return Delivery(pn_delivery(self._impl, tag))
@@ -1040,23 +1035,19 @@
return pn_link_is_receiver(self._impl)
@property
- def remote_snd_settle_mode(self):
+ def remote_snd_settle_mode(self) -> int:
"""
The remote sender settle mode for this link. One of
:const:`SND_UNSETTLED`, :const:`SND_SETTLED` or
:const:`SND_MIXED`.
-
- :type: ``int``
"""
return pn_link_remote_snd_settle_mode(self._impl)
@property
- def remote_rcv_settle_mode(self):
+ def remote_rcv_settle_mode(self) -> int:
"""
The remote receiver settle mode for this link. One of
:const:`RCV_FIRST` or :const:`RCV_SECOND`.
-
- :type: ``int``
"""
return pn_link_remote_rcv_settle_mode(self._impl)
@@ -1161,7 +1152,7 @@
"""
return pn_link_detach(self._impl)
- def free(self):
+ def free(self) -> None:
"""
Free this link object. When a link object is freed,
all :class:`Delivery` objects associated with the session (**<-- CHECK THIS**)
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index 0191b98..7ec1c6b 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -36,6 +36,7 @@
from typing import Any, List, Optional, Tuple, Union, TYPE_CHECKING, TypeVar
if TYPE_CHECKING:
+ from ._delivery import DispositionType
from ._reactor import Container, Transaction
from ._endpoints import Sender, Receiver
@@ -159,7 +160,7 @@
A class containing methods for handling received messages.
"""
- def accept(self, delivery: Delivery):
+ def accept(self, delivery: Delivery) -> None:
"""
Accepts a received message.
@@ -171,7 +172,7 @@
"""
self.settle(delivery, Delivery.ACCEPTED)
- def reject(self, delivery: Delivery):
+ def reject(self, delivery: Delivery) -> None:
"""
Rejects a received message that is considered invalid or
unprocessable.
@@ -184,7 +185,7 @@
"""
self.settle(delivery, Delivery.REJECTED)
- def release(self, delivery: Delivery, delivered: bool = True):
+ def release(self, delivery: Delivery, delivered: bool = True) -> None:
"""
Releases a received message, making it available at the source
for any (other) interested receiver. The ``delivered``
@@ -207,16 +208,14 @@
else:
self.settle(delivery, Delivery.RELEASED)
- def settle(self, delivery, state=None):
+ def settle(self, delivery: Delivery, state: Optional['DispositionType'] = None) -> None:
"""
Settles the message delivery, and optionally updating the
delivery state.
:param delivery: The message delivery tracking object
- :type delivery: :class:`proton.Delivery`
- :param state: The delivery state, or ``None`` if not update
+ :param state: The delivery state, or ``None`` if no update
is to be performed.
- :type state: ``int`` or ``None``
"""
if state:
delivery.update(state)
@@ -228,13 +227,12 @@
A utility for simpler and more intuitive handling of delivery
events related to incoming i.e. received messages.
- :type auto_accept: ``bool``
:param auto_accept: If ``True``, accept all messages (default). Otherwise
messages must be individually accepted or rejected.
:param delegate: A client handler for the endpoint event
"""
- def __init__(self, auto_accept=True, delegate=None):
+ def __init__(self, auto_accept: bool = True, delegate: Optional[Handler] = None) -> None:
self.delegate = delegate
self.auto_accept = auto_accept
@@ -316,11 +314,10 @@
:param peer_close_is_error: If ``True``, a peer endpoint closing will be
treated as an error with an error callback. Otherwise (default), the
normal callbacks for the closing will occur.
- :type peer_close_is_error: ``bool``
:param delegate: A client handler for the endpoint event
"""
- def __init__(self, peer_close_is_error=False, delegate=None):
+ def __init__(self, peer_close_is_error: bool = False, delegate: Optional[Handler] = None) -> None:
self.delegate = delegate
self.peer_close_is_error = peer_close_is_error
diff --git a/python/proton/_io.py b/python/proton/_io.py
index 2758587..5628d1d 100644
--- a/python/proton/_io.py
+++ b/python/proton/_io.py
@@ -125,7 +125,7 @@
self._writing.add(selectable)
self.update_deadline()
- def select(self, timeout):
+ def select(self, timeout: float) -> Tuple[List, List, List]:
def select_inner(timeout):
# This inner select adds the writing fds to the exception fd set
diff --git a/python/proton/_message.py b/python/proton/_message.py
index 3d12a04..54e9fe9 100644
--- a/python/proton/_message.py
+++ b/python/proton/_message.py
@@ -506,18 +506,15 @@
self._check(pn_message_decode(self._msg, data))
self._post_decode()
- def send(self, sender, tag=None):
+ def send(self, sender: 'Sender', tag: Optional[str] = None) -> 'Delivery':
"""
Encodes and sends the message content using the specified sender,
and, if present, using the specified tag. Upon success, will
return the :class:`Delivery` object for the sent message.
:param sender: The sender to send the message
- :type sender: :class:`Sender`
:param tag: The delivery tag for the sent message
- :type tag: ``bytes``
:return: The delivery associated with the sent message
- :rtype: :class:`Delivery`
"""
dlv = sender.delivery(tag or sender.delivery_tag())
encoded = self.encode()
@@ -527,7 +524,11 @@
dlv.settle()
return dlv
- def recv(self, link):
+ @overload
+ def recv(self, link: 'Sender') -> None:
+ ...
+
+ def recv(self, link: 'Receiver') -> Optional['Delivery']:
"""
Receives and decodes the message content for the current :class:`Delivery`
from the link. Upon success it will return the current delivery
@@ -536,9 +537,7 @@
return ``None``.
:param link: The link to receive a message from
- :type link: :class:`Link`
:return: the delivery associated with the decoded message (or None)
- :rtype: :class:`Delivery`
"""
if link.is_sender:
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 8e69d4a..aeabc53 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -23,15 +23,18 @@
import re
import os
import queue
-from typing import Any, Dict, Iterator, Optional, List, Union, Callable
+from typing import Any, Dict, Iterator, Optional, List, Union, Callable, TYPE_CHECKING
try:
from typing import Literal
except ImportError:
- class Literal:
- @classmethod
- def __class_getitem__(cls, item):
+ # https://www.python.org/dev/peps/pep-0560/#class-getitem
+ class GenericMeta(type):
+ def __getitem__(self, item):
pass
+ class Literal(metaclass=GenericMeta):
+ pass
+
import time
import traceback
import uuid
@@ -39,25 +42,22 @@
from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE
+from ._common import isstring, unicode2utf8, utf82unicode
+from ._data import Described, symbol, ulong
from ._delivery import Delivery
from ._endpoints import Connection, Endpoint, Link, Session, Terminus
+from ._events import Collector, EventType, EventBase, Handler, Event
from ._exceptions import SSLUnavailable
-from ._data import Described, symbol, ulong
+from ._handlers import OutgoingMessageHandler, IOHandler
+from ._io import IO
from ._message import Message
from ._transport import Transport, SSL, SSLDomain
from ._url import Url
-from ._common import isstring, unicode2utf8, utf82unicode
-from ._events import Collector, EventType, EventBase, Handler, Event
from ._selectable import Selectable
-from ._handlers import OutgoingMessageHandler, IOHandler
-
-from ._io import IO
-from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ._endpoints import Receiver, Sender
- from ._handlers import ConnectSelectable, TransactionHandler
- from ._utils import BlockingConnection
+ from ._handlers import TransactionHandler
from socket import socket
from uuid import UUID
@@ -140,7 +140,7 @@
# TODO: need to make this actually return a proxy which catches exceptions and calls
# on error.
# [Or arrange another way to deal with exceptions thrown by handlers]
- def _make_handler(self, handler):
+ def _make_handler(self, handler: Handler) -> Handler:
"""
Return a proxy handler that dispatches to the provided handler.
@@ -283,7 +283,7 @@
def stop_events(self) -> None:
self._collector.release()
- def schedule(self, delay, handler):
+ def schedule(self, delay: Union[float, int], handler: Handler) -> Task:
"""
Schedule a task to run on this container after a given delay,
and using the supplied handler.
@@ -325,7 +325,12 @@
return t._deadline
return None
- def acceptor(self, host, port, handler=None):
+ def acceptor(
+ self,
+ host: str,
+ port: Union[str, Url.Port],
+ handler: Optional[Handler] = None,
+ ) -> 'Acceptor':
impl = self._make_handler(handler)
a = Acceptor(self, unicode2utf8(host), int(port), impl)
if a:
@@ -333,7 +338,7 @@
else:
raise IOError("%s (%s:%s)" % (str(self.errors), host, port))
- def connection(self, handler=None):
+ def connection(self, handler: Optional[Handler] = None) -> Connection:
"""Deprecated: use connection_to_host() instead
"""
impl = self._make_handler(handler)
@@ -360,7 +365,7 @@
"""
connection.set_address(host, port)
- def get_connection_address(self, connection):
+ def get_connection_address(self, connection: Connection) -> str:
"""*Deprecated* in favor of the property proton.Connection.connected_address.
This may be used to retrieve the remote peer address.
:return: string containing the address in URL format or None if no
@@ -369,7 +374,11 @@
"""
return connection.connected_address
- def selectable(self, handler=None, delegate=None):
+ def selectable(
+ self,
+ handler: Optional[Union['Acceptor', 'EventInjector']] = None,
+ delegate: Optional['socket'] = None
+ ) -> Selectable:
"""
NO IDEA!
@@ -384,10 +393,14 @@
result.handler = handler
return result
- def update(self, selectable):
+ def update(self, selectable: Selectable) -> None:
selectable.update()
- def push_event(self, obj, etype):
+ def push_event(
+ self,
+ obj: Union[Task, 'Container', Selectable],
+ etype: EventType
+ ) -> None:
self._collector.put(obj, etype)
@@ -407,7 +420,7 @@
self._transport = None
self._closed = False
- def trigger(self, event):
+ def trigger(self, event: 'ApplicationEvent') -> None:
"""
Request that the given event be dispatched on the event thread
of the container to which this EventInjector was added.
@@ -495,7 +508,7 @@
self.subject = subject
@property
- def context(self):
+ def context(self) -> 'ApplicationEvent':
"""
A reference to this event.
"""
@@ -523,7 +536,12 @@
(for a successful transaction), or :meth:`abort` (for a failed transaction).
"""
- def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
+ def __init__(
+ self,
+ txn_ctrl: 'Sender',
+ handler: 'TransactionHandler',
+ settle_before_discharge: bool = False,
+ ) -> None:
self.txn_ctrl = txn_ctrl
self.handler = handler
self.id = None
@@ -560,18 +578,19 @@
delivery.transaction = self
return delivery
- def send(self, sender, msg, tag=None):
+ def send(
+ self,
+ sender: 'Sender',
+ msg: Message,
+ tag: Optional[bytes] = None,
+ ) -> Delivery:
"""
Send a message under this transaction.
:param sender: Link over which to send the message.
- :type sender: :class:`proton.Sender`
:param msg: Message to be sent under this transaction.
- :type msg: :class:`proton.Message`
:param tag: The delivery tag
- :type tag: ``bytes``
:return: Delivery object for this message.
- :rtype: :class:`proton.Delivery`
"""
dlv = sender.send(msg, tag=tag)
dlv.local.data = [self.id]
@@ -590,7 +609,7 @@
else:
self._pending.append(delivery)
- def update(self, delivery, state=None):
+ def update(self, delivery: Delivery, state: Optional[ulong] = None) -> None:
if state:
delivery.local.data = [self.id, Described(ulong(state), [])]
delivery.update(0x34)
@@ -725,10 +744,9 @@
they will be converted to symbols before being applied).
:param props: A map of link options to be applied to a link.
- :type props: ``dict``
"""
- def __init__(self, props={}):
+ def __init__(self, props: dict = {}) -> None:
self.properties = {}
for k in props:
if isinstance(k, symbol):
@@ -774,12 +792,10 @@
Configures a receiver with a message selector filter
:param value: Selector filter string
- :type value: ``str``
:param name: Name of the selector, defaults to ``"selector"``.
- :type name: ``str``
"""
- def __init__(self, value, name='selector'):
+ def __init__(self, value: Union[bytes, str], name: str = 'selector') -> None:
super(Selector, self).__init__({symbol(name): Described(
symbol('apache.org:selector-filter:string'), utf82unicode(value))})
@@ -836,7 +852,10 @@
receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
-def _apply_link_options(options, link):
+def _apply_link_options(
+ options: Optional[Union[LinkOption, List[LinkOption]]],
+ link: Union['Sender', 'Receiver']
+) -> None:
if options:
if isinstance(options, list):
for o in options:
@@ -994,7 +1013,7 @@
class Urls(object):
- def __init__(self, values):
+ def __init__(self, values: List[Union[Url, str]]) -> None:
self.values = [Url(v) for v in values]
def __iter__(self) -> Iterator[Url]:
@@ -1133,7 +1152,7 @@
return None
-def _get_default_config():
+def _get_default_config() -> Dict[str, Any]:
conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file()
if conf and os.path.isfile(conf):
with open(conf, 'r') as f:
@@ -1189,8 +1208,17 @@
self.user = None
self.password = None
- def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
- **kwargs):
+ def connect(
+ self,
+ url: Optional[Union[str, Url]] = None,
+ urls: Optional[List[str]] = None,
+ address: Optional[str] = None,
+ handler: Optional[Handler] = None,
+ reconnect: Union[None, Literal[False], Backoff] = None,
+ heartbeat: Optional[float] = None,
+ ssl_domain: Optional[SSLDomain] = None,
+ **kwargs
+ ) -> Connection:
"""
Initiates the establishment of an AMQP connection.
@@ -1218,30 +1246,23 @@
``ca`` above (:const:`proton.SSLDomain.VERIFY_PEER_NAME`).
:param url: URL string of process to connect to
- :type url: ``str``
-
:param urls: list of URL strings of process to try to connect to
- :type urls: ``[str, str, ...]``
:param reconnect: Reconnect is enabled by default. You can
pass in an instance of :class:`Backoff` to control reconnect behavior.
A value of ``False`` will prevent the library from automatically
trying to reconnect if the underlying socket is disconnected
before the connection has been closed.
- :type reconnect: :class:`Backoff` or ``bool``
:param heartbeat: A value in seconds indicating the
desired frequency of heartbeats used to test the underlying
socket is alive.
- :type heartbeat: ``float``
:param ssl_domain: SSL configuration.
- :type ssl_domain: :class:`proton.SSLDomain`
:param handler: a connection scoped handler that will be
called to process any events in the scope of this connection
or its child links.
- :type handler: Any child of :class:`proton.Events.Handler`
:param kwargs:
@@ -1279,7 +1300,6 @@
peers.
:return: A new connection object.
- :rtype: :class:`proton.Connection`
.. note:: Only one of ``url`` or ``urls`` should be specified.
@@ -1330,7 +1350,16 @@
return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect,
heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
- def _connect(self, url=None, urls=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
+ def _connect(
+ self,
+ url: Optional[Union[str, Url]] = None,
+ urls: Optional[List[str]] = None,
+ handler: Optional['Handler'] = None,
+ reconnect: Optional[Union[List[Union[float, int]], bool, Backoff]] = None,
+ heartbeat: None = None,
+ ssl_domain: Optional[SSLDomain] = None,
+ **kwargs
+ ) -> Connection:
conn = self.connection(handler)
conn.container = self.container_id or str(_generate_uuid())
conn.offered_capabilities = kwargs.get('offered_capabilities')
@@ -1369,7 +1398,7 @@
conn.open()
return conn
- def _get_id(self, container, remote, local):
+ def _get_id(self, container: str, remote: Optional[str], local: Optional[str]) -> str:
if local and remote:
"%s-%s-%s" % (container, remote, local)
elif local:
@@ -1394,13 +1423,13 @@
def create_sender(
self,
- context: Union[str, Connection],
+ context: Union[str, Url, Connection],
target: Optional[str] = None,
source: Optional[str] = None,
name: Optional[str] = None,
handler: Optional[Handler] = None,
tags: Optional[Callable[[], bytes]] = None,
- options: Optional[Union['SenderOption', List['SenderOption']]] = None
+ options: Optional[Union['SenderOption', List['SenderOption'], 'LinkOption', List['LinkOption']]] = None
) -> 'Sender':
"""
Initiates the establishment of a link over which messages can
@@ -1455,13 +1484,13 @@
def create_receiver(
self,
- context: Union[Connection, str],
+ context: Union[Connection, Url, str],
source: Optional[str] = None,
target: Optional[str] = None,
name: Optional[str] = None,
dynamic: bool = False,
handler: Optional[Handler] = None,
- options: Optional[Union[ReceiverOption, List[ReceiverOption]]] = None
+ options: Optional[Union[ReceiverOption, List[ReceiverOption], LinkOption, List[LinkOption]]] = None
) -> 'Receiver':
"""
Initiates the establishment of a link over which messages can
@@ -1547,15 +1576,13 @@
context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
return Transaction(context._txn_ctrl, handler, settle_before_discharge)
- def listen(self, url, ssl_domain=None):
+ def listen(self, url: Union[str, Url], ssl_domain: Optional[SSLDomain] = None) -> Acceptor:
"""
Initiates a server socket, accepting incoming AMQP connections
on the interface and port specified.
:param url: URL on which to listen for incoming AMQP connections.
- :type url: ``str`` or :class:`Url`
:param ssl_domain: SSL configuration object if SSL is to be used, ``None`` otherwise.
- :type ssl_domain: :class:`proton.SSLDomain` or ``None``
"""
url = Url(url)
acceptor = self.acceptor(url.host, url.port)
diff --git a/python/proton/_selectable.py b/python/proton/_selectable.py
index a54d2b4..4eb370d 100644
--- a/python/proton/_selectable.py
+++ b/python/proton/_selectable.py
@@ -17,20 +17,24 @@
# under the License.
#
+from typing import Optional, Union, TYPE_CHECKING, Any
+
from ._events import Event
from ._io import PN_INVALID_SOCKET
-from typing import Callable, Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from ._events import EventType
- from ._handlers import ConnectSelectable
- from ._reactor import Container, EventInjector, TimerSelectable
+ from ._reactor import Container, EventInjector
from socket import socket
class Selectable(object):
- def __init__(self, delegate, reactor):
+ def __init__(
+ self,
+ delegate: Optional[Union['EventInjector', 'socket']],
+ reactor: 'Container',
+ ) -> None:
self._delegate = delegate
self.reading = False
self.writing = False
@@ -51,7 +55,7 @@
else:
return PN_INVALID_SOCKET
- def __getattr__(self, name):
+ def __getattr__(self, name: str) -> Any:
return getattr(self._delegate, name)
def _get_deadline(self):
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
index 50f2720..bf5e5ab 100644
--- a/python/proton/_transport.py
+++ b/python/proton/_transport.py
@@ -17,6 +17,8 @@
# under the License.
#
+from typing import Callable, Optional, Type, Union, TYPE_CHECKING, List
+
from cproton import PN_EOS, PN_OK, PN_SASL_AUTH, PN_SASL_NONE, PN_SASL_OK, PN_SASL_PERM, PN_SASL_SYS, PN_SASL_TEMP, \
PN_SSL_ANONYMOUS_PEER, PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY, PN_SSL_CERT_SUBJECT_COMMON_NAME, \
PN_SSL_CERT_SUBJECT_COUNTRY_NAME, PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME, PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT, \
@@ -45,8 +47,6 @@
from ._exceptions import EXCEPTIONS, SSLException, SSLUnavailable, SessionException, TransportException
from ._wrapper import Wrapper
-from typing import Callable, Optional, Type, Union, TYPE_CHECKING, List
-
if TYPE_CHECKING:
from ._condition import Condition
from ._endpoints import Connection # would produce circular import
@@ -54,7 +54,7 @@
class TraceAdapter:
- def __init__(self, tracer):
+ def __init__(self, tracer: Callable[['Transport', str], None]) -> None:
self.tracer = tracer
def __call__(self, trans_impl, message):
@@ -85,13 +85,17 @@
""" Transport mode is as a server. """
@staticmethod
- def wrap(impl):
+ def wrap(impl: Optional[Callable]) -> Optional['Transport']:
if impl is None:
return None
else:
return Transport(_impl=impl)
- def __init__(self, mode=None, _impl=pn_transport):
+ def __init__(
+ self,
+ mode: 'Optional[int]' = None,
+ _impl: 'Callable' = pn_transport,
+ ) -> None:
Wrapper.__init__(self, _impl, pn_transport_attachments)
if mode == Transport.SERVER:
pn_transport_set_server(self._impl)
@@ -862,7 +866,12 @@
else:
return err
- def __new__(cls, transport, domain, session_details=None):
+ def __new__(
+ cls: Type['SSL'],
+ transport: Transport,
+ domain: SSLDomain,
+ session_details: Optional['SSLSessionDetails'] = None
+ ) -> 'SSL':
"""Enforce a singleton SSL object per Transport"""
if transport._ssl:
# unfortunately, we've combined the allocation and the configuration in a
@@ -1122,7 +1131,7 @@
RESUME_REUSED = PN_SSL_RESUME_REUSED
"""Session resumed from previous session."""
- def resume_status(self):
+ def resume_status(self) -> int:
"""
Check whether the state has been resumed.
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index f14db9f..bdc1b34 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -34,14 +34,19 @@
try:
from typing import Literal
except ImportError:
- class Literal:
- def __class_getitem__(cls, item):
+ # https://www.python.org/dev/peps/pep-0560/#class-getitem
+ class GenericMeta(type):
+ def __getitem__(self, item):
pass
+ class Literal(metaclass=GenericMeta):
+ pass
+
if TYPE_CHECKING:
+ from ._delivery import DispositionType
from ._transport import SSLDomain
- from ._reactor import Backoff, SenderOption, ReceiverOption, Connection
- from ._endpoints import Receiver, Sender, Terminus
+ from ._reactor import SenderOption, ReceiverOption, Connection, LinkOption, Backoff
+ from ._endpoints import Receiver, Sender
from ._events import Event
from ._message import Message
@@ -112,7 +117,12 @@
self.link.close()
raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
- def send(self, msg, timeout=False, error_states=None):
+ def send(
+ self,
+ msg: 'Message',
+ timeout: Union[None, Literal[False], float] = False,
+ error_states: Optional[List['DispositionType']] = None,
+ ) -> Delivery:
"""
Blocking send which will return only when the send is complete
and the message settled.
@@ -120,13 +130,10 @@
:param timeout: Timeout in seconds. If ``False``, the value of ``timeout`` used in the
constructor of the :class:`BlockingConnection` object used in the constructor will be used.
If ``None``, there is no timeout. Any other value is treated as a timeout in seconds.
- :type timeout: ``None``, ``False``, ``float``
:param error_states: List of delivery flags which when present in Delivery object
will cause a :class:`SendException` exception to be raised. If ``None``, these
will default to a list containing :const:`proton.Delivery.REJECTED` and :const:`proton.Delivery.RELEASED`.
- :type error_states: ``list``
:return: Delivery object for this message.
- :rtype: :class:`proton.Delivery`
"""
delivery = self.link.send(msg)
self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
@@ -144,14 +151,9 @@
class Fetcher(MessagingHandler):
"""
A message handler for blocking receivers.
-
- :param connection:
- :type connection: :class:
- :param prefetch:
- :type prefetch:
"""
- def __init__(self, connection, prefetch):
+ def __init__(self, connection: 'Connection', prefetch: int):
super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
self.connection = connection
self.incoming = collections.deque([])
@@ -236,7 +238,10 @@
if hasattr(self, "container"):
self.link.handler = None # implicit call to reactor
- def receive(self, timeout=False):
+ def receive(
+ self,
+ timeout: Union[None, Literal[False], float] = False
+ ) -> 'Message':
"""
Blocking receive call which will return only when a message is received or
a timeout (if supplied) occurs.
@@ -244,7 +249,6 @@
:param timeout: Timeout in seconds. If ``False``, the value of ``timeout`` used in the
constructor of the :class:`BlockingConnection` object used in the constructor will be used.
If ``None``, there is no timeout. Any other value is treated as a timeout in seconds.
- :type timeout: ``None``, ``False``, ``float``
"""
if not self.fetcher:
raise Exception("Can't call receive on this receiver as a handler was not provided")
@@ -282,7 +286,7 @@
else:
self.settle(Delivery.RELEASED)
- def settle(self, state=None):
+ def settle(self, state: Optional['DispositionType'] = None):
"""
Settle any received messages.
@@ -349,23 +353,28 @@
always executed on exit.
:param url: The connection URL.
- :type url: ``str``
:param timeout: Connection timeout in seconds. If ``None``, defaults to 60 seconds.
- :type timeout: ``None`` or float
:param container: Container to process the events on the connection. If ``None``,
a new :class:`proton.Container` will be created.
:param ssl_domain:
:param heartbeat: A value in seconds indicating the desired frequency of
heartbeats used to test the underlying socket is alive.
- :type heartbeat: ``float``
:param urls: A list of connection URLs to try to connect to.
- :type urls: ``list``[``str``]
:param kwargs: Container keyword arguments. See :class:`proton.reactor.Container`
for a list of the valid kwargs.
"""
- def __init__(self, url=None, timeout=None, container=None, ssl_domain=None, heartbeat=None, urls=None,
- reconnect=None, **kwargs):
+ def __init__(
+ self,
+ url: Optional[Union[str, Url]] = None,
+ timeout: Optional[float] = None,
+ container: Optional[Container] = None,
+ ssl_domain: Optional['SSLDomain'] = None,
+ heartbeat: Optional[float] = None,
+ urls: Optional[List[str]] = None,
+ reconnect: Union[None, Literal[False], 'Backoff'] = None,
+ **kwargs
+ ) -> None:
self.disconnected = False
self.timeout = timeout or 60
self.container = container or Container()
@@ -393,21 +402,16 @@
address: Optional[str],
handler: Optional[Handler] = None,
name: Optional[str] = None,
- options: Optional[Union['SenderOption', List['SenderOption']]] = None
+ options: Optional[Union['SenderOption', List['SenderOption'], 'LinkOption', List['LinkOption']]] = None
) -> BlockingSender:
"""
Create a blocking sender.
:param address: Address of target node.
- :type address: ``str``
:param handler: Event handler for this sender.
- :type handler: Any child class of :class:`proton.Handler`
:param name: Sender name.
- :type name: ``str``
:param options: A single option, or a list of sender options
- :type options: :class:`SenderOption` or [SenderOption, SenderOption, ...]
:return: New blocking sender instance.
- :rtype: :class:`BlockingSender`
"""
return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
options=options))
@@ -419,7 +423,7 @@
dynamic: bool = False,
handler: Optional[Handler] = None,
name: Optional[str] = None,
- options: Optional[Union['ReceiverOption', List['ReceiverOption']]] = None
+ options: Optional[Union['ReceiverOption', List['ReceiverOption'], 'LinkOption', List['LinkOption']]] = None
) -> BlockingReceiver:
"""
Create a blocking receiver.
@@ -490,18 +494,20 @@
self.container.stop()
self.container.process()
- def wait(self, condition, timeout=False, msg=None):
+ def wait(
+ self,
+ condition: Callable[[], bool],
+ timeout: Union[None, Literal[False], float] = False,
+ msg: Optional[str] = None
+ ) -> None:
"""
Process events until ``condition()`` returns ``True``.
:param condition: Condition which determines when the wait will end.
- :type condition: Function which returns ``bool``
:param timeout: Timeout in seconds. If ``False``, the value of ``timeout`` used in the
constructor of this object will be used. If ``None``, there is no timeout. Any other
value is treated as a timeout in seconds.
- :type timeout: ``None``, ``False``, ``float``
:param msg: Context message for :class:`proton.Timeout` exception
- :type msg: ``str``
"""
if timeout is False:
timeout = self.timeout
diff --git a/python/proton/_wrapper.py b/python/proton/_wrapper.py
index 7ad4c87..4b3973f 100644
--- a/python/proton/_wrapper.py
+++ b/python/proton/_wrapper.py
@@ -17,6 +17,8 @@
# under the License.
#
+from typing import Any, Callable, Optional, Union
+
from cproton import pn_incref, pn_decref, \
pn_py2void, pn_void2py, \
pn_record_get, pn_record_def, pn_record_set, \
@@ -24,11 +26,6 @@
from ._exceptions import ProtonException
-from typing import Any, Callable, Optional, Union, TYPE_CHECKING
-if TYPE_CHECKING:
- from ._delivery import Delivery # circular import
- from ._transport import SASL, Transport
-
class EmptyAttrs:
@@ -62,7 +59,11 @@
"""
- def __init__(self, impl_or_constructor, get_context=None):
+ def __init__(
+ self,
+ impl_or_constructor: Union[Any, Callable[[], Any]],
+ get_context: Optional[Callable[[Any], Any]] = None,
+ ) -> None:
init = False
if callable(impl_or_constructor):
# we are constructing a new object
@@ -119,7 +120,7 @@
def __hash__(self) -> int:
return hash(addressof(self._impl))
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, Wrapper):
return addressof(self._impl) == addressof(other._impl)
return False