blob: 1435969f487bab0223f8dbfe2f76e16fc87276b2 [file] [log] [blame]
# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
import re
from typing import List, Dict, Tuple, Iterable, Optional
import grpc
from . import utils
from ._cas import CASRemote, CASCache
from ._exceptions import AssetCacheError, RemoteError
from ._remotespec import RemoteSpec, RemoteType
from ._remote import BaseRemote
from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from ._protos.google.rpc import code_pb2
class AssetRemote(BaseRemote):
    # A remote which provides the Remote Asset API Fetch and Push services.
    #
    # The gRPC stubs are created in _configure_protocols() and dropped
    # again in close().
    #
    # Args:
    #    spec (RemoteSpec): The specification of the remote to talk to
    #
    def __init__(self, spec):
        super().__init__(spec)
        self.fetch_service = None
        self.push_service = None

    def close(self):
        # Forget our stubs, then let the base class do its own cleanup
        self.fetch_service = None
        self.push_service = None
        super().close()

    def _configure_protocols(self):
        # set up remote asset stubs
        #
        # NOTE(review): self.channel is not assigned in this class,
        # presumably BaseRemote provides it before calling this.
        self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
        self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)

    # _check():
    #
    # Check if this remote provides everything required for the
    # particular kind of remote. This is expected to be called as part
    # of check()
    #
    # The Fetch service is always probed, the Push service is only
    # probed when the spec is configured for pushing.
    #
    # Raises:
    #    RemoteError: If the upstream has a problem
    #
    def _check(self):
        self._probe_asset_service(
            self.fetch_service.FetchBlob, self._new_asset_request(remote_asset_pb2.FetchBlobRequest), "Fetch"
        )

        if self.spec.push:
            self._probe_asset_service(
                self.push_service.PushBlob, self._new_asset_request(remote_asset_pb2.PushBlobRequest), "Push"
            )

    # fetch_blob():
    #
    # Resolve URIs to a CAS blob digest.
    #
    # Args:
    #    uris (list of str): The URIs to resolve. Multiple URIs should represent
    #                        the same content available at different locations.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content to fetch.
    #
    # Returns
    #    (FetchBlobResponse): The asset server response or None if the resource
    #                         is not available.
    #
    # Raises:
    #    AssetCacheError: If the upstream has a problem
    #
    def fetch_blob(self, uris, *, qualifiers=None):
        request = self._new_asset_request(remote_asset_pb2.FetchBlobRequest)
        return self._invoke_fetch("FetchBlob", request, uris, qualifiers)

    # fetch_directory():
    #
    # Resolve URIs to a CAS Directory digest.
    #
    # Args:
    #    uris (list of str): The URIs to resolve. Multiple URIs should represent
    #                        the same content available at different locations.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content to fetch.
    #
    # Returns
    #    (FetchDirectoryResponse): The asset server response or None if the resource
    #                              is not available.
    #
    # Raises:
    #    AssetCacheError: If the upstream has a problem
    #
    def fetch_directory(self, uris, *, qualifiers=None):
        request = self._new_asset_request(remote_asset_pb2.FetchDirectoryRequest)
        return self._invoke_fetch("FetchDirectory", request, uris, qualifiers)

    # push_blob():
    #
    # Associate a CAS blob digest to URIs.
    #
    # Args:
    #    uris (list of str): The URIs to associate with the blob digest.
    #    blob_digest (Digest): The CAS blob to associate.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content that is being pushed.
    #    references_blobs (list of Digest): Referenced blobs that need to not expire
    #                                       before expiration of this association.
    #    references_directories (list of Digest): Referenced directories that need to not expire
    #                                             before expiration of this association.
    #
    # Raises:
    #    AssetCacheError: If the upstream has a problem
    #
    def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None):
        request = self._new_asset_request(remote_asset_pb2.PushBlobRequest)
        request.uris.extend(uris)
        request.blob_digest.CopyFrom(blob_digest)
        self._invoke_push("PushBlob", request, qualifiers, references_blobs, references_directories)

    # push_directory():
    #
    # Associate a CAS Directory digest to URIs.
    #
    # Args:
    #    uris (list of str): The URIs to associate with the directory digest.
    #    directory_digest (Digest): The CAS Directory to associate.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content that is being pushed.
    #    references_blobs (list of Digest): Referenced blobs that need to not expire
    #                                       before expiration of this association.
    #    references_directories (list of Digest): Referenced directories that need to not expire
    #                                             before expiration of this association.
    #
    # Raises:
    #    AssetCacheError: If the upstream has a problem
    #
    def push_directory(
        self, uris, directory_digest, *, qualifiers=None, references_blobs=None, references_directories=None
    ):
        request = self._new_asset_request(remote_asset_pb2.PushDirectoryRequest)
        request.uris.extend(uris)
        request.root_directory_digest.CopyFrom(directory_digest)
        self._invoke_push("PushDirectory", request, qualifiers, references_blobs, references_directories)

    # _new_asset_request():
    #
    # Instantiate a request message, addressed to the configured
    # instance name if the spec has one.
    #
    # Args:
    #    request_class (type): The protobuf request message class
    #
    # Returns:
    #    The newly created request message
    #
    def _new_asset_request(self, request_class):
        request = request_class()
        if self.spec.instance_name:
            request.instance_name = self.spec.instance_name
        return request

    # _probe_asset_service():
    #
    # Send a request without any URIs in order to find out whether
    # the remote implements the given service.
    #
    # Args:
    #    rpc (callable): The stub method to invoke
    #    request: The (otherwise empty) request message to send
    #    service_name (str): "Fetch" or "Push", used in the error message
    #
    # Raises:
    #    RemoteError: If the service is unimplemented or fails unexpectedly
    #
    def _probe_asset_service(self, rpc, request, service_name):
        try:
            rpc(request)
        except grpc.RpcError as e:
            status = e.code()

            if status == grpc.StatusCode.INVALID_ARGUMENT:
                # Expected error as the request doesn't specify any URIs.
                return

            if status == grpc.StatusCode.UNIMPLEMENTED:
                raise RemoteError(
                    "Configured remote does not implement the Remote Asset "
                    "{} service. Please check remote configuration.".format(service_name)
                ) from e

            raise RemoteError(
                "Remote initialisation failed with status {}: {}".format(status.name, e.details())
            ) from e

    # _invoke_fetch():
    #
    # Complete and send a Fetch service request, mapping "not found"
    # (reported either as gRPC status or in the response) to None.
    #
    # Args:
    #    method (str): Name of the Fetch stub method, also used in error messages
    #    request: The request message to complete and send
    #    uris (list of str): The URIs to resolve
    #    qualifiers (list of Qualifier): Optional qualifiers
    #
    # Returns:
    #    The asset server response, or None if the resource is not available
    #
    # Raises:
    #    AssetCacheError: If the upstream has a problem
    #
    def _invoke_fetch(self, method, request, uris, qualifiers):
        request.uris.extend(uris)
        if qualifiers:
            request.qualifiers.extend(qualifiers)

        try:
            response = getattr(self.fetch_service, method)(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None

            raise AssetCacheError("{} failed with status {}: {}".format(method, e.code().name, e.details())) from e

        # The RPC itself may succeed while the response carries its own status
        if response.status.code == code_pb2.NOT_FOUND:
            return None

        if response.status.code != code_pb2.OK:
            raise AssetCacheError("{} failed with response status {}".format(method, response.status.code))

        return response

    # _invoke_push():
    #
    # Complete and send a Push service request. The caller has already
    # filled in the URIs and the digest being associated.
    #
    # Args:
    #    method (str): Name of the Push stub method, also used in error messages
    #    request: The request message to complete and send
    #    qualifiers (list of Qualifier): Optional qualifiers
    #    references_blobs (list of Digest): Optional referenced blobs
    #    references_directories (list of Digest): Optional referenced directories
    #
    # Raises:
    #    AssetCacheError: If the upstream has a problem
    #
    def _invoke_push(self, method, request, qualifiers, references_blobs, references_directories):
        if qualifiers:
            request.qualifiers.extend(qualifiers)
        if references_blobs:
            request.references_blobs.extend(references_blobs)
        if references_directories:
            request.references_directories.extend(references_directories)

        try:
            getattr(self.push_service, method)(request)
        except grpc.RpcError as e:
            raise AssetCacheError("{} failed with status {}: {}".format(method, e.code().name, e.details())) from e
# RemotePair()
#
# A pair of remotes which corresponds to a RemoteSpec. We
# need separate remote objects for the index and the storage, so
# we store them together for each RemoteSpec here.
#
# Either member of the RemotePair may be None, in the case that
# the user specified a different RemoteSpec for indexing and
# for storage.
#
# Both members may also be None, in the case that we were unable
# to establish a connection to this remote at initialization time.
#
class RemotePair:
    def __init__(self, cas: CASCache, spec: RemoteSpec):
        # Nothing is connected to begin with. A slot is only filled in
        # once the corresponding remote has passed its check().
        self.index: Optional[AssetRemote] = None
        self.storage: Optional[CASRemote] = None

        # Message of the RemoteError which aborted the setup, if any
        self.error: Optional[str] = None

        try:
            if spec.remote_type in (RemoteType.INDEX, RemoteType.ALL):
                self.index = self._checked(AssetRemote(spec))

            if spec.remote_type in (RemoteType.STORAGE, RemoteType.ALL):
                self.storage = self._checked(CASRemote(spec, cas))
        except RemoteError as failure:
            # The first failure stops the setup; a slot which was
            # already filled in keeps its remote.
            self.error = str(failure)

    # _checked():
    #
    # Run check() on a remote and hand it back, so that a remote is
    # only ever stored after its check did not raise.
    #
    @staticmethod
    def _checked(remote):
        remote.check()
        return remote
# Base Asset Cache for Caches to derive from
#
class AssetCache:
    def __init__(self, context):
        self.context = context
        self.cas: CASCache = context.get_cascache()

        # Table of RemotePair objects
        self._remotes: Dict[RemoteSpec, RemotePair] = {}

        # Table of prioritized RemoteSpecs which are valid for each project
        self._project_specs: Dict[str, List[RemoteSpec]] = {}

        # Overall availability of remotes, computed by setup_remotes()
        self._has_fetch_remotes: bool = False
        self._has_push_remotes: bool = False

        # Base directory handed to utils._remove_path_with_parents() by
        # remove_ref(). It is never assigned in this class, presumably
        # deriving caches set it (TODO confirm).
        self._basedir = None

    # release_resources():
    #
    # Release resources used by AssetCache.
    #
    def release_resources(self) -> None:
        # Close all remotes and their gRPC channels
        for remote in self._remotes.values():
            if remote.index:
                remote.index.close()
            if remote.storage:
                remote.storage.close()

    # setup_remotes():
    #
    # Sets up which remotes to use
    #
    # Args:
    #    specs: The active remote specs
    #    project_specs: List of specs for each project
    #
    def setup_remotes(self, specs: Iterable[RemoteSpec], project_specs: Dict[str, List[RemoteSpec]]) -> None:

        # Hold on to the project specs
        self._project_specs = project_specs

        for spec in specs:
            # This can be called multiple times, ensure that we only try
            # to instantiate each remote once.
            #
            if spec in self._remotes:
                continue

            remote = RemotePair(self.cas, spec)
            if remote.error:
                self.context.messenger.warn("Failed to initialize remote {}: {}".format(spec.url, remote.error))

            # The pair is recorded even on failure, so that the remote
            # is not retried on a later call.
            self._remotes[spec] = remote

        # Determine overall existence of push or fetch remotes, both an
        # index and a storage remote are needed in either case.
        self._has_fetch_remotes = any(remote.storage for remote in self._remotes.values()) and any(
            remote.index for remote in self._remotes.values()
        )
        self._has_push_remotes = any(spec.push and remote.storage for spec, remote in self._remotes.items()) and any(
            spec.push and remote.index for spec, remote in self._remotes.items()
        )

    # get_remotes():
    #
    # List the index remotes and storage remotes available for a project
    #
    # Args:
    #    project_name: The project name
    #    push: Whether pushing is required for this remote
    #
    # Returns:
    #    index_remotes: The index remotes
    #    storage_remotes: The storage remotes
    #
    def get_remotes(self, project_name: str, push: bool) -> Tuple[List[AssetRemote], List[CASRemote]]:
        try:
            project_specs = self._project_specs[project_name]
        except KeyError:
            # Technically this shouldn't happen, but here is a defensive return none the less.
            return [], []

        index_remotes = []
        storage_remotes = []
        for spec in project_specs:

            # Skip remotes which are not configured for pushing
            if push and not spec.push:
                continue

            remote = self._remotes[spec]
            if remote.index:
                index_remotes.append(remote.index)
            if remote.storage:
                storage_remotes.append(remote.storage)

        return index_remotes, storage_remotes

    # has_fetch_remotes():
    #
    # Check whether any remote repositories are available for fetching.
    #
    # Args:
    #    plugin (Plugin): The Plugin to check, or None to check whether
    #                     any project at all has fetch remotes
    #
    # Returns: True if any remote repositories are configured, False otherwise
    #
    def has_fetch_remotes(self, *, plugin=None) -> bool:
        if not self._has_fetch_remotes:
            # No project has fetch remotes
            return False

        if plugin is None:
            # At least one (sub)project has fetch remotes
            return True

        project = plugin._get_project()
        index_remotes, storage_remotes = self.get_remotes(project.name, False)

        # Check whether the specified plugin's project has fetch remotes.
        # Coerce to bool so that callers get True/False as documented,
        # rather than one of the remote lists.
        return bool(index_remotes and storage_remotes)

    # has_push_remotes():
    #
    # Check whether any remote repositories are available for pushing.
    #
    # Args:
    #    plugin (Plugin): The Plugin to check, or None to check whether
    #                     any project at all has push remotes
    #
    # Returns: True if any remote repository is configured, False otherwise
    #
    def has_push_remotes(self, *, plugin=None) -> bool:
        if not self._has_push_remotes:
            # No project has push remotes
            return False

        if plugin is None:
            # At least one (sub)project has push remotes
            return True

        project = plugin._get_project()
        index_remotes, storage_remotes = self.get_remotes(project.name, True)

        # Check whether the specified plugin's project has push remotes
        return bool(index_remotes and storage_remotes)

    # list_refs_mtimes()
    #
    # List refs in a directory, given a base path. Also returns the
    # associated mtimes
    #
    # Args:
    #    base_path (str): Base path to traverse over
    #    glob_expr (str|None): Optional glob expression to match against files
    #
    # Returns:
    #     (iter of (mtime, filename)): iterator of tuples of mtime and refs,
    #                                  the refs being relative to base_path
    #
    def list_refs_mtimes(self, base_path, *, glob_expr=None):
        path = base_path
        if glob_expr is not None:
            globdir = os.path.dirname(glob_expr)
            if not any(c in "*?[" for c in globdir):
                # path prefix contains no globbing characters so
                # append the glob to optimise the os.walk()
                path = os.path.join(base_path, globdir)

        # An empty glob expression behaves like no expression: everything matches
        regexer = None
        if glob_expr:
            expression = utils._glob2re(glob_expr)
            regexer = re.compile(expression)

        for root, _, files in os.walk(path):
            for filename in files:
                ref_path = os.path.join(root, filename)
                relative_path = os.path.relpath(ref_path, base_path)  # Relative to refs head
                if regexer is None or regexer.match(relative_path):
                    # Obtain the mtime (the time a file was last modified)
                    yield (os.path.getmtime(ref_path), relative_path)

    # remove_ref()
    #
    # Removes a ref.
    #
    # This also takes care of pruning away directories which can
    # be removed after having removed the given ref.
    #
    # Args:
    #    ref (str): The ref to remove
    #
    # Raises:
    #    (AssetCacheError): If the ref didn't exist, or a system error
    #                       occurred while removing it
    #
    def remove_ref(self, ref) -> None:
        try:
            utils._remove_path_with_parents(self._basedir, ref)
        except FileNotFoundError as e:
            # Handled before OSError, which FileNotFoundError derives from
            raise AssetCacheError("Could not find ref '{}'".format(ref)) from e
        except OSError as e:
            raise AssetCacheError("System error while removing ref '{}': {}".format(ref, e)) from e