| # |
| # Copyright (C) 2018 Codethink Limited |
| # Copyright (C) 2018-2019 Bloomberg Finance LP |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import contextlib |
| import threading |
| import os |
| import random |
| import shutil |
| import signal |
| import stat |
| import subprocess |
| import tempfile |
| import time |
| import psutil |
| |
| import grpc |
| |
| from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc |
| from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc |
| from .._protos.build.buildgrid import local_cas_pb2_grpc |
| from .._protos.google.bytestream import bytestream_pb2_grpc |
| |
| from .. import _signals, utils |
| from .._exceptions import CASCacheError |
| |
| _CASD_MAX_LOGFILES = 10 |
| _CASD_TIMEOUT = 300 # in seconds |
| |
| |
| # CASDProcessManager |
| # |
| # This manages the subprocess that runs buildbox-casd. |
| # |
| # Args: |
| # path (str): The root directory for the CAS repository |
| # log_dir (str): The directory for the logs |
| # log_level (LogLevel): Log level to give to buildbox-casd for logging |
| # cache_quota (int): User configured cache quota |
| # protect_session_blobs (bool): Disable expiry for blobs used in the current session |
| # |
| class CASDProcessManager: |
| def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs): |
| self._log_dir = log_dir |
| |
| self._socket_path = self._make_socket_path(path) |
| self._connection_string = "unix:" + self._socket_path |
| |
| casd_args = [utils.get_host_tool("buildbox-casd")] |
| casd_args.append("--bind=" + self._connection_string) |
| casd_args.append("--log-level=" + log_level.value) |
| |
| if cache_quota is not None: |
| casd_args.append("--quota-high={}".format(int(cache_quota))) |
| casd_args.append("--quota-low={}".format(int(cache_quota / 2))) |
| |
| if protect_session_blobs: |
| casd_args.append("--protect-session-blobs") |
| |
| casd_args.append(path) |
| |
| self._start_time = time.time() |
| self._logfile = self._rotate_and_get_next_logfile() |
| |
| with open(self._logfile, "w") as logfile_fp: |
| # Block SIGINT on buildbox-casd, we don't need to stop it |
| # The frontend will take care of it if needed |
| with _signals.blocked([signal.SIGINT], ignore=False): |
| self.process = subprocess.Popen(casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT) |
| |
| # _make_socket_path() |
| # |
| # Create a path to the CASD socket, ensuring that we don't exceed |
| # the socket path limit. |
| # |
| # Note that we *may* exceed the path limit if the python-chosen |
| # tmpdir path is very long, though this should be /tmp. |
| # |
| # Args: |
| # path (str): The root directory for the CAS repository. |
| # |
| # Returns: |
| # (str) - The path to the CASD socket. |
| # |
| def _make_socket_path(self, path): |
| self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream") |
| # mkdtemp will create this directory in the "most secure" |
| # way. This translates to "u+rwx,go-rwx". |
| # |
| # This is a good thing, generally, since it prevents us |
| # from leaking sensitive information to other users, but |
| # it's a problem for the workflow for userchroot, since |
| # the setuid casd binary will not share a uid with the |
| # user creating the tempdir. |
| # |
| # Instead, we chmod the directory 755, and only place a |
| # symlink to the CAS directory in here, which will allow the |
| # CASD process RWX access to a directory without leaking build |
| # information. |
| os.chmod( |
| self._socket_tempdir, |
| stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH, |
| ) |
| |
| os.symlink(path, os.path.join(self._socket_tempdir, "cas")) |
| # FIXME: There is a potential race condition here; if multiple |
| # instances happen to create the same socket path, at least |
| # one will try to talk to the same server as us. |
| # |
| # There's no real way to avoid this from our side; we'd need |
| # buildbox-casd to tell us that it could not create a fresh |
| # socket. |
| # |
| # We could probably make this even safer by including some |
| # thread/process-specific information, but we're not really |
| # supporting this use case anyway; it's mostly here fore |
| # testing, and to help more gracefully handle the situation. |
| # |
| # Note: this uses the same random string generation principle |
| # as cpython, so this is probably a safe file name. |
| random_name = "".join([random.choice("abcdefghijklmnopqrstuvwxyz0123456789_") for _ in range(8)]) |
| socket_name = "casserver-{}.sock".format(random_name) |
| return os.path.join(self._socket_tempdir, "cas", socket_name) |
| |
| # _rotate_and_get_next_logfile() |
| # |
| # Get the logfile to use for casd |
| # |
| # This will ensure that we don't create too many casd log files by |
| # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around. |
| # |
| # Returns: |
| # (str): the path to the log file to use |
| # |
| def _rotate_and_get_next_logfile(self): |
| try: |
| existing_logs = sorted(os.listdir(self._log_dir)) |
| except FileNotFoundError: |
| os.makedirs(self._log_dir) |
| else: |
| while len(existing_logs) >= _CASD_MAX_LOGFILES: |
| logfile_to_delete = existing_logs.pop(0) |
| os.remove(os.path.join(self._log_dir, logfile_to_delete)) |
| |
| return os.path.join(self._log_dir, str(self._start_time) + ".log") |
| |
| # release_resources() |
| # |
| # Terminate the process and release related resources. |
| # |
| def release_resources(self, messenger=None): |
| self._terminate(messenger) |
| self.process = None |
| shutil.rmtree(self._socket_tempdir) |
| |
| # _terminate() |
| # |
| # Terminate the buildbox casd process. |
| # |
| def _terminate(self, messenger=None): |
| return_code = self.process.poll() |
| |
| if return_code is not None: |
| # buildbox-casd is already dead |
| |
| if messenger: |
| messenger.bug( |
| "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(return_code, self._logfile) |
| ) |
| return |
| |
| self.process.terminate() |
| |
| try: |
| # Don't print anything if buildbox-casd terminates quickly |
| return_code = self.process.wait(timeout=0.5) |
| except subprocess.TimeoutExpired: |
| if messenger: |
| cm = messenger.timed_activity("Terminating buildbox-casd") |
| else: |
| cm = contextlib.suppress() |
| with cm: |
| try: |
| return_code = self.process.wait(timeout=15) |
| except subprocess.TimeoutExpired: |
| self.process.kill() |
| self.process.wait(timeout=15) |
| |
| if messenger: |
| messenger.warn("Buildbox-casd didn't exit in time and has been killed") |
| return |
| |
| if return_code != 0 and messenger: |
| messenger.bug( |
| "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile) |
| ) |
| |
| # create_channel(): |
| # |
| # Return a CASDChannel, note that the actual connection is not necessarily |
| # established until it is needed. |
| # |
| def create_channel(self): |
| return CASDChannel(self._socket_path, self._connection_string, self._start_time, self.process.pid) |
| |
| |
| class CASDChannel: |
| def __init__(self, socket_path, connection_string, start_time, casd_pid): |
| self._socket_path = socket_path |
| self._connection_string = connection_string |
| self._start_time = start_time |
| self._casd_channel = None |
| self._bytestream = None |
| self._casd_cas = None |
| self._local_cas = None |
| self._asset_fetch = None |
| self._asset_push = None |
| self._casd_pid = casd_pid |
| self._shutdown_requested = False |
| |
| self._lock = threading.Lock() |
| |
| def _establish_connection(self): |
| with self._lock: |
| if self._casd_channel is not None: |
| return |
| |
| while not os.path.exists(self._socket_path): |
| # casd is not ready yet, try again after a 10ms delay, |
| # but don't wait for more than specified timeout period |
| if time.time() > self._start_time + _CASD_TIMEOUT: |
| raise CASCacheError("Timed out waiting for buildbox-casd to become ready") |
| |
| if self._shutdown_requested: |
| # Shutdown has been requested, we can exit |
| return |
| |
| # check that process is still alive |
| try: |
| proc = psutil.Process(self._casd_pid) |
| if proc.status() == psutil.STATUS_ZOMBIE: |
| proc.wait() |
| |
| if not proc.is_running(): |
| if self._shutdown_requested: |
| return |
| raise CASCacheError("buildbox-casd process died before connection could be established") |
| except psutil.NoSuchProcess: |
| if self._shutdown_requested: |
| return |
| raise CASCacheError("buildbox-casd process died before connection could be established") |
| |
| time.sleep(0.01) |
| |
| self._casd_channel = grpc.insecure_channel(self._connection_string) |
| self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) |
| self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) |
| self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) |
| self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel) |
| self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel) |
| |
| # get_cas(): |
| # |
| # Return ContentAddressableStorage stub for buildbox-casd channel. |
| # |
| def get_cas(self): |
| if self._casd_channel is None: |
| self._establish_connection() |
| return self._casd_cas |
| |
| # get_local_cas(): |
| # |
| # Return LocalCAS stub for buildbox-casd channel. |
| # |
| def get_local_cas(self): |
| if self._local_cas is None: |
| self._establish_connection() |
| return self._local_cas |
| |
| def get_bytestream(self): |
| if self._bytestream is None: |
| self._establish_connection() |
| return self._bytestream |
| |
| # get_asset_fetch(): |
| # |
| # Return Remote Asset Fetch stub for buildbox-casd channel. |
| # |
| def get_asset_fetch(self): |
| if self._casd_channel is None: |
| self._establish_connection() |
| return self._asset_fetch |
| |
| # get_asset_push(): |
| # |
| # Return Remote Asset Push stub for buildbox-casd channel. |
| # |
| def get_asset_push(self): |
| if self._casd_channel is None: |
| self._establish_connection() |
| return self._asset_push |
| |
| # is_closed(): |
| # |
| # Return whether this connection is closed or not. |
| # |
| def is_closed(self): |
| return self._casd_channel is None |
| |
| # request_shutdown(): |
| # |
| # Notify the channel that a shutdown of casd was requested. |
| # |
| # Thus we know that not being able to establish a connection is expected |
| # and no error will be reported in that case. |
| def request_shutdown(self) -> None: |
| self._shutdown_requested = True |
| |
| # close(): |
| # |
| # Close the casd channel. |
| # |
| def close(self): |
| assert self._shutdown_requested, "Please request shutdown before closing" |
| |
| with self._lock: |
| if self.is_closed(): |
| return |
| |
| self._asset_push = None |
| self._asset_fetch = None |
| self._local_cas = None |
| self._casd_cas = None |
| self._bytestream = None |
| self._casd_channel.close() |
| self._casd_channel = None |