| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import absolute_import |
| import sys |
| import errno |
| import time |
| from logging import getLogger |
| log = getLogger("qpid.messaging") |
| |
| try: |
| set = set |
| except NameError: |
| from sets import Set as set |
| |
| try: |
| from socket import SHUT_RDWR |
| except ImportError: |
| SHUT_RDWR = 2 |
| |
try:
    from traceback import format_exc
except ImportError:
    import traceback

    def format_exc():
        """Return the traceback of the exception being handled as one string.

        Fallback used only when the traceback module does not provide its
        own format_exc().
        """
        exc_type, exc_value, exc_tb = sys.exc_info()
        pieces = traceback.format_exception(exc_type, exc_value, exc_tb)
        return "".join(pieces)
| |
# QPID-5588: prefer poll() to select(), as it allows file descriptors with
# values > FD_SETSIZE
import select as _select_mod
try:
    # QPID-5790: unless eventlet/greenthreads have monkey-patched the select
    # module, as to date poll() is not properly supported by eventlet
    import eventlet
    _is_patched = eventlet.patcher.is_monkey_patched("select")
except ImportError:
    _is_patched = False

if hasattr(_select_mod, "poll") and not _is_patched:
    from select import error as SelectError

    def select(rlist, wlist, xlist, timeout=None):
        """Emulate select.select() on top of poll().

        rlist, wlist and xlist are iterables of hashable waitables: integer
        file descriptors or objects exposing fileno(). timeout is in seconds;
        None is handed to poll() unchanged and 0 polls without blocking.

        Returns a (readable, writable, exceptional) tuple of lists holding
        the objects that were passed in (not the raw descriptors). Objects
        that report POLLERR or POLLHUP are returned as readable so that the
        caller's subsequent read surfaces the error.
        """
        # Fold the three lists into a single event mask per object. A poll
        # object keeps one mask per descriptor and register() replaces it,
        # so registering the same object once per list would silently drop
        # the earlier interest (e.g. an fd present in both rlist and xlist
        # would end up watched for POLLPRI only).
        interest = {}
        for objs, flag in ((rlist, _select_mod.POLLIN),
                           (wlist, _select_mod.POLLOUT),
                           (xlist, _select_mod.POLLPRI)):
            for obj in objs:
                interest[obj] = interest.get(obj, 0) | flag

        if timeout:
            # select expects seconds, poll milliseconds
            timeout = float(timeout) * 1000

        poller = _select_mod.poll()
        # select returns the objects passed in, but poll gives us back only
        # the integer fds. Maintain a map to get back:
        fd_map = {}
        for obj, mask in interest.items():
            poller.register(obj, mask)
            if hasattr(obj, "fileno"):
                fd_map[obj.fileno()] = obj

        log.debug("poll(%d fds, timeout=%s)", len(interest), timeout)
        active = poller.poll(timeout)
        log.debug("poll() returned %s fds", len(active))

        rfds = []
        wfds = []
        xfds = []
        # set the error conditions so we do a read(), which will report the
        # error
        rflags = (_select_mod.POLLIN | _select_mod.POLLERR |
                  _select_mod.POLLHUP)
        for fd, flags in active:
            # Plain integer fds were never put in fd_map; hand them back
            # unchanged.
            obj = fd_map.get(fd, fd)
            if flags & rflags:
                rfds.append(obj)
            if flags & _select_mod.POLLOUT:
                wfds.append(obj)
            if flags & _select_mod.POLLPRI:
                xfds.append(obj)
        return (rfds, wfds, xfds)
else:
    if tuple(sys.version_info[0:2]) < (2, 4):
        from select import error as SelectError
        from select import select as old_select

        def select(rlist, wlist, xlist, timeout=None):
            """Call the stdlib select() with each argument copied to a list."""
            return old_select(list(rlist), list(wlist), list(xlist), timeout)
    else:
        from select import select
        from select import error as SelectError
