PROTON-2156: [python] Rework connector scheme to work with tornado ioloop
- This also required a small change to the tornado interface code
- Add back the helloworld_tornado and helloworld_direct_tornado tests
diff --git a/python/examples/proton_tornado.py b/python/examples/proton_tornado.py
index 8829a4a..55e8db8 100755
--- a/python/examples/proton_tornado.py
+++ b/python/examples/proton_tornado.py
@@ -91,8 +91,11 @@
             self.loop.add_callback(self._stop)
 
     def _stop(self):
-        self.reactor.stop()
-        self.loop.stop()
+        # We could have received a new selectable since we sent the stop
+        if self.count == 0:
+            self.reactor.stop()
+            self.loop.stop()
+
 
 class Container(object):
     def __init__(self, *handlers, **kwargs):
diff --git a/python/examples/test_examples.py b/python/examples/test_examples.py
index 01bcc94..8df68ec 100644
--- a/python/examples/test_examples.py
+++ b/python/examples/test_examples.py
@@ -69,6 +69,12 @@
     def test_helloworld_blocking(self):
         self.test_helloworld('helloworld_blocking.py')
 
+    def test_helloworld_tornado(self):
+        self.test_helloworld('helloworld_tornado.py')
+
+    def test_helloworld_direct_tornado(self):
+        self.test_helloworld('helloworld_direct_tornado.py')
+
     def test_simple_send_recv(self, recv='simple_recv.py', send='simple_send.py'):
         with Popen([recv]) as r:
             with Popen([send]):
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index 3525ffb..5ff45dd 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -1396,26 +1396,33 @@
     def writable(self):
         e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
         t = self._transport
+
+        # Always cleanup this ConnectSelectable: either we failed or created a new one
+        # Do it first to ensure the socket gets deregistered before being registered again
+        # in the case of connecting
+        self.terminate()
+        self._transport = None
+        self.update()
+
         if e == 0:
             log.debug("Connection succeeded")
+
+            # Disassociate from the socket (which will be passed on)
+            self.release()
+
             s = self._reactor.selectable(delegate=self._delegate)
             s._transport = t
             t._selectable = s
             self._iohandler.update(t, s, t._reactor.now)
 
-            # Disassociate from the socket (which has been passed on)
-            self._delegate = None
-            self.terminate()
-            self._transport = None
-            self.update()
             return
         elif e == errno.ECONNREFUSED:
             if len(self._addrs) > 0:
                 log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
+
                 sock = IO.connect(self._addrs[0])
-                self._addrs = self._addrs[1:]
-                self._delegate.close()
-                self._delegate = sock
+                # New ConnectSelectable for the new socket with rest of addresses
+                ConnectSelectable(sock, self._reactor, self._addrs[1:], t, self._iohandler)
                 return
             else:
                 log.debug("Connection refused, but tried all transport addresses")
@@ -1426,6 +1433,3 @@
 
         t.close_tail()
         t.close_head()
-        self.terminate()
-        self._transport = None
-        self.update()