QPID-7317: More robust qpid.selector with better logging

This commit disables the selector and related qpid.messaging objects when the
selector thread exits for any reason: process exit, fork, exception, etc. Any
subsequent use will throw an exception and log both the location of the failed
call and the location where the selector thread was stopped. This should be
slightly more predictable and robust than commit 037c573, which tried to keep
the selector alive in a daemon thread.

I have not been able to hang the pulp_smash test suite with this patch. The new
logging shows that celery workers do sometimes use qpid.messaging in an illegal
state, which could cause the reported hang. So far I have not seen a stack trace
that exactly matches the reported stacks. If this patch does not address the
pulp problem, it should at least provide much better debugging information in
the journalctl log output after the hang.
diff --git a/qpid/selector.py b/qpid/selector.py
index 32e542b..56b137d 100644
--- a/qpid/selector.py
+++ b/qpid/selector.py
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,30 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import time, errno, os
+import time, errno, os, atexit, traceback
 from compat import select, SelectError, set, selectable_waiter, format_exc
 from threading import Thread, Lock
 from logging import getLogger
+from qpid.messaging import InternalError
+
+def _stack(skip=0):
+  return ("".join(traceback.format_stack()[:-(1+skip)])).strip()
+
+class SelectorStopped(InternalError):
+  def __init__(self, msg, where=None):
+    InternalError.__init__(self, text=msg)
+    self.where = _stack(1)
+
+def _check(ex, skip=0):
+  if ex:
+    log.error("illegal use of qpid.messaging at:\n%s\n%s" % (_stack(1), ex))
+    where = getattr(ex, 'where')
+    if where:
+      log.error("qpid.messaging was previously stopped at:\n%s\n%s" % (where, ex))
+    raise ex
 
 log = getLogger("qpid.messaging")
 
-class SelectorException(Exception): pass
-
 class Acceptor:
 
   def __init__(self, sock, handler):
@@ -57,11 +72,11 @@
       if Selector.DEFAULT is None or Selector._current_pid != os.getpid():
         # If we forked, mark the existing Selector dead.
         if Selector.DEFAULT is not None:
-          log.warning("qpid.messaging: process was forked")
-          Selector.DEFAULT.dead(
-            SelectorException("qpid.messaging: forked child process used parent connection"), True)
+          log.warning("process forked, child must not use parent qpid.messaging")
+          Selector.DEFAULT.dead(SelectorStopped("forked child using parent qpid.messaging"))
         sel = Selector()
         sel.start()
+        atexit.register(sel.stop)
         Selector.DEFAULT = sel
         Selector._current_pid = os.getpid()
       return Selector.DEFAULT
@@ -75,13 +90,10 @@
     self.waiter = selectable_waiter()
     self.reading.add(self.waiter)
     self.stopped = False
-    self.thread = None
     self.exception = None
 
   def wakeup(self):
-    if self.exception:
-      log.error(str(self.exception))
-      raise self.exception
+    _check(self.exception)
     self.waiter.wakeup()
 
   def register(self, selectable):
@@ -110,8 +122,7 @@
     self.wakeup()
 
   def start(self):
-    self.stopped = False
-    self.exception = None
+    _check(self.exception)
     self.thread = Thread(target=self.run)
     self.thread.setDaemon(True)
     self.thread.start();
@@ -162,60 +173,44 @@
           if w is not None and now > w:
             sel.timeout()
     except Exception, e:
-      log.error("qpid.messaging: I/O thread has died: %s\n%s" % (e, format_exc()))
-      dead(e, False)
-      raise
-    self.dead(SelectorException("qpid.messaging: I/O thread exited"), False)
+      log.error("qpid.messaging thread died: %s" % e)
+      self.exception = SelectorStopped(str(e))
+    self.exception = self.exception or self.stopped
+    self.dead(self.exception or SelectorStopped("qpid.messaging thread died: reason unknown"))
 
   def stop(self, timeout=None):
