| # dockerpty: io.py |
| # |
| # Copyright 2014 Chris Corbyn <chris@w3style.co.uk> |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import fcntl |
| import errno |
| import struct |
| import select as builtin_select |
| import six |
| |
| |
def set_blocking(fd, blocking=True):
    """
    Switch the file-descriptor `fd` into blocking or non-blocking mode.

    `fd` is handed straight to `fcntl.fcntl()`. The `O_NONBLOCK` status flag
    is cleared when `blocking` is true and set when it is false; all other
    status flags are written back untouched.

    Returns True if `fd` was blocking before this call, or False if it was
    already non-blocking.
    """

    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    was_nonblocking = bool(flags & os.O_NONBLOCK)

    updated = flags & ~os.O_NONBLOCK if blocking else flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, updated)

    return not was_nonblocking
| |
| |
def select(read_streams, write_streams, timeout=0):
    """
    Report which of the given streams are ready for I/O.

    Wraps `select.select()` with an empty exceptional-conditions list and
    drops that third result, so the return value is a 2-tuple:
    `(ready_to_read, ready_to_write)`.

    If the call is interrupted by a signal (EINTR), a pair of empty lists is
    returned instead of raising. Any other select error is re-raised.
    """

    try:
        readable, writable, _ = builtin_select.select(
            read_streams,
            write_streams,
            [],
            timeout,
        )
    except builtin_select.error as e:
        # Python 2's select.error has no .errno attribute; the code is the
        # first element of the exception instead.
        code = e.errno if six.PY3 else e[0]
        if code != errno.EINTR:
            raise e
        # POSIX signals interrupt select(); report "nothing ready".
        return ([], [])

    return (readable, writable)
| |
| |
class Stream(object):
    """
    Generic Stream class.

    A file-like wrapper that reads and writes through `recv()`/`send()` when
    the wrapped object provides them, and through `os.read()`/`os.write()` on
    its `fileno()` otherwise, so sockets and plain files can be used alike.

    Writes go through an internal buffer; closing is deferred until that
    buffer has been drained.
    """

    # errno values after which a read or write is simply retried.
    ERRNO_RECOVERABLE = [
        errno.EINTR,
        errno.EDEADLK,
        errno.EWOULDBLOCK,
    ]

    def __init__(self, fd):
        """
        Wrap the file-like object `fd`.

        `fd` must have a `fileno()` method.
        """

        self.fd = fd
        # Bytes accepted by write() but not yet handed to the OS.
        self.buffer = b''
        # Set once close() has been called, even if the close was deferred.
        self.close_requested = False
        # Set once the wrapped object has actually been closed.
        self.closed = False

    def fileno(self):
        """
        Return the fileno() of the wrapped object.
        """

        return self.fd.fileno()

    def set_blocking(self, value):
        """
        Put the wrapped object into blocking (`value` true) or non-blocking
        mode.

        Objects with a `setblocking()` method are switched through it and
        True is returned; anything else is delegated to the module-level
        `set_blocking()` and its result is returned.
        """

        if not hasattr(self.fd, 'setblocking'):
            return set_blocking(self.fd, value)

        self.fd.setblocking(value)
        return True

    def read(self, n=4096):
        """
        Read up to `n` bytes from the Stream.

        The result of `recv()` / `os.read()` is passed through unchanged (for
        `os.read()` that is an empty bytes object at end of file).

        Errors listed in ERRNO_RECOVERABLE cause the read to be retried; any
        other EnvironmentError is re-raised.
        """

        while True:
            try:
                if not hasattr(self.fd, 'recv'):
                    return os.read(self.fd.fileno(), n)
                return self.fd.recv(n)
            except EnvironmentError as e:
                if e.errno in Stream.ERRNO_RECOVERABLE:
                    continue
                raise e

    def write(self, data):
        """
        Queue `data` on the internal buffer and attempt one flush.

        Not all data may be written right away. Use select to find when the
        stream is writeable, and call do_write() to flush the rest.

        Returns `len(data)`, or None when `data` is empty.
        """

        if data:
            self.buffer += data
            self.do_write()
            return len(data)

        return None

    def do_write(self):
        """
        Flush as much of the internal write buffer as a single send/write
        accepts, and return the number of bytes that were written.

        If a close was requested earlier and the buffer is now empty, the
        stream is closed. Errors listed in ERRNO_RECOVERABLE cause a retry;
        any other EnvironmentError is re-raised.
        """

        while True:
            try:
                if hasattr(self.fd, 'send'):
                    count = self.fd.send(self.buffer)
                else:
                    count = os.write(self.fd.fileno(), self.buffer)

                self.buffer = self.buffer[count:]

                # Complete a deferred close once everything has gone out.
                if self.close_requested and not self.buffer:
                    self.close()

                return count
            except EnvironmentError as e:
                if e.errno in Stream.ERRNO_RECOVERABLE:
                    continue
                raise e

    def needs_write(self):
        """
        Return True if there is buffered data still waiting to be written.
        """

        return len(self.buffer) > 0

    def close(self):
        """
        Request that the stream be closed.

        The wrapped object is closed immediately only if the write buffer is
        empty; otherwise the request is recorded and do_write() performs the
        close once the buffer drains. Calling this on an already closed
        stream does nothing further.
        """

        self.close_requested = True

        # Nothing to do if already closed; and we must not close while there
        # is still data pending to write.
        if self.closed or len(self.buffer) != 0:
            return

        self.closed = True
        if hasattr(self.fd, 'close'):
            self.fd.close()
        else:
            os.close(self.fd.fileno())

    def __repr__(self):
        return "{cls}({fd})".format(cls=type(self).__name__, fd=self.fd)
