blob: 27585873c9933a2d6a034251e0dec3146df3db2e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
import errno
import socket
import select
import time
from typing import TYPE_CHECKING, Tuple, List
if TYPE_CHECKING:
from proton._selectable import Selectable
# Sentinel value (-1) named for an invalid socket handle.
# NOTE(review): nothing in this part of the file reads it; presumably it is
# kept for users of this module elsewhere in the package -- confirm.
PN_INVALID_SOCKET = -1
class IO(object):
    """Static helpers wrapping the ``socket``, ``select`` and ``time`` modules.

    Every socket handed out by :meth:`listen`, :meth:`accept` and
    :meth:`connect` has been passed through :meth:`_setupsocket`, i.e. it is
    non-blocking and has ``TCP_NODELAY`` set.
    """

    @staticmethod
    def _setupsocket(s: socket.socket) -> None:
        """Set ``TCP_NODELAY`` on *s* and switch it to non-blocking mode."""
        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
        s.setblocking(False)

    @staticmethod
    def close(s: socket.socket) -> None:
        """Close the socket *s*."""
        s.close()

    @staticmethod
    def listen(host, port) -> socket.socket:
        """Return a non-blocking socket bound to ``(host, port)`` and listening.

        ``SO_REUSEADDR`` is set before binding and the listen backlog is 10.

        :raises OSError: if configuring, binding or listening fails; the
            half-initialised socket is closed before the error propagates.
        """
        s = socket.socket()
        try:
            IO._setupsocket(s)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            s.bind((host, port))
            s.listen(10)
        except BaseException:
            # Do not leak the descriptor when setup fails part-way through.
            s.close()
            raise
        return s

    @staticmethod
    def accept(s: socket.socket) -> Tuple[socket.socket, object]:
        """Accept one connection on the listening socket *s*.

        :return: the ``(connection, address)`` pair produced by
            ``socket.accept()``; the connection has been configured with
            :meth:`_setupsocket`.
        :raises OSError: if accepting or configuring the connection fails; a
            connection that was already accepted is closed first.
        """
        n = s.accept()
        try:
            IO._setupsocket(n[0])
        except BaseException:
            n[0].close()
            raise
        return n

    @staticmethod
    def connect(addr) -> socket.socket:
        """Start a non-blocking connect and return the socket.

        :param addr: sequence indexed like an entry returned by
            ``socket.getaddrinfo()``: items 0-2 are passed to
            ``socket.socket()`` and item 4 to ``connect()``.
        :return: the socket; the connection may still be in progress.
        :raises OSError: for any failure other than the "in progress" errors
            expected from a non-blocking connect; the socket is closed before
            the error propagates.
        """
        s = socket.socket(addr[0], addr[1], addr[2])
        try:
            IO._setupsocket(s)
            try:
                s.connect(addr[4])
            except socket.error as e:
                # A non-blocking connect normally reports that it has merely
                # been started; only other errors are real failures.
                if e.errno not in (errno.EINPROGRESS, errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
        except BaseException:
            # Do not leak the descriptor when the connect attempt fails.
            s.close()
            raise
        return s

    @staticmethod
    def select(*args, **kwargs):
        """Forward all arguments to ``select.select()`` and return its result."""
        return select.select(*args, **kwargs)

    @staticmethod
    def sleep(t: float) -> None:
        """Sleep for *t* seconds using ``time.sleep()``."""
        time.sleep(t)

    class Selector(object):
        """Tracks selectables and waits for I/O readiness or expired deadlines.

        Registered objects are read for their ``reading``, ``writing`` and
        ``deadline`` attributes and are handed to ``select.select()`` directly.
        """

        def __init__(self) -> None:
            self._selectables = set()  # every registered selectable
            self._reading = set()      # subset whose ``reading`` flag was set
            self._writing = set()      # subset whose ``writing`` flag was set
            self._deadline = None      # earliest known deadline, or None

        def add(self, selectable: 'Selectable') -> None:
            """Register *selectable* and fold in its interest flags and deadline."""
            self._selectables.add(selectable)
            if selectable.reading:
                self._reading.add(selectable)
            if selectable.writing:
                self._writing.add(selectable)
            if selectable.deadline:
                if self._deadline is None:
                    self._deadline = selectable.deadline
                else:
                    self._deadline = min(selectable.deadline, self._deadline)

        def remove(self, selectable: 'Selectable') -> None:
            """Forget *selectable* (a no-op if unknown) and refresh the deadline."""
            self._selectables.discard(selectable)
            self._reading.discard(selectable)
            self._writing.discard(selectable)
            self.update_deadline()

        @property
        def selectables(self) -> int:
            """Number of registered selectables."""
            return len(self._selectables)

        def update_deadline(self) -> None:
            """Recompute the earliest deadline over all registered selectables.

            The value is rebuilt from scratch so that a deadline belonging to
            a removed selectable, or one that has since moved later, does not
            linger and cause an early wake-up.  Falsy deadlines are ignored;
            the result is ``None`` when no selectable has a deadline.
            """
            self._deadline = min(
                (sel.deadline for sel in self._selectables if sel.deadline),
                default=None,
            )

        def update(self, selectable: 'Selectable') -> None:
            """Re-read the interest flags of *selectable* and refresh the deadline."""
            self._reading.discard(selectable)
            self._writing.discard(selectable)
            if selectable.reading:
                self._reading.add(selectable)
            if selectable.writing:
                self._writing.add(selectable)
            self.update_deadline()

        def select(self, timeout) -> Tuple[List, List, List]:
            """Wait for readiness, the timeout or the earliest deadline.

            :param timeout: maximum number of seconds to wait, or ``None`` to
                wait until something is ready or the earliest deadline passes.
            :return: ``(readable, writable, expired)`` lists of selectables;
                ``expired`` holds those whose deadline is in the past.
            :raises OSError: if the underlying select fails with anything
                other than ``EINTR``.
            """

            def select_inner(timeout):
                # This inner select adds the writing fds to the exception fd
                # set because Windows returns connected fds in the exception
                # set, not the writable set.
                r = self._reading
                w = self._writing

                # No timeout and no deadline: block until something is ready.
                if timeout is None and self._deadline is None:
                    return IO.select(r, w, w)

                if self._deadline is None:
                    t = timeout
                else:
                    remaining = self._deadline - time.time()
                    if timeout is not None:
                        remaining = min(timeout, remaining)
                    t = max(0, remaining)

                # select() with three empty sets is not accepted on every
                # platform (notably Windows), so just sleep out the wait.
                if not r and not w:
                    if t > 0:
                        IO.sleep(t)
                    return ([], [], [])

                return IO.select(r, w, w, t)

            # A signal may interrupt the wait.  The handler could have changed
            # our internal state, so do not retry; report no ready handles.
            try:
                r, w, ex = select_inner(timeout)
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise
                r, w, ex = ([], [], [])

            # For a Windows non-blocking connect we get "exception" rather
            # than "writable", so treat exceptions as writable.
            w += ex

            # Work out which selectables have timed out.
            now = time.time()
            t = [s for s in self._selectables if s.deadline and now > s.deadline]
            self.update_deadline()
            return r, w, t