plugin.py: Harden the Plugin.call() and Plugin.check_output() APIs
These APIs were passing along *args and *kwargs transparently to the
subprocess.Popen() API, which holds us hostage to supporting the underlying
python APIs.
Instead, we now define a strongly typed API with access to limited
keyword arguments which we propagate to the underlying python API.
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 4008734..6a8b221 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -133,7 +133,7 @@
import sys
import traceback
from contextlib import contextmanager, suppress
-from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING
+from typing import IO, Any, Callable, Generator, Optional, Sequence, Dict, Tuple, TypeVar, Union, TYPE_CHECKING
from weakref import WeakValueDictionary
from . import utils, _signals
@@ -152,6 +152,22 @@
T1 = TypeVar("T1")
+#
+# Some types used for subprocess.Popen() wrappers
+#
+# This recipe was gleaned from observing:
+# https://github.com/python/typeshed/blob/master/stdlib/_typeshed/__init__.pyi
+# https://github.com/python/typeshed/blob/master/stdlib/subprocess.pyi
+#
+_FILE = Union[None, int, IO[Any]]
+_TXT = Union[bytes, str]
+_STR_BYTES_PATH = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
+if sys.version_info >= (3, 8):
+ _CMD = Union[_STR_BYTES_PATH, Sequence[_STR_BYTES_PATH]]
+else:
+ # Python 3.6 doesn't support _CMD being a single PathLike.
+ # See: https://bugs.python.org/issue31961
+ _CMD = Union[_TXT, Sequence[_STR_BYTES_PATH]]
# _background_job_wrapper()
#
@@ -664,16 +680,28 @@
return result
- def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
+ def call(
+ self,
+ args: _CMD,
+ fail: Optional[str] = None,
+ fail_temporarily: bool = False,
+ cwd: Optional[_STR_BYTES_PATH] = None,
+ env: Optional[Dict[str, str]] = None,
+ stdin: Optional[_FILE] = None,
+ stdout: Optional[_FILE] = None,
+ stderr: Optional[_FILE] = None,
+ ) -> int:
"""A wrapper for subprocess.call()
Args:
- popenargs (list): Popen() arguments
- fail: A message to display if the process returns
- a non zero exit code
- fail_temporarily: Whether any exceptions should
- be raised as temporary.
- rest_of_args (kwargs): Remaining arguments to subprocess.call()
+ args: A sequence of program arguments or else a string or `path-like-object <https://docs.python.org/3/glossary.html#term-path-like-object>`_
+ fail: A message to display if the process returns a non zero exit code
+ fail_temporarily: Whether any exceptions should be raised as temporary.
+ cwd: Optionally specify the working directory for the command
+ env: Optionally specify some environment variables for the command
+ stdin: Optionally specify standard input for the command
+ stdout: Optionally specify standard output for the command
+ stderr: Optionally specify standard error for the command
Returns:
The process exit code.
@@ -681,8 +709,15 @@
Raises:
(:class:`.PluginError`): If a non-zero return code is received and *fail* is specified
- Note: If *fail* is not specified, then the return value of subprocess.call()
- is returned even on error, and no exception is automatically raised.
+ .. attention::
+
+ If *fail* is not specified, then the return value of subprocess.call()
+ is returned even on error, and no exception is automatically raised.
+
+ .. attention::
+
+ If *stderr* and/or *stdout* are specified, then these streams are delivered
+ directly to the plugin instead of being delivered to the associated log file.
**Example**
@@ -695,19 +730,38 @@
"Failed to download ponies from {}".format(
self.mirror_directory))
"""
- exit_code, _ = self.__call(*popenargs, fail=fail, fail_temporarily=fail_temporarily, **kwargs)
+ exit_code, _ = self.__call(
+ args,
+ fail=fail,
+ fail_temporarily=fail_temporarily,
+ cwd=cwd,
+ env=env,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ )
return exit_code
- def check_output(self, *popenargs, fail=None, fail_temporarily=False, **kwargs) -> Tuple[int, str]:
+ def check_output(
+ self,
+ args: _CMD,
+ fail: Optional[str] = None,
+ fail_temporarily: bool = False,
+ cwd: Optional[_STR_BYTES_PATH] = None,
+ env: Optional[Dict[str, str]] = None,
+ stdin: Optional[_FILE] = None,
+ stderr: Optional[_FILE] = None,
+ ) -> Tuple[int, str]:
"""A wrapper for subprocess.check_output()
Args:
- popenargs (list): Popen() arguments
- fail (str): A message to display if the process returns
- a non zero exit code
- fail_temporarily (bool): Whether any exceptions should
- be raised as temporary.
- rest_of_args (kwargs): Remaining arguments to subprocess.call()
+ args: A sequence of program arguments or else a string or `path-like-object <https://docs.python.org/3/glossary.html#term-path-like-object>`_
+ fail: A message to display if the process returns a non zero exit code
+ fail_temporarily: Whether any exceptions should be raised as temporary.
+ cwd: Optionally specify the working directory for the command
+ env: Optionally specify some environment variables for the command
+ stdin: Optionally specify standard input for the command
+ stderr: Optionally specify standard error for the command
Returns:
A 2-tuple of form (process exit code, process standard output)
@@ -715,8 +769,15 @@
Raises:
(:class:`.PluginError`): If a non-zero return code is received and *fail* is specified
- Note: If *fail* is not specified, then the return value of subprocess.check_output()
- is returned even on error, and no exception is automatically raised.
+ .. attention::
+
+ If *fail* is not specified, then the return value of subprocess.call()
+ is returned even on error, and no exception is automatically raised.
+
+ .. attention::
+
+ If *stderr* and/or *stdout* are specified, then these streams are delivered
+ directly to the plugin instead of being delivered to the associated log file.
**Example**
@@ -744,7 +805,16 @@
raise SourceError(
fmt.format(plugin=self, track=tracking)) from e
"""
- return self.__call(*popenargs, collect_stdout=True, fail=fail, fail_temporarily=fail_temporarily, **kwargs)
+ return self.__call(
+ args,
+ collect_stdout=True,
+ fail=fail,
+ fail_temporarily=fail_temporarily,
+ cwd=cwd,
+ env=env,
+ stdin=stdin,
+ stderr=stderr,
+ )
#############################################################
# Private Methods used in BuildStream #
@@ -860,19 +930,29 @@
# Internal subprocess implementation for the call() and check_output() APIs
#
- def __call(self, *popenargs, collect_stdout=False, fail=None, fail_temporarily=False, **kwargs):
-
+ def __call(
+ self,
+ args: _CMD,
+ fail: Optional[str] = None,
+ fail_temporarily: bool = False,
+ collect_stdout: bool = False,
+ cwd: Optional[_STR_BYTES_PATH] = None,
+ env: Optional[Dict[str, str]] = None,
+ stdin: Optional[_FILE] = None,
+ stdout: Optional[_FILE] = None,
+ stderr: Optional[_FILE] = None,
+ ):
with self._output_file() as output_file:
- if "stdout" not in kwargs:
- kwargs["stdout"] = output_file
- if "stderr" not in kwargs:
- kwargs["stderr"] = output_file
+ if stdout is None:
+ stdout = output_file
+ if stderr is None:
+ stderr = output_file
if collect_stdout:
- kwargs["stdout"] = subprocess.PIPE
+ stdout = subprocess.PIPE
- self.__note_command(output_file, *popenargs, **kwargs)
+ self.__note_command(output_file, args, cwd)
- exit_code, output = utils._call(*popenargs, **kwargs)
+ exit_code, output = utils._call(args, cwd=cwd, env=env, stdin=stdin, stdout=stdout, stderr=stderr)
if fail and exit_code:
raise PluginError("{plugin}: {message}".format(plugin=self, message=fail), temporary=fail_temporarily)
@@ -899,9 +979,18 @@
message = Message(message_type, brief, **plugin_kwargs)
self.__context.messenger.message(message)
- def __note_command(self, output, *popenargs, **kwargs):
- workdir = kwargs.get("cwd", os.getcwd())
- command = " ".join(popenargs[0])
+ # __note_command():
+ #
+ # Make a note in the logs that we are running a process on the host.
+ #
+ # Args:
+ # output: The output logging file
+ # args: The command with args which we're about to run
+ # cwd: The host side working directory to run the command, or None
+ #
+ def __note_command(self, output, args, cwd):
+ workdir = cwd or os.getcwd()
+ command = " ".join(args)
output.write("Running host command {}: {}\n".format(workdir, command))
output.flush()
self.status("Running host command", detail=command)