-    """Stop the selector and wait for it's thread to exit.
-    Ignored for the shared default() selector, which stops when the process exits.
+    """Stop the selector and wait for its thread to exit. It cannot be re-started."""
+    if self.thread and not self.stopped:
+      self.stopped = SelectorStopped("qpid.messaging thread has been stopped")
+      self.wakeup()
+      self.thread.join(timeout)
 
+  def dead(self, e):
+    """Mark the Selector as dead if it is stopped for any reason.  Ensure that any future
+    attempt to use the selector or any of its connections will throw an exception.
     """
-    if self.DEFAULT == self:    # Never stop the DEFAULT Selector
-      return
-    self.stopped = True
-    self.wakeup()
-    self.thread.join(timeout)
-    self.dead(SelectorException("qpid.messaging: I/O thread stopped"), False)
-
-  def dead(self, e, forked):
-    """Mark the Selector as dead if it is stopped for any reason.
-    Ensure there any future calls to wait() will raise an exception.
-    If the thread died because of a fork() then ensure further that
-    attempting to take the connections lock also raises.
-    """
-    self.thread = None
     self.exception = e
-    for sel in self.selectables.copy():
-      try:
-        # Mark the connection as failed
-        sel.connection.error = e
-        if forked:
-          # Replace connection's locks, they may be permanently locked in the forked child.
-          c = sel.connection
-          c.error = e
-          c._lock = BrokenLock(e)
-          for ssn in c.sessions.values():
-            ssn._lock = c._lock
-            for l in ssn.senders + ssn.receivers:
-              l._lock = c._lock
-      except:
-        pass
     try:
-      if forked:
-        self.waiter.close()       # Don't mess with the parent's FDs
-      else:
-        self.waiter.wakeup()      # In case somebody re-waited while we were cleaning up.
-    except:
-      pass
+      for sel in self.selectables.copy():
+        c = sel.connection
+        for ssn in c.sessions.values():
+          for l in ssn.senders + ssn.receivers:
+            disable(l, self.exception)
+          disable(ssn, self.exception)
+        disable(c, self.exception)
+    except Exception, e:
+      log.error("error stopping qpid.messaging (%s)\n%s", self.exception, format_exc())
+    try:
+      self.waiter.close()
+    except Exception, e:
+      log.error("error stopping qpid.messaging (%s)\n%s", self.exception, format_exc())
 
-class BrokenLock(object):
-  """Dummy lock-like object that raises an exception. Used in forked child to
-      replace locks that may be held in the parent process."""
-  def __init__(self, exception):
-    self.exception = exception
-
-  def acquire(self):
-    log.error(str(self.exception))
-    raise self.exception
+# Disable an object so it raises exceptions on any use
+import inspect
+def disable(obj, exception):
+  assert(exception)
+  def log_raise(*args, **kwargs):
+    _check(exception, 1)
+  # Replace all methods with log_raise
+  for m in inspect.getmembers(obj, predicate=inspect.ismethod):
+    setattr(obj, m[0], log_raise)
diff --git a/qpid/tests/messaging/__init__.py b/qpid/tests/messaging/__init__.py
index be7000f..e73c058 100644
--- a/qpid/tests/messaging/__init__.py
+++ b/qpid/tests/messaging/__init__.py
@@ -233,4 +233,4 @@
       s += ":%s" % self.port
     return s
 
-import address, endpoints, message
+import address, endpoints, message, selector
diff --git a/qpid/tests/messaging/selector.py b/qpid/tests/messaging/selector.py
new file mode 100644
index 0000000..fc29607
--- /dev/null
+++ b/qpid/tests/messaging/selector.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys, os
+from logging import getLogger
+from unittest import TestCase
+from qpid.selector import Selector
+from qpid.messaging import *
+from qpid.messaging.exceptions import InternalError
+
+class SelectorTests(TestCase):
+  """Make sure that using a connection after a selector stops raises and doesn't hang"""
+
+  def setUp(self):
+    self.log = getLogger("qpid.messaging")
+    self.propagate = self.log.propagate
+    self.log.propagate = False  # Disable for tests, expected log output is noisy
+
+  def tearDown(self):
+    # Clear out any broken selector so next test can function
+    Selector.DEFAULT = None
+    self.log.propagate = self.propagate  # Restore setting
+
+  def configure(self, config):
+    self.broker = config.broker
+
+  def test_use_after_stop(self):
+    """Create endpoints, stop the selector, try to use them"""
+    c = Connection.establish(self.broker)
+    ssn = c.session()
+    r = ssn.receiver("foo;{create:always,delete:always}")
+    s = ssn.sender("foo;{create:always,delete:always}")
+
+    Selector.DEFAULT.stop()
+    self.assertRaises(InternalError, c.session)
+    self.assertRaises(InternalError, ssn.sender, "foo")
+    self.assertRaises(InternalError, s.send, "foo")
+    self.assertRaises(InternalError, r.fetch)
+    self.assertRaises(InternalError, Connection.establish, self.broker)
+
+  def test_use_after_fork(self):  # Parent connection must fail in the child; new ones must work
+    c = Connection.establish(self.broker)
+    pid = os.fork()
+    if pid:                     # Parent
+      self.assertEqual((pid, 0), os.waitpid(pid, 0))
+      self.assertEqual("child", c.session().receiver("child;{create:always}").fetch().content)
+    else:                       # Child
+      try:
+        # Can establish new connections
+        s = Connection.establish(self.broker).session().sender("child;{create:always}")
+        self.assertRaises(InternalError, c.session) # But can't use parent connection
+        s.send("child")
+        os._exit(0)
+      except Exception, e:
+        print >>sys.stderr, "test child process error: %s" % e
+        os._exit(1)             # os.exit() does not exist; hard-exit the child with failure status
+      finally:
+        os._exit(1)             # Hard exit from child to stop remaining tests running twice