blob: 32e542bc71573d59c636ae6a224d2ce4164ab9e6 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import time, errno, os
from compat import select, SelectError, set, selectable_waiter, format_exc
from threading import Thread, Lock
from logging import getLogger
log = getLogger("qpid.messaging")
class SelectorException(Exception): pass
class Acceptor:
def __init__(self, sock, handler):
self.sock = sock
self.handler = handler
def fileno(self):
return self.sock.fileno()
def reading(self):
return True
def writing(self):
return False
def readable(self):
sock, addr = self.sock.accept()
self.handler(sock)
class Selector:
lock = Lock()
DEFAULT = None
_current_pid = None
@staticmethod
def default():
Selector.lock.acquire()
try:
if Selector.DEFAULT is None or Selector._current_pid != os.getpid():
# If we forked, mark the existing Selector dead.
if Selector.DEFAULT is not None:
log.warning("qpid.messaging: process was forked")
Selector.DEFAULT.dead(
SelectorException("qpid.messaging: forked child process used parent connection"), True)
sel = Selector()
sel.start()
Selector.DEFAULT = sel
Selector._current_pid = os.getpid()
return Selector.DEFAULT
finally:
Selector.lock.release()
def __init__(self):
self.selectables = set()
self.reading = set()
self.writing = set()
self.waiter = selectable_waiter()
self.reading.add(self.waiter)
self.stopped = False
self.thread = None
self.exception = None
def wakeup(self):
if self.exception:
log.error(str(self.exception))
raise self.exception
self.waiter.wakeup()
def register(self, selectable):
self.selectables.add(selectable)
self.modify(selectable)
def _update(self, selectable):
if selectable.reading():
self.reading.add(selectable)
else:
self.reading.discard(selectable)
if selectable.writing():
self.writing.add(selectable)
else:
self.writing.discard(selectable)
return selectable.timing()
def modify(self, selectable):
self._update(selectable)
self.wakeup()
def unregister(self, selectable):
self.reading.discard(selectable)
self.writing.discard(selectable)
self.selectables.discard(selectable)
self.wakeup()
def start(self):
self.stopped = False
self.exception = None
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start();
def run(self):
try:
while not self.stopped and not self.exception:
wakeup = None
for sel in self.selectables.copy():
t = self._update(sel)
if t is not None:
if wakeup is None:
wakeup = t
else:
wakeup = min(wakeup, t)
rd = []
wr = []
ex = []
while True:
try:
if wakeup is None:
timeout = None
else:
timeout = max(0, wakeup - time.time())
rd, wr, ex = select(self.reading, self.writing, (), timeout)
break
except SelectError, e:
# Repeat the select call if we were interrupted.
if e[0] == errno.EINTR:
continue
else:
# unrecoverable: promote to outer try block
raise
for sel in wr:
if sel.writing():
sel.writeable()
for sel in rd:
if sel.reading():
sel.readable()
now = time.time()
for sel in self.selectables.copy():
w = sel.timing()
if w is not None and now > w:
sel.timeout()
except Exception, e:
log.error("qpid.messaging: I/O thread has died: %s\n%s" % (e, format_exc()))
dead(e, False)
raise
self.dead(SelectorException("qpid.messaging: I/O thread exited"), False)
def stop(self, timeout=None):
"""Stop the selector and wait for it's thread to exit.
Ignored for the shared default() selector, which stops when the process exits.
"""
if self.DEFAULT == self: # Never stop the DEFAULT Selector
return
self.stopped = True
self.wakeup()
self.thread.join(timeout)
self.dead(SelectorException("qpid.messaging: I/O thread stopped"), False)
def dead(self, e, forked):
"""Mark the Selector as dead if it is stopped for any reason.
Ensure there any future calls to wait() will raise an exception.
If the thread died because of a fork() then ensure further that
attempting to take the connections lock also raises.
"""
self.thread = None
self.exception = e
for sel in self.selectables.copy():
try:
# Mark the connection as failed
sel.connection.error = e
if forked:
# Replace connection's locks, they may be permanently locked in the forked child.
c = sel.connection
c.error = e
c._lock = BrokenLock(e)
for ssn in c.sessions.values():
ssn._lock = c._lock
for l in ssn.senders + ssn.receivers:
l._lock = c._lock
except:
pass
try:
if forked:
self.waiter.close() # Don't mess with the parent's FDs
else:
self.waiter.wakeup() # In case somebody re-waited while we were cleaning up.
except:
pass
class BrokenLock(object):
"""Dummy lock-like object that raises an exception. Used in forked child to
replace locks that may be held in the parent process."""
def __init__(self, exception):
self.exception = exception
def acquire(self):
log.error(str(self.exception))
raise self.exception