| |
| |
class Demuxer(object):
    """
    Wraps a multiplexed Stream to read in data demultiplexed.

    Docker multiplexes streams together when there is no PTY attached, by
    sending an 8-byte header, followed by a chunk of data.

    The first 4 bytes of the header denote the stream from which the data came
    (i.e. 0x01 = stdout, 0x02 = stderr). Only the first byte of these initial 4
    bytes is used.

    The next 4 bytes indicate the length of the following chunk of data as an
    integer in big endian format. This much data must be consumed before the
    next 8-byte header is read.
    """

    # Size in bytes of the frame header described above.
    HEADER_SIZE = 8

    def __init__(self, stream):
        """
        Initialize a new Demuxer reading from `stream`.
        """

        self.stream = stream
        # Payload bytes of the current frame that have not been handed out yet.
        self.remain = 0

    def fileno(self):
        """
        Returns the fileno() of the underlying Stream.

        This is useful for select() to work.
        """

        return self.stream.fileno()

    def set_blocking(self, value):
        """
        Delegates to the underlying Stream.
        """

        return self.stream.set_blocking(value)

    def read(self, n=4096):
        """
        Read up to `n` bytes of data from the Stream, after demuxing.

        Less than `n` bytes of data may be returned depending on the available
        payload, but the number of bytes returned will never exceed `n`.

        Because demuxing involves scanning 8-byte headers, the actual amount of
        data read from the underlying stream may be greater than `n`.

        Returns None when no payload is available because the underlying
        stream ended before a complete header could be read (or when `n` is
        not positive). Frames that declare a zero-length payload are skipped
        rather than reported, so an empty frame is never mistaken for end of
        stream. If the stream ends in the middle of a payload, the bytes
        received so far are returned.
        """

        size = self._next_packet_size(n)
        if size <= 0:
            return None

        chunks = []
        missing = size
        while missing > 0:
            chunk = self.stream.read(missing)
            if not chunk:
                # the stream has closed, return what data we got
                break
            chunks.append(chunk)
            missing -= len(chunk)

        return b''.join(chunks)

    def write(self, data):
        """
        Delegates to the underlying Stream.
        """

        return self.stream.write(data)

    def needs_write(self):
        """
        Delegates to underlying Stream, or returns False if the underlying
        Stream has no `needs_write()`.
        """

        if hasattr(self.stream, 'needs_write'):
            return self.stream.needs_write()

        return False

    def do_write(self):
        """
        Delegates to underlying Stream, or returns False if the underlying
        Stream has no `do_write()`.
        """

        if hasattr(self.stream, 'do_write'):
            return self.stream.do_write()

        return False

    def close(self):
        """
        Delegates to underlying Stream.
        """

        return self.stream.close()

    def _read_header(self):
        """
        Read one complete frame header from the underlying Stream.

        Returns the HEADER_SIZE header bytes, or None if the stream ended
        before a full header arrived.
        """

        parts = []
        missing = self.HEADER_SIZE
        while missing > 0:
            part = self.stream.read(missing)
            if not part:
                # The stream has closed, there's nothing more to read
                return None
            parts.append(part)
            missing -= len(part)

        return b''.join(parts)

    def _next_packet_size(self, n=0):
        """
        Work out how many payload bytes the next read() may consume.

        While a frame is partially consumed this is the smaller of `n` and
        the unread remainder. Otherwise the next header is parsed, skipping
        frames with an empty payload. `self.remain` is updated to the part of
        the frame that will still be unread afterwards.

        Returns 0 if the stream ended before a header could be read.
        """

        if self.remain > 0:
            size = min(n, self.remain)
            self.remain -= size
            return size

        while True:
            header = self._read_header()
            if header is None:
                return 0

            # Only the length matters here; the stream id byte is ignored.
            __, actual = struct.unpack('>BxxxL', header)
            if actual > 0:
                break
            # A zero-length frame carries no data. Returning 0 for it would
            # make read() look like end of stream, so move to the next frame.

        size = min(n, actual)
        self.remain = actual - size
        return size

    def __repr__(self):
        return "{cls}({stream})".format(cls=type(self).__name__,
                                        stream=self.stream)
