| """Inventory utility functions for Sphinx.""" |
| from __future__ import annotations |
| |
| import os |
| import re |
| import zlib |
| from typing import IO, TYPE_CHECKING, Callable |
| |
| from sphinx.util import logging |
| |
# Number of bytes requested from the underlying stream on each read
# (see ``InventoryFileReader.read_buffer``).
BUFSIZE = 16 * 1024
# Logger for this module, obtained through ``sphinx.util.logging``.
logger = logging.getLogger(__name__)
| |
| if TYPE_CHECKING: |
| from collections.abc import Iterator |
| |
| from sphinx.builders import Builder |
| from sphinx.environment import BuildEnvironment |
| from sphinx.util.typing import Inventory, InventoryItem |
| |
| |
class InventoryFileReader:
    """A file reader for an inventory file.

    This reader supports a mixture of plain text and compressed text: the
    leading lines can be consumed as text with :meth:`readline`, after which
    the rest of the stream can be decompressed with
    :meth:`read_compressed_lines`.
    """

    def __init__(self, stream: IO[bytes]) -> None:
        """Wrap *stream*, a binary file-like object providing ``read(size)``."""
        self.stream = stream
        # Bytes already read from the stream but not yet handed out.
        self.buffer = b''
        # Becomes True once the stream returns an empty chunk.
        self.eof = False

    def read_buffer(self) -> None:
        """Read up to ``BUFSIZE`` more bytes from the stream into the buffer.

        An empty read marks the reader as exhausted (``self.eof``).
        """
        chunk = self.stream.read(BUFSIZE)
        if chunk == b'':
            self.eof = True
        self.buffer += chunk

    def readline(self) -> str:
        """Return the next text line, decoded and without its newline.

        Once the stream is exhausted, the unterminated remainder of the
        buffer is returned (an empty string when nothing is left).
        """
        # Iterate rather than recurse so that a very long line spanning many
        # chunks cannot exhaust the interpreter's recursion limit.
        while True:
            pos = self.buffer.find(b'\n')
            if pos != -1:
                line = self.buffer[:pos].decode()
                self.buffer = self.buffer[pos + 1:]
                return line
            if self.eof:
                line = self.buffer.decode()
                self.buffer = b''
                return line
            self.read_buffer()

    def readlines(self) -> Iterator[str]:
        """Yield the remaining non-empty text lines until end of stream."""
        while not self.eof:
            line = self.readline()
            if line:
                yield line

    def read_compressed_chunks(self) -> Iterator[bytes]:
        """Yield the rest of the stream as zlib-decompressed byte chunks.

        Bytes already sitting in the buffer are decompressed together with
        the first chunk read from the stream.
        """
        decompressor = zlib.decompressobj()
        while not self.eof:
            self.read_buffer()
            yield decompressor.decompress(self.buffer)
            self.buffer = b''
        yield decompressor.flush()

    def read_compressed_lines(self) -> Iterator[str]:
        """Yield the decompressed remainder of the stream line by line.

        Lines are decoded and returned without their newline.  Empty lines
        between two newlines are yielded as ``''``.  A final line that is
        not terminated by a newline is yielded as well instead of being
        silently discarded.
        """
        pending = b''
        for chunk in self.read_compressed_chunks():
            pending += chunk
            # Everything before the last newline is complete; the last piece
            # may be continued by the next chunk, so keep it for later.
            *complete, pending = pending.split(b'\n')
            for raw_line in complete:
                yield raw_line.decode()
        if pending:
            yield pending.decode()
