PROTON-2156: [python] Rework connector scheme to work with tornado ioloop
- This also required a small change to the tornado interface code
- Add back the helloworld_tornado and helloworld_direct_tornado tests
diff --git a/python/examples/proton_tornado.py b/python/examples/proton_tornado.py
index 8829a4a..55e8db8 100755
--- a/python/examples/proton_tornado.py
+++ b/python/examples/proton_tornado.py
@@ -91,8 +91,11 @@
self.loop.add_callback(self._stop)
def _stop(self):
- self.reactor.stop()
- self.loop.stop()
+ # We could have received a new selectable since we sent the stop
+ if self.count == 0:
+ self.reactor.stop()
+ self.loop.stop()
+
class Container(object):
def __init__(self, *handlers, **kwargs):
diff --git a/python/examples/test_examples.py b/python/examples/test_examples.py
index 01bcc94..8df68ec 100644
--- a/python/examples/test_examples.py
+++ b/python/examples/test_examples.py
@@ -69,6 +69,12 @@
def test_helloworld_blocking(self):
self.test_helloworld('helloworld_blocking.py')
+ def test_helloworld_tornado(self):
+ self.test_helloworld('helloworld_tornado.py')
+
+ def test_helloworld_direct_tornado(self):
+ self.test_helloworld('helloworld_direct_tornado.py')
+
def test_simple_send_recv(self, recv='simple_recv.py', send='simple_send.py'):
with Popen([recv]) as r:
with Popen([send]):
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index 3525ffb..5ff45dd 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -1396,26 +1396,33 @@
def writable(self):
e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
t = self._transport
+
+ # Always cleanup this ConnectSelectable: either we failed or created a new one
+ # Do it first to ensure the socket gets deregistered before being registered again
+ # in the case of connecting
+ self.terminate()
+ self._transport = None
+ self.update()
+
if e == 0:
log.debug("Connection succeeded")
+
+ # Disassociate from the socket (which will be passed on)
+ self.release()
+
s = self._reactor.selectable(delegate=self._delegate)
s._transport = t
t._selectable = s
self._iohandler.update(t, s, t._reactor.now)
- # Disassociate from the socket (which has been passed on)
- self._delegate = None
- self.terminate()
- self._transport = None
- self.update()
return
elif e == errno.ECONNREFUSED:
if len(self._addrs) > 0:
log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
+
sock = IO.connect(self._addrs[0])
- self._addrs = self._addrs[1:]
- self._delegate.close()
- self._delegate = sock
+ # New ConnectSelectable for the new socket with rest of addresses
+ ConnectSelectable(sock, self._reactor, self._addrs[1:], t, self._iohandler)
return
else:
log.debug("Connection refused, but tried all transport addresses")
@@ -1426,6 +1433,3 @@
t.close_tail()
t.close_head()
- self.terminate()
- self._transport = None
- self.update()