| |
| |
class Pump(object):
    """
    Stream pump class.

    A Pump pairs a source Stream with a destination Stream and copies data
    from the former into the latter one flush() at a time, much like a pipe
    but manually managed.

    This abstraction is used to facilitate piping data between the file
    descriptors associated with the tty and those associated with a container's
    allocated pty.

    Pumps are selectable based on the 'read' end of the pipe.
    """

    def __init__(self,
                 from_stream,
                 to_stream,
                 wait_for_output=True,
                 propagate_close=True):
        """
        Set up a Pump that reads from `from_stream` and writes to `to_stream`.

        `wait_for_output` says that EOF must be seen on `from_stream` before
        this pump counts as "done".

        `propagate_close` says that `to_stream` should be closed once
        `from_stream` reaches EOF.
        """

        self.from_stream = from_stream
        self.to_stream = to_stream
        self.wait_for_output = wait_for_output
        self.propagate_close = propagate_close
        # Becomes True once a read on from_stream yields no data.
        self.eof = False

    def fileno(self):
        """
        Returns the `fileno()` of the reader end of the Pump.

        This is useful to allow Pumps to function with `select()`.
        """

        return self.from_stream.fileno()

    def set_blocking(self, value):
        """
        Delegates to the reader end of the Pump.
        """

        return self.from_stream.set_blocking(value)

    def flush(self, n=4096):
        """
        Move up to `n` bytes from the reader Stream to the writer Stream.

        Returns whatever the writer's `write()` returns for the data that was
        read.

        When the read yields None or no bytes, the pump is marked as having
        reached EOF, the writer is closed if `propagate_close` is set, and
        None is returned.

        An OSError with errno EPIPE is swallowed and None is returned (without
        marking EOF); any other OSError is re-raised.
        """

        try:
            chunk = self.from_stream.read(n)

            if chunk is not None and len(chunk) != 0:
                return self.to_stream.write(chunk)

            self.eof = True
            if self.propagate_close:
                self.to_stream.close()
            return None
        except OSError as e:
            if e.errno != errno.EPIPE:
                raise e
            return None

    def is_done(self):
        """
        Returns True if the read stream is done (either it's returned EOF or
        the pump doesn't have wait_for_output set), and the write
        side does not have pending bytes to send.
        """

        if self.wait_for_output and not self.eof:
            return False

        writer = self.to_stream
        pending = hasattr(writer, 'needs_write') and writer.needs_write()
        return not pending

    def __repr__(self):
        template = "{cls}(from={from_stream}, to={to_stream})"
        return template.format(
            cls=type(self).__name__,
            from_stream=self.from_stream,
            to_stream=self.to_stream)