| |
class InventoryFile:
    """Load and dump Sphinx inventory files.

    Loading understands inventory format versions 1 and 2; dumping always
    writes version 2.  A loaded inventory maps each object type to a dict of
    ``name -> (project, version, location, display name)``.
    """

    @classmethod
    def load(cls, stream: IO, uri: str, joinfunc: Callable) -> Inventory:
        """Parse an inventory from *stream*, dispatching on its header line.

        :param stream: binary file-like object holding the inventory.
        :param uri: base URI passed to *joinfunc* with each entry's location.
        :param joinfunc: callable ``joinfunc(uri, location)`` that returns the
            location stored in the result.
        :raises ValueError: if the first line is not a known version header.
        """
        reader = InventoryFileReader(stream)
        header = reader.readline().rstrip()
        if header == '# Sphinx inventory version 1':
            return cls.load_v1(reader, uri, joinfunc)
        if header == '# Sphinx inventory version 2':
            return cls.load_v2(reader, uri, joinfunc)
        raise ValueError('invalid inventory header: %s' % header)

    @classmethod
    def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
        """Parse the remainder of a version 1 inventory.

        *stream* must be positioned just after the version header.  Every
        entry is a ``name type location`` line; all types are placed in the
        ``py`` domain.

        :raises ValueError: if an entry has fewer than three fields.
        """
        invdata: Inventory = {}
        # Drop the 11-character '# Project: ' / '# Version: ' prefixes.
        projname = stream.readline().rstrip()[11:]
        version = stream.readline().rstrip()[11:]
        for line in stream.readlines():
            name, objtype, location = line.rstrip().split(None, 2)
            location = join(uri, location)
            # version 1 did not add anchors to the location
            if objtype == 'mod':
                objtype = 'py:module'
                location += '#module-' + name
            else:
                objtype = 'py:' + objtype
                location += '#' + name
            invdata.setdefault(objtype, {})[name] = (projname, version, location, '-')
        return invdata

    @classmethod
    def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
        """Parse the remainder of a version 2 inventory.

        *stream* must be positioned just after the version header.  The
        project, version and compression-notice lines are read as text; the
        rest is read through ``stream.read_compressed_lines()``.  Lines that
        do not look like an entry, or whose type lacks a ``:``, are skipped.

        :raises ValueError: if the third header line does not mention zlib.
        """
        invdata: Inventory = {}
        # Drop the 11-character '# Project: ' / '# Version: ' prefixes.
        projname = stream.readline().rstrip()[11:]
        version = stream.readline().rstrip()[11:]
        line = stream.readline()
        if 'zlib' not in line:
            raise ValueError('invalid inventory header (not compressed): %s' % line)

        # be careful to handle names with embedded spaces correctly
        # (compiled once here instead of being looked up for every entry)
        entry_re = re.compile(r'(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)',
                              flags=re.VERBOSE)
        for line in stream.read_compressed_lines():
            m = entry_re.match(line.rstrip())
            if not m:
                continue
            name, objtype, _prio, location, dispname = m.groups()
            if ':' not in objtype:
                # wrong type value. type should be in the form of "{domain}:{objtype}"
                #
                # Note: To avoid the regex DoS, this is implemented in python (refs: #8175)
                continue
            if objtype == 'py:module' and name in invdata.get(objtype, ()):
                # due to a bug in 1.1 and below,
                # two inventory entries are created
                # for Python modules, and the first
                # one is correct
                continue
            if location.endswith('$'):
                # a trailing '$' abbreviates the entry's own name
                location = location[:-1] + name
            location = join(uri, location)
            inv_item: InventoryItem = projname, version, location, dispname
            invdata.setdefault(objtype, {})[name] = inv_item
        return invdata

    @classmethod
    def dump(cls, filename: str, env: BuildEnvironment, builder: Builder) -> None:
        """Write a version 2 inventory for *env* to *filename*.

        The header is written as plain text; one entry per object returned by
        each domain's ``get_objects()`` follows as a single zlib stream.
        Domains and their objects are written in sorted order.

        :param filename: path of the file to create or overwrite.
        :param env: supplies ``config.project``, ``config.version`` and
            ``domains``.
        :param builder: supplies ``get_target_uri(docname)``.
        """
        def escape(string: str) -> str:
            # collapse whitespace runs so the value stays on one header line
            return re.sub("\\s+", " ", string)

        header = ('# Sphinx inventory version 2\n'
                  '# Project: %s\n'
                  '# Version: %s\n'
                  '# The remainder of this file is compressed using zlib.\n' %
                  (escape(env.config.project),
                   escape(env.config.version)))

        with open(filename, 'wb') as f:
            # header
            f.write(header.encode())

            # body
            compressor = zlib.compressobj(9)
            for domainname, domain in sorted(env.domains.items()):
                for name, dispname, typ, docname, anchor, prio in \
                        sorted(domain.get_objects()):
                    # An empty name is a suffix of every anchor, and slicing
                    # with ``[:-0]`` would throw the whole anchor away, so
                    # only abbreviate for non-empty names.
                    if name and anchor.endswith(name):
                        # this can shorten the inventory by as much as 25%
                        anchor = anchor[:-len(name)] + '$'
                    uri = builder.get_target_uri(docname)
                    if anchor:
                        uri += '#' + anchor
                    if dispname == name:
                        dispname = '-'
                    entry = ('%s %s:%s %s %s %s\n' %
                             (name, domainname, typ, prio, uri, dispname))
                    f.write(compressor.compress(entry.encode()))
            f.write(compressor.flush())