PROTON-2314: [Python] reconnect/failover makeover
- Trivial typos
- Fixed failover to interact more sensibly with reconnect delays
- Added useful keyword parameters to Backoff class
- Allow specification of backoff delay by using an iterator/generator
- Improved connection logging a bit
- Connection messages are now logged at INFO, so they can be seen without logging all events
- Remove unused (and now obsolete) address keyword to Container.connect
- Added some failover tests
- Fixed tests to allow logging INFO messages
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 3a82e9b..fdffbaf 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -885,7 +885,7 @@
s = event.selectable
sock, name = IO.accept(self._selectable)
- _logger.debug("Accepted connection from %s", name)
+ _logger.info("Accepted connection from %s", name)
r = self._reactor
handler = self._handler or r.handler
@@ -902,7 +902,73 @@
t._selectable = s
IOHandler.update(t, s, r.now)
-class Connector(Handler):
+
+def delay_iter(initial=0.1, factor=2.0, max_delay=10.0, max_tries=None):
+ """
+ Generator yielding the sequence of reconnect delays in seconds. The first
+ delay is always 0, the second is ``initial`` (default 0.1), and each later
+ delay is the previous one multiplied by ``factor`` (default 2.0), capped at
+ ``max_delay`` (default 10). Stops after ``max_tries`` delays unless it is None.
+ """
+ yield 0.0
+ tries = 1
+ delay = initial
+ while max_tries is None or tries < max_tries:
+ yield delay
+ tries += 1
+ delay = min(max_delay, factor * delay)
+
+
+class Backoff(object):
+ """
+ A reconnect strategy involving an increasing delay between
+ retries, up to a maximum of 10 seconds by default. Iterating
+ over the object yields the successive delays, starting
+ with an initial value of 0 seconds.
+ """
+
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+ self.iter = delay_iter(**self.kwargs)
+
+ def __iter__(self):
+ return self.iter
+
+
+def make_backoff_wrapper(backoff):
+ """
+ Make a wrapper for a backoff object.
+ If the object conforms to the old protocol (has reset and next methods),
+ wrap it in an iterable that returns an iterator suitable for the new backoff
+ approach; otherwise assume it is fine as it is and return it unchanged.
+ :param backoff: the reconnect strategy object to adapt
+ :return: a wrapper around an old-protocol object, or ``backoff`` itself
+ """
+ class WrappedBackoff(object):
+ def __init__(self, backoff):
+ self.backoff = backoff
+
+ def __iter__(self):
+ self.backoff.reset()
+ return self
+
+ def __next__(self):
+ return self.backoff.next()
+ if hasattr(backoff, 'reset') and hasattr(backoff, 'next'):
+ return WrappedBackoff(backoff)
+ else:
+ return backoff
+
+
+class Urls(object):
+ def __init__(self, values):
+ self.values = [Url(v) for v in values]
+
+ def __iter__(self):
+ return iter(self.values)
+
+
+class _Connector(Handler):
"""
Internal handler that triggers the necessary socket connect for an
opened connection.
@@ -922,14 +988,15 @@
self.virtual_host = None
self.ssl_sni = None
self.max_frame_size = None
+ self._connect_sequence = None
+ self._next_url = None
- def _connect(self, connection):
- url = self.address.next()
+ def _connect(self, connection, url):
connection.url = url
# if virtual-host not set, use host from address as default
if self.virtual_host is None:
connection.hostname = url.host
- _logger.debug("connecting to %r..." % url)
+ _logger.info("Connecting to %r..." % url)
transport = Transport()
if self.sasl_enabled:
@@ -957,86 +1024,55 @@
transport.max_frame_size = self.max_frame_size
def on_connection_local_open(self, event):
- self._connect(event.connection)
+ if self.reconnect is None:
+ self._connect_sequence = ((delay, url) for delay in delay_iter() for url in self.address)
+ elif self.reconnect is False:
+ self._connect_sequence = ((delay, url) for delay in delay_iter(max_tries=1) for url in self.address)
+ else:
+ self._connect_sequence = ((delay, url) for delay in self.reconnect for url in self.address)
+ _, url = next(self._connect_sequence) # Ignore delay as we assume first delay must be 0
+ self._connect(event.connection, url)
def on_connection_remote_open(self, event):
- _logger.debug("connected to %s" % event.connection.hostname)
- if self.reconnect:
- self.reconnect.reset()
+ _logger.info("Connected to %s" % event.connection.hostname)
+ if self.reconnect is None:
+ self._connect_sequence = ((delay, url) for delay in delay_iter() for url in self.address)
+ elif self.reconnect:
+ self._connect_sequence = ((delay, url) for delay in self.reconnect for url in self.address)
+ else:
+ self._connect_sequence = None # Help take out the garbage
def on_transport_closed(self, event):
- if self.connection is None: return
- if self.connection.state & Endpoint.LOCAL_ACTIVE:
+ if self.connection is None:
+ return
- if self.reconnect:
+ if not self.connection.state & Endpoint.LOCAL_ACTIVE:
+ _logger.info("Disconnected, already closed")
+ elif self.reconnect is False:
+ _logger.info("Disconnected, reconnect disabled")
+ else:
+ try:
event.transport.unbind()
- delay = self.reconnect.next()
+ delay, url = next(self._connect_sequence)
if delay == 0:
- _logger.info("Disconnected, reconnecting...")
- self._connect(self.connection)
+ _logger.info("Disconnected, reconnecting immediately...")
+ self._connect(self.connection, url)
return
else:
_logger.info("Disconnected will try to reconnect after %s seconds" % delay)
+ self._next_url = url
event.reactor.schedule(delay, self)
return
- else:
- _logger.debug("Disconnected")
+ except StopIteration:
+ _logger.info("Disconnected, giving up retrying")
+
# See connector.cpp: conn.free()/pn_connection_release() here?
self.connection = None
def on_timer_task(self, event):
- self._connect(self.connection)
-
-
-class Backoff(object):
- """
- A reconnect strategy involving an increasing delay between
- retries, up to a maximum or 10 seconds. Repeated calls
- to :meth:`next` returns a value for the next delay, starting
- with an initial value of 0 seconds.
- """
-
- def __init__(self):
- self.delay = 0
-
- def reset(self):
- """
- Reset the backoff delay to 0 seconds.
- """
- self.delay = 0
-
- def next(self):
- """
- Start the next delay in the sequence of delays. The first
- delay is 0 seconds, the second 0.1 seconds, and each subsequent
- call to :meth:`next` doubles the next delay period until a
- maximum value of 10 seconds is reached.
-
- :return: The next delay in seconds.
- :rtype: ``float``
- """
- current = self.delay
- if current == 0:
- self.delay = 0.1
- else:
- self.delay = min(10, 2 * current)
- return current
-
-
-class Urls(object):
- def __init__(self, values):
- self.values = [Url(v) for v in values]
- self.i = iter(self.values)
-
- def __iter__(self):
- return self
-
- def next(self):
- try:
- return next(self.i)
- except StopIteration:
- self.i = iter(self.values)
- return next(self.i)
+ if self._next_url:
+ self._connect(self.connection, self._next_url)
+ self._next_url = None
class SSLConfig(object):
@@ -1126,7 +1162,7 @@
for it as follows:
1. The location set in the environment variable ``MESSAGING_CONNECT_FILE``
- 2. ``.connect.json``
+ 2. ``./connect.json``
3. ``~/.config/messaging/connect.json``
4. ``/etc/messaging/connect.json``
@@ -1250,14 +1286,14 @@
else:
return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
- def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
+ def _connect(self, url=None, urls=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
conn = self.connection(handler)
conn.container = self.container_id or str(_generate_uuid())
conn.offered_capabilities = kwargs.get('offered_capabilities')
conn.desired_capabilities = kwargs.get('desired_capabilities')
conn.properties = kwargs.get('properties')
- connector = Connector(conn)
+ connector = _Connector(conn)
connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
@@ -1275,16 +1311,13 @@
connector.address = Urls([url])
elif urls:
connector.address = Urls(urls)
- elif address:
- connector.address = address
else:
- raise ValueError("One of url, urls or address required")
+ raise ValueError("One of url or urls required")
if heartbeat:
connector.heartbeat = heartbeat
- if reconnect:
- connector.reconnect = reconnect
- elif reconnect is None:
- connector.reconnect = Backoff()
+
+ connector.reconnect = make_backoff_wrapper(reconnect)
+
# use container's default client domain if none specified. This is
# only necessary of the URL specifies the "amqps:" scheme
connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index 8a10b0f..a9812bb 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -523,7 +523,7 @@
def next(self):
"""Get the next value"""
self.lock.acquire()
- self.count += self.step;
+ self.count += self.step
result = self.count
self.lock.release()
return result
diff --git a/python/tests/proton_tests/main.py b/python/tests/proton_tests/main.py
index bb1009d..e560cf1 100644
--- a/python/tests/proton_tests/main.py
+++ b/python/tests/proton_tests/main.py
@@ -22,7 +22,7 @@
import optparse, os, struct, sys, time, traceback, types, cgi
from fnmatch import fnmatchcase as match
from logging import getLogger, StreamHandler, Formatter, Filter, \
- WARN, DEBUG, ERROR
+ WARN, DEBUG, ERROR, INFO
from .common import SkipTest
@@ -33,6 +33,7 @@
levels = {
"DEBUG": DEBUG,
+ "INFO": INFO,
"WARN": WARN,
"ERROR": ERROR
}
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 99ea996..c4e1227 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -21,11 +21,11 @@
import time
-from proton.reactor import Container, ApplicationEvent, EventInjector, Selector
+from proton.reactor import Container, ApplicationEvent, EventInjector, Selector, Backoff
from proton.handlers import Handshaker, MessagingHandler
from proton import Handler, Url, symbol
-from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
+from .common import Test, SkipTest, TestServer, free_tcp_port, free_tcp_ports, ensureCanTestExtendedSASL
class Barf(Exception):
pass
@@ -442,18 +442,21 @@
def __init__(self):
super(ContainerTest._ClientHandler, self).__init__()
self.server_addr = None
+ self.errors = 0
def on_connection_opened(self, event):
self.server_addr = event.connected_address
event.connection.close()
+ def on_transport_error(self, event):
+ self.errors += 1
+
def test_numeric_hostname(self):
ensureCanTestExtendedSASL()
server_handler = ContainerTest._ServerHandler("127.0.0.1")
client_handler = ContainerTest._ClientHandler()
container = Container(server_handler)
- container.connect(url=Url(host="127.0.0.1",
- port=server_handler.port),
+ container.connect(url="127.0.0.1:%s" % (server_handler.port),
handler=client_handler)
container.run()
assert server_handler.client_addr
@@ -466,8 +469,7 @@
server_handler = ContainerTest._ServerHandler("localhost")
client_handler = ContainerTest._ClientHandler()
container = Container(server_handler)
- container.connect(url=Url(host="localhost",
- port=server_handler.port),
+ container.connect(url="localhost:%s" % (server_handler.port),
handler=client_handler)
container.run()
assert server_handler.client_addr
@@ -479,8 +481,7 @@
ensureCanTestExtendedSASL()
server_handler = ContainerTest._ServerHandler("localhost")
container = Container(server_handler)
- conn = container.connect(url=Url(host="localhost",
- port=server_handler.port),
+ conn = container.connect(url="localhost:%s" % (server_handler.port),
handler=ContainerTest._ClientHandler(),
virtual_host="a.b.c.org")
container.run()
@@ -492,8 +493,7 @@
# Python Container.
server_handler = ContainerTest._ServerHandler("localhost")
container = Container(server_handler)
- conn = container.connect(url=Url(host="localhost",
- port=server_handler.port),
+ conn = container.connect(url="localhost:%s" % (server_handler.port),
handler=ContainerTest._ClientHandler(),
virtual_host="")
container.run()
@@ -536,11 +536,55 @@
self.connect_failed = True
self.server_handler.listen(event.container)
+ def test_failover(self):
+ server_handler = ContainerTest._ServerHandler("localhost")
+ client_handler = ContainerTest._ClientHandler()
+ free_ports = free_tcp_ports(2)
+ container = Container(server_handler)
+ container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1]),
+ "localhost:%s" % (server_handler.port)],
+ handler=client_handler)
+ container.run()
+ assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
+ assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr
+
+ def test_failover_fail(self):
+ client_handler = ContainerTest._ClientHandler()
+ free_ports = free_tcp_ports(2)
+ container = Container(client_handler)
+ start = time.time()
+ container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])],
+ reconnect=Backoff(max_tries=5),
+ handler=client_handler)
+ container.run()
+ end = time.time()
+ assert client_handler.errors == 10
+ # Total time for failure should be greater than but close to 3s
+ # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time!
+ assert 3.0 < end-start, end-start
+ assert client_handler.server_addr is None, client_handler.server_addr
+
+ def test_failover_fail_custom_reconnect(self):
+ client_handler = ContainerTest._ClientHandler()
+ free_ports = free_tcp_ports(2)
+ container = Container(client_handler)
+ start = time.time()
+ container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])],
+ reconnect=[0, 0.5, 1],
+ handler=client_handler)
+ container.run()
+ end = time.time()
+ assert client_handler.errors == 6
+ # Total time for failure should be greater than but close to 3s
+ # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time!
+ assert 3.0 < end-start, end-start
+ assert client_handler.server_addr is None, client_handler.server_addr
+
def test_reconnect(self):
server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True)
client_handler = ContainerTest._ReconnectClientHandler(server_handler)
container = Container(server_handler)
- container.connect(url=Url(host="localhost", port=server_handler.port),
+ container.connect(url="localhost:%s" % (server_handler.port),
handler=client_handler)
container.run()
assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
@@ -551,7 +595,7 @@
server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False)
client_handler = ContainerTest._ReconnectClientHandler(server_handler)
container = Container(server_handler)
- container.connect(url=Url(host="localhost", port=server_handler.port),
+ container.connect(url="localhost:%s" % (server_handler.port),
handler=client_handler, reconnect=False)
container.run()
assert server_handler.peer_hostname == None, server_handler.peer_hostname