| |
class BaseWaiter:
    """Common wait/wakeup logic for a selectable wakeup channel.

    Subclasses provide fileno() (so the instance can be handed to select()),
    _do_write() to signal the channel and _do_read() to consume the signal.
    """

    def wakeup(self):
        """Signal the channel by calling the subclass's _do_write()."""
        self._do_write()

    def wait(self, timeout=None):
        """Wait for a wakeup and consume it.

        With timeout=None no select() is performed: _do_read() is called
        directly and True is returned (presumably _do_read() blocks until
        data arrives -- confirm against the subclass).

        Otherwise select() is used to wait up to timeout seconds for the
        channel to become readable. A timeout <= 0 returns False without
        polling. Returns True (after calling _do_read()) if the channel
        became readable, False if the timeout expired first.

        Raises SelectError for any select() failure other than EINTR.
        """
        if timeout is None:
            self._do_read()
            return True

        # Work against a fixed deadline so that repeated EINTR retries never
        # subtract the same elapsed time more than once.
        deadline = time.time() + timeout
        ready = False
        while timeout > 0:
            try:
                ready, _, _ = select([self], [], [], timeout)
                break
            except SelectError as e:
                # e.args[0] carries the errno on both Python 2 (select.error)
                # and Python 3 (OSError); indexing the exception itself only
                # works on Python 2.
                if not (e.args and e.args[0] == errno.EINTR):
                    raise
                timeout = deadline - time.time()

        if ready:
            self._do_read()
            return True
        return False

    def reading(self):
        """Always report interest in reading."""
        return True

    def readable(self):
        """Consume the pending wakeup via _do_read()."""
        self._do_read()
| |
if sys.platform in ('win32', 'cygwin'):
    import socket

    class SockWaiter(BaseWaiter):
        """Waiter backed by a connected pair of sockets.

        Used on win32/cygwin; a byte sent on write_sock makes read_sock
        readable.
        """

        def __init__(self, read_sock, write_sock):
            self.read_sock = read_sock
            self.write_sock = write_sock

        def _do_write(self):
            # Must be a bytes literal: socket.send() rejects text strings on
            # Python 3, and b"" is a plain str on Python 2.
            self.write_sock.send(b"\0")

        def _do_read(self):
            self.read_sock.recv(65536)

        def fileno(self):
            """Return the descriptor of the read side, for use with select()."""
            return self.read_sock.fileno()

        def close(self):
            """Close both sockets; later calls are no-ops."""
            if self.write_sock is not None:
                self.write_sock.close()
                self.write_sock = None
                self.read_sock.close()
                self.read_sock = None

        def __del__(self):
            self.close()

        def __repr__(self):
            return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)

    def selectable_waiter():
        """Create a SockWaiter over a freshly connected loopback socket pair."""
        listener = socket.socket()
        try:
            # Listen on loopback only: the peer is always 127.0.0.1, and
            # binding to every interface would let a remote host connect
            # first and be the one returned by accept().
            listener.bind(("127.0.0.1", 0))
            listener.listen(1)
            _, port = listener.getsockname()
            write_sock = socket.socket()
            try:
                write_sock.connect(("127.0.0.1", port))
                read_sock, _ = listener.accept()
            except BaseException:
                # Do not leak the half-built pair if setup fails.
                write_sock.close()
                raise
        finally:
            # The listener is only needed to establish the pair.
            listener.close()
        return SockWaiter(read_sock, write_sock)
else:
    import os

    class PipeWaiter(BaseWaiter):
        """Waiter backed by an os.pipe(); a byte written to the write end
        makes the read end readable.
        """

        def __init__(self):
            self.read_fd, self.write_fd = os.pipe()

        def _do_write(self):
            # Must be a bytes literal: os.write() rejects text strings on
            # Python 3, and b"" is a plain str on Python 2.
            os.write(self.write_fd, b"\0")

        def _do_read(self):
            os.read(self.read_fd, 65536)

        def fileno(self):
            """Return the read end of the pipe, for use with select()."""
            return self.read_fd

        def close(self):
            """Close both ends of the pipe; later calls are no-ops."""
            # getattr: __del__ may run on an instance whose __init__ failed
            # before the descriptors were assigned.
            if getattr(self, "write_fd", None) is not None:
                os.close(self.write_fd)
                self.write_fd = None
                os.close(self.read_fd)
                self.read_fd = None

        def __del__(self):
            self.close()

        def __repr__(self):
            return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)

    def selectable_waiter():
        """Create a PipeWaiter."""
        return PipeWaiter()