PROTON-2314: [Python] reconnect/failover makeover
- Trivial typos
- Fixed failover to interact more sensibly with reconnect delays
- Added useful keyword parameters to Backoff class
- Allowed specification of backoff delays using an iterator/generator (see the usage sketch below)
- Improved connection logging a bit
  - Can now see connection messages without all events
- Removed unused (and now obsolete) address keyword from Container.connect
- Added some failover tests
- Fixed tests to allow logging INFO messages
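
A minimal usage sketch of the new reconnect options (the hostnames and ports are
placeholders; the behaviour shown is what the new failover tests below exercise):

    from proton.handlers import MessagingHandler
    from proton.reactor import Backoff, Container

    class Client(MessagingHandler):
        def on_start(self, event):
            # Try the URLs in turn; the reconnect strategy supplies the delays.
            event.container.connect(
                urls=["host1.example.com:5672", "host2.example.com:5672"],
                # Backoff now accepts keyword parameters (forwarded to delay_iter);
                # max_tries=5 limits the delay sequence to five entries, and each
                # delay is tried against every URL before moving on.
                reconnect=Backoff(max_tries=5),
                handler=self)

    # A plain list (or any other iterable) of delays in seconds also works,
    # e.g. reconnect=[0, 0.5, 1]; reconnect=False disables reconnect entirely.
    Container(Client()).run()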
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 3a82e9b..fdffbaf 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -885,7 +885,7 @@
         s = event.selectable
 
         sock, name = IO.accept(self._selectable)
-        _logger.debug("Accepted connection from %s", name)
+        _logger.info("Accepted connection from %s", name)
 
         r = self._reactor
         handler = self._handler or r.handler
@@ -902,7 +902,73 @@
         t._selectable = s
         IOHandler.update(t, s, r.now)
 
-class Connector(Handler):
+
+def delay_iter(initial=0.1, factor=2.0, max_delay=10.0, max_tries=None):
+    """
+    iterator yielding the next delay in the sequence of delays. The first
+    delay is 0 seconds, the second 0.1 seconds, and each subsequent
+    call to :meth:`next` doubles the next delay period until a
+    maximum value of 10 seconds is reached.
+    """
+    yield 0.0
+    tries = 1
+    delay = initial
+    while max_tries is None or tries < max_tries:
+        yield delay
+        tries += 1
+        delay = min(max_delay, factor * delay)
+
+
+class Backoff(object):
+    """
+    A reconnect strategy involving an increasing delay between
+    retries, up to a maximum or 10 seconds. Repeated calls
+    to :meth:`next` returns a value for the next delay, starting
+    with an initial value of 0 seconds.
+    """
+
+    def __init__(self, **kwargs):
+        self.kwargs = kwargs
+        self.iter = delay_iter(**self.kwargs)
+
+    def __iter__(self):
+        return self.iter
+
+
+def make_backoff_wrapper(backoff):
+    """
+    Make a wrapper for a backoff object:
+    If the object conforms to the old protocol (has reset and next methods) then
+    wrap it in an iterable that returns an iterator suitable for the new backoff approach
+    otherwise assume it is fine as it is!
+    :param backoff:
+    :return:
+    """
+    class WrappedBackoff(object):
+        def __init__(self, backoff):
+            self.backoff = backoff
+
+        def __iter__(self):
+            self.backoff.reset()
+            return self
+
+        def __next__(self):
+            return self.backoff.next()
+    if hasattr(backoff, 'reset') and hasattr(backoff, 'next'):
+        return WrappedBackoff(backoff)
+    else:
+        return backoff
+
+
+class Urls(object):
+    def __init__(self, values):
+        self.values = [Url(v) for v in values]
+
+    def __iter__(self):
+        return iter(self.values)
+
+
+class _Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
@@ -922,14 +988,15 @@
         self.virtual_host = None
         self.ssl_sni = None
         self.max_frame_size = None
+        self._connect_sequence = None
+        self._next_url = None
 
-    def _connect(self, connection):
-        url = self.address.next()
+    def _connect(self, connection, url):
         connection.url = url
         # if virtual-host not set, use host from address as default
         if self.virtual_host is None:
             connection.hostname = url.host
-        _logger.debug("connecting to %r..." % url)
+        _logger.info("Connecting to %r..." % url)
 
         transport = Transport()
         if self.sasl_enabled:
@@ -957,86 +1024,55 @@
             transport.max_frame_size = self.max_frame_size
 
     def on_connection_local_open(self, event):
-        self._connect(event.connection)
+        if self.reconnect is None:
+            self._connect_sequence = ((delay, url) for delay in delay_iter() for url in self.address)
+        elif self.reconnect is False:
+            self._connect_sequence = ((delay, url) for delay in delay_iter(max_tries=1) for url in self.address)
+        else:
+            self._connect_sequence = ((delay, url) for delay in self.reconnect for url in self.address)
+        _, url = next(self._connect_sequence)  # Ignore the delay, as the first delay is assumed to be 0
+        self._connect(event.connection, url)
 
     def on_connection_remote_open(self, event):
-        _logger.debug("connected to %s" % event.connection.hostname)
-        if self.reconnect:
-            self.reconnect.reset()
+        _logger.info("Connected to %s" % event.connection.hostname)
+        if self.reconnect is None:
+            self._connect_sequence = ((delay, url) for delay in delay_iter() for url in self.address)
+        elif self.reconnect:
+            self._connect_sequence = ((delay, url) for delay in self.reconnect for url in self.address)
+        else:
+            self._connect_sequence = None # Help take out the garbage
 
     def on_transport_closed(self, event):
-        if self.connection is None: return
-        if self.connection.state & Endpoint.LOCAL_ACTIVE:
+        if self.connection is None:
+            return
 
-            if self.reconnect:
+        if not self.connection.state & Endpoint.LOCAL_ACTIVE:
+            _logger.info("Disconnected, already closed")
+        elif self.reconnect is False:
+            _logger.info("Disconnected, reconnect disabled")
+        else:
+            try:
                 event.transport.unbind()
-                delay = self.reconnect.next()
+                delay, url = next(self._connect_sequence)
                 if delay == 0:
-                    _logger.info("Disconnected, reconnecting...")
-                    self._connect(self.connection)
+                    _logger.info("Disconnected, reconnecting immediately...")
+                    self._connect(self.connection, url)
                     return
                 else:
                     _logger.info("Disconnected will try to reconnect after %s seconds" % delay)
+                    self._next_url = url
                     event.reactor.schedule(delay, self)
                     return
-            else:
-                _logger.debug("Disconnected")
+            except StopIteration:
+                _logger.info("Disconnected, giving up retrying")
+
         # See connector.cpp: conn.free()/pn_connection_release() here?
         self.connection = None
 
     def on_timer_task(self, event):
-        self._connect(self.connection)
-
-
-class Backoff(object):
-    """
-    A reconnect strategy involving an increasing delay between
-    retries, up to a maximum or 10 seconds. Repeated calls
-    to :meth:`next` returns a value for the next delay, starting
-    with an initial value of 0 seconds.
-    """
-
-    def __init__(self):
-        self.delay = 0
-
-    def reset(self):
-        """
-        Reset the backoff delay to 0 seconds.
-        """
-        self.delay = 0
-
-    def next(self):
-        """
-        Start the next delay in the sequence of delays. The first
-        delay is 0 seconds, the second 0.1 seconds, and each subsequent
-        call to :meth:`next` doubles the next delay period until a
-        maximum value of 10 seconds is reached.
-
-        :return: The next delay in seconds.
-        :rtype: ``float``
-        """
-        current = self.delay
-        if current == 0:
-            self.delay = 0.1
-        else:
-            self.delay = min(10, 2 * current)
-        return current
-
-
-class Urls(object):
-    def __init__(self, values):
-        self.values = [Url(v) for v in values]
-        self.i = iter(self.values)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        try:
-            return next(self.i)
-        except StopIteration:
-            self.i = iter(self.values)
-            return next(self.i)
+        if self._next_url:
+            self._connect(self.connection, self._next_url)
+            self._next_url = None
 
 
 class SSLConfig(object):
@@ -1126,7 +1162,7 @@
         for it as follows:
 
             1.  The location set in the environment variable ``MESSAGING_CONNECT_FILE``
-            2.  ``.connect.json``
+            2.  ``./connect.json``
             3.  ``~/.config/messaging/connect.json``
             4.  ``/etc/messaging/connect.json``
 
@@ -1250,14 +1286,14 @@
         else:
             return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
 
-    def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
+    def _connect(self, url=None, urls=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
         conn = self.connection(handler)
         conn.container = self.container_id or str(_generate_uuid())
         conn.offered_capabilities = kwargs.get('offered_capabilities')
         conn.desired_capabilities = kwargs.get('desired_capabilities')
         conn.properties = kwargs.get('properties')
 
-        connector = Connector(conn)
+        connector = _Connector(conn)
         connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
         connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
         connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
@@ -1275,16 +1311,13 @@
             connector.address = Urls([url])
         elif urls:
             connector.address = Urls(urls)
-        elif address:
-            connector.address = address
         else:
-            raise ValueError("One of url, urls or address required")
+            raise ValueError("One of url or urls required")
         if heartbeat:
             connector.heartbeat = heartbeat
-        if reconnect:
-            connector.reconnect = reconnect
-        elif reconnect is None:
-            connector.reconnect = Backoff()
+
+        connector.reconnect = make_backoff_wrapper(reconnect)
+
         # use container's default client domain if none specified.  This is
         # only necessary of the URL specifies the "amqps:" scheme
         connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index 8a10b0f..a9812bb 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -523,7 +523,7 @@
     def next(self):
         """Get the next value"""
         self.lock.acquire()
-        self.count += self.step;
+        self.count += self.step
         result = self.count
         self.lock.release()
         return result
diff --git a/python/tests/proton_tests/main.py b/python/tests/proton_tests/main.py
index bb1009d..e560cf1 100644
--- a/python/tests/proton_tests/main.py
+++ b/python/tests/proton_tests/main.py
@@ -22,7 +22,7 @@
 import optparse, os, struct, sys, time, traceback, types, cgi
 from fnmatch import fnmatchcase as match
 from logging import getLogger, StreamHandler, Formatter, Filter, \
-    WARN, DEBUG, ERROR
+    WARN, DEBUG, ERROR, INFO
 
 from .common import SkipTest
 
@@ -33,6 +33,7 @@
 
 levels = {
   "DEBUG": DEBUG,
+  "INFO": INFO,
   "WARN": WARN,
   "ERROR": ERROR
   }
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 99ea996..c4e1227 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -21,11 +21,11 @@
 
 import time
 
-from proton.reactor import Container, ApplicationEvent, EventInjector, Selector
+from proton.reactor import Container, ApplicationEvent, EventInjector, Selector, Backoff
 from proton.handlers import Handshaker, MessagingHandler
 from proton import Handler, Url, symbol
 
-from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
+from .common import Test, SkipTest, TestServer, free_tcp_port, free_tcp_ports, ensureCanTestExtendedSASL
 
 class Barf(Exception):
     pass
@@ -442,18 +442,21 @@
         def __init__(self):
             super(ContainerTest._ClientHandler, self).__init__()
             self.server_addr = None
+            self.errors = 0
 
         def on_connection_opened(self, event):
             self.server_addr = event.connected_address
             event.connection.close()
 
+        def on_transport_error(self, event):
+            self.errors += 1
+
     def test_numeric_hostname(self):
         ensureCanTestExtendedSASL()
         server_handler = ContainerTest._ServerHandler("127.0.0.1")
         client_handler = ContainerTest._ClientHandler()
         container = Container(server_handler)
-        container.connect(url=Url(host="127.0.0.1",
-                                  port=server_handler.port),
+        container.connect(url="127.0.0.1:%s" % (server_handler.port),
                           handler=client_handler)
         container.run()
         assert server_handler.client_addr
@@ -466,8 +469,7 @@
         server_handler = ContainerTest._ServerHandler("localhost")
         client_handler = ContainerTest._ClientHandler()
         container = Container(server_handler)
-        container.connect(url=Url(host="localhost",
-                                  port=server_handler.port),
+        container.connect(url="localhost:%s" % (server_handler.port),
                           handler=client_handler)
         container.run()
         assert server_handler.client_addr
@@ -479,8 +481,7 @@
         ensureCanTestExtendedSASL()
         server_handler = ContainerTest._ServerHandler("localhost")
         container = Container(server_handler)
-        conn = container.connect(url=Url(host="localhost",
-                                         port=server_handler.port),
+        conn = container.connect(url="localhost:%s" % (server_handler.port),
                                  handler=ContainerTest._ClientHandler(),
                                  virtual_host="a.b.c.org")
         container.run()
@@ -492,8 +493,7 @@
         # Python Container.
         server_handler = ContainerTest._ServerHandler("localhost")
         container = Container(server_handler)
-        conn = container.connect(url=Url(host="localhost",
-                                         port=server_handler.port),
+        conn = container.connect(url="localhost:%s" % (server_handler.port),
                                  handler=ContainerTest._ClientHandler(),
                                  virtual_host="")
         container.run()
@@ -536,11 +536,55 @@
             self.connect_failed = True
             self.server_handler.listen(event.container)
 
+    def test_failover(self):
+        server_handler = ContainerTest._ServerHandler("localhost")
+        client_handler = ContainerTest._ClientHandler()
+        free_ports = free_tcp_ports(2)
+        container = Container(server_handler)
+        container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1]),
+                                "localhost:%s" % (server_handler.port)],
+                          handler=client_handler)
+        container.run()
+        assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
+        assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr
+
+    def test_failover_fail(self):
+        client_handler = ContainerTest._ClientHandler()
+        free_ports = free_tcp_ports(2)
+        container = Container(client_handler)
+        start = time.time()
+        container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])],
+                          reconnect=Backoff(max_tries=5),
+                          handler=client_handler)
+        container.run()
+        end = time.time()
+        assert client_handler.errors == 10
+        # Total time for failure should be greater than, but close to, 3s.
+        # An upper bound of about 3.2s would be nice too, but loaded CI machines can take a long time!
+        assert 3.0 < end-start, end-start
+        assert client_handler.server_addr is None, client_handler.server_addr
+
+    def test_failover_fail_custom_reconnect(self):
+        client_handler = ContainerTest._ClientHandler()
+        free_ports = free_tcp_ports(2)
+        container = Container(client_handler)
+        start = time.time()
+        container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])],
+                          reconnect=[0, 0.5, 1],
+                          handler=client_handler)
+        container.run()
+        end = time.time()
+        assert client_handler.errors == 6
+        # Total time for failure should be greater than, but close to, 3s.
+        # An upper bound of about 3.2s would be nice too, but loaded CI machines can take a long time!
+        assert 3.0 < end-start, end-start
+        assert client_handler.server_addr is None, client_handler.server_addr
+
     def test_reconnect(self):
         server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True)
         client_handler = ContainerTest._ReconnectClientHandler(server_handler)
         container = Container(server_handler)
-        container.connect(url=Url(host="localhost", port=server_handler.port),
+        container.connect(url="localhost:%s" % (server_handler.port),
                           handler=client_handler)
         container.run()
         assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
@@ -551,7 +595,7 @@
         server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False)
         client_handler = ContainerTest._ReconnectClientHandler(server_handler)
         container = Container(server_handler)
-        container.connect(url=Url(host="localhost", port=server_handler.port),
+        container.connect(url="localhost:%s" % (server_handler.port),
                           handler=client_handler, reconnect=False)
         container.run()
         assert server_handler.peer_hostname == None, server_handler.peer_hostname