| from __future__ import annotations |
| |
| import select |
| import socket |
| from functools import partial |
| |
| __all__ = ["wait_for_read", "wait_for_write"] |
| |
| |
| # How should we wait on sockets? |
| # |
| # There are two types of APIs you can use for waiting on sockets: the fancy |
| # modern stateful APIs like epoll/kqueue, and the older stateless APIs like |
| # select/poll. The stateful APIs are more efficient when you have lots of |
| # sockets to keep track of, because you can set them up once and then use them |
| # lots of times. But we only ever want to wait on a single socket at a time |
| # and don't want to keep track of state, so the stateless APIs are actually |
| # more efficient. So we want to use select() or poll(). |
| # |
| # Now, how do we choose between select() and poll()? On traditional Unixes, |
| # select() has a strange calling convention that makes it slow, or fail |
| # altogether, for high-numbered file descriptors. The point of poll() is to fix |
| # that, so on Unixes, we prefer poll(). |
| # |
| # On Windows, there is no poll() (or at least Python doesn't provide a wrapper |
| # for it), but that's OK, because on Windows, select() doesn't have this |
| # strange calling convention; plain select() works fine. |
| # |
| # So: on Windows we use select(), and everywhere else we use poll(). We also |
| # fall back to select() in case poll() is somehow broken or missing. |
| |
| |
def select_wait_for_socket(
    sock: socket.socket,
    read: bool = False,
    write: bool = False,
    timeout: float | None = None,
) -> bool:
    """Wait on a single socket using ``select.select()``.

    :param sock: The socket to wait on.
    :param read: Wait until the socket is readable.
    :param write: Wait until the socket is writable (or flagged as
        "exceptional", see the comment below).
    :param timeout: Value handed to ``select.select()`` as its timeout
        (seconds); ``None`` waits without a time limit.
    :returns: True if select() reported the socket in any of its three
        result lists, False if it returned with all of them empty.
    :raises RuntimeError: If neither ``read`` nor ``write`` is requested.
    """
    if not (read or write):
        raise RuntimeError("must specify at least one of read=True, write=True")

    readers = [sock] if read else []
    writers = [sock] if write else []

    # When doing a non-blocking connect, most systems signal success by
    # marking the socket writable. Windows, though, signals success by
    # marking it as "exceptional". We paper over the difference by passing
    # the write list as the exceptional list too. (The stdlib selectors
    # module does the same thing.)
    ready_r, ready_w, ready_x = select.select(readers, writers, writers, timeout)
    return any((ready_r, ready_w, ready_x))
| |
| |
def poll_wait_for_socket(
    sock: socket.socket,
    read: bool = False,
    write: bool = False,
    timeout: float | None = None,
) -> bool:
    """Wait on a single socket using ``select.poll()``.

    :param sock: The socket to wait on.
    :param read: Register interest in ``POLLIN``.
    :param write: Register interest in ``POLLOUT``.
    :param timeout: How long to wait, in seconds; it is converted to the
        milliseconds that ``poll()`` expects. ``None`` is passed through
        unchanged, i.e. no time limit.
    :returns: True if ``poll()`` reported at least one event for the socket,
        False if it returned an empty list.
    :raises RuntimeError: If neither ``read`` nor ``write`` is requested.
    """
    if not (read or write):
        raise RuntimeError("must specify at least one of read=True, write=True")

    events = (select.POLLIN if read else 0) | (select.POLLOUT if write else 0)
    poller = select.poll()
    poller.register(sock, events)

    # For some reason, poll() takes its timeout in milliseconds.
    timeout_ms = None if timeout is None else timeout * 1000
    return bool(poller.poll(timeout_ms))
| |
| |
def _have_working_poll() -> bool:
    """Report whether ``select.poll`` exists and survives a trivial call.

    Apparently some systems have a ``select.poll`` that fails as soon as you
    try to use it, either due to strange configuration or broken
    monkeypatching from libraries like eventlet/greenlet. So rather than just
    checking for the attribute, create a poll object and poll it once with a
    zero timeout.

    :returns: False if that probe raises ``AttributeError`` or ``OSError``,
        True otherwise.
    """
    try:
        select.poll().poll(0)
    except (AttributeError, OSError):
        return False
    return True
| |
| |
def wait_for_socket(
    sock: socket.socket,
    read: bool = False,
    write: bool = False,
    timeout: float | None = None,
) -> bool:
    """Wait on ``sock`` using the best available implementation.

    On the first call this picks ``poll_wait_for_socket`` if poll() works,
    otherwise ``select_wait_for_socket`` if ``select.select`` exists, and
    rebinds the module-level name ``wait_for_socket`` to the chosen function.
    Later lookups of that name therefore go straight to the chosen
    implementation.

    :param sock: The socket to wait on.
    :param read: Wait for the socket to become readable.
    :param write: Wait for the socket to become writable.
    :param timeout: Passed through to the chosen implementation.
    :returns: Whatever the chosen implementation returns.
    :raises RuntimeError: If neither poll() nor select() is usable.
    """
    # We delay choosing which implementation to use until the first time we're
    # called. We could do it at import time, but then we might make the wrong
    # decision if someone goes wild with monkeypatching select.poll after
    # we're imported.
    global wait_for_socket
    if _have_working_poll():
        wait_for_socket = poll_wait_for_socket
    elif hasattr(select, "select"):
        wait_for_socket = select_wait_for_socket
    else:
        # Without this branch the name is never rebound, so the call below
        # would re-enter this function until RecursionError. Fail with a
        # clear message instead; RecursionError is itself a RuntimeError, so
        # existing handlers keep matching. The name stays unbound to an
        # implementation, so a later call probes again.
        raise RuntimeError(
            "no working select.poll() or select.select() to wait on the socket"
        )
    return wait_for_socket(sock, read, write, timeout)
| |
| |
def wait_for_read(sock: socket.socket, timeout: float | None = None) -> bool:
    """Wait until ``sock`` can be read from, or until ``timeout`` runs out.

    :param sock: The socket to wait on.
    :param timeout: Forwarded unchanged to ``wait_for_socket``.
    :returns: True if the socket is readable, or False if the timeout
        expired.
    """
    readable = wait_for_socket(sock, timeout=timeout, read=True)
    return readable
| |
| |
def wait_for_write(sock: socket.socket, timeout: float | None = None) -> bool:
    """Wait until ``sock`` can be written to, or until ``timeout`` runs out.

    :param sock: The socket to wait on.
    :param timeout: Forwarded unchanged to ``wait_for_socket``.
    :returns: True if the socket is writable, or False if the timeout
        expired.
    """
    return wait_for_socket(sock, write=True, timeout=timeout)