blob: 2284d0d92c06485ac0a4e722886e7ef8947adc2f [file] [log] [blame]
#
# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
import grpc
from ._cas.casremote import BlobNotFound
from .storage._casbaseddirectory import CasBasedDirectory
from ._assetcache import AssetCache
from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
from ._protos.buildstream.v2 import source_pb2
REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"
# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
# context (Context): The Buildstream context
#
class SourceCache(AssetCache):
def __init__(self, context):
super().__init__(context)
self._basedir = os.path.join(context.cachedir, "source_protos")
os.makedirs(self._basedir, exist_ok=True)
# contains()
#
# Given a source, gets the ref name and checks whether the local CAS
# contains it.
#
# Args:
# source (Source): Source to check
#
# Returns:
# (bool): whether the CAS contains this source or not
#
def contains(self, source):
ref = source._get_source_name()
path = self._source_path(ref)
if not os.path.exists(path):
return False
# check files
source_proto = self._get_source(ref)
return self.cas.contains_directory(source_proto.files, with_files=True)
# commit()
#
# Given a source, it stages and commits it to the local CAS.
#
# Args:
# source: last source
def commit(self, source):
ref = source._get_source_name()
vdir = CasBasedDirectory(self.cas)
if not source.BST_STAGE_VIRTUAL_DIRECTORY:
with utils._tempdir(dir=self.context.tmpdir, prefix="staging-temp") as tmpdir:
source._stage(tmpdir)
vdir.import_files(tmpdir, can_link=True)
else:
source._stage(vdir)
self._store_source(ref, vdir._get_digest())
# export()
#
# Exports a source in the CAS to a virtual directory
#
# Args:
# source (Source): source we want to export
#
# Returns:
# CASBasedDirectory
def export(self, source):
ref = source._get_source_name()
source = self._get_source(ref)
return CasBasedDirectory(self.cas, digest=source.files)
# pull()
#
# Attempts to pull sources from configure remote source caches.
#
# Args:
# source (Source): The source we want to fetch
# progress (callable|None): The progress callback
#
# Returns:
# (bool): True if pull successful, False if not
def pull(self, source):
ref = source._get_source_name()
project = source._get_project()
display_key = source._get_brief_display_key()
index_remotes, storage_remotes = self.get_remotes(project.name, False)
# First fetch the source directory digest so we know what to pull
source_digest = None
for remote in index_remotes:
try:
remote.init()
source.status("Pulling source {} <- {}".format(display_key, remote))
source_digest = self._pull_source(ref, remote)
if source_digest is None:
source.info(
"Remote source service ({}) does not have source {} cached".format(remote, display_key)
)
continue
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e)) from e
if not source_digest:
return False
for remote in storage_remotes:
try:
remote.init()
source.status("Pulling data for source {} <- {}".format(display_key, remote))
# Fetch source blobs
self.cas._fetch_directory(remote, source_digest)
source.info("Pulled source {} <- {}".format(display_key, remote))
return True
except BlobNotFound as e:
# Not all blobs are available on this remote
source.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
continue
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e)) from e
return False
# push()
#
# Push a source to configured remote source caches
#
# Args:
# source (Source): source to push
#
# Returns:
# (Bool): whether it pushed to a remote source cache
#
def push(self, source):
ref = source._get_source_name()
project = source._get_project()
index_remotes, storage_remotes = self.get_remotes(project.name, True)
pushed_storage = False
pushed_index = False
display_key = source._get_brief_display_key()
for remote in storage_remotes:
remote.init()
source.status("Pushing data for source {} -> {}".format(display_key, remote))
source_proto = self._get_source(ref)
try:
self.cas._send_directory(remote, source_proto.files)
pushed_storage = True
except CASRemoteError:
source.info("Failed to push source files {} -> {}".format(display_key, remote))
continue
for remote in index_remotes:
remote.init()
source.status("Pushing source {} -> {}".format(display_key, remote))
# check whether cache has files already
if self._pull_source(ref, remote) is not None:
source.info("Remote ({}) already has source {} cached".format(remote, display_key))
continue
if not self._push_source(ref, remote):
source.info("Failed to push source metadata {} -> {}".format(display_key, remote))
continue
source.info("Pushed source {} -> {}".format(display_key, remote))
pushed_index = True
return pushed_index and pushed_storage
def _store_source(self, ref, digest):
source_proto = source_pb2.Source()
source_proto.files.CopyFrom(digest)
self._store_proto(source_proto, ref)
def _store_proto(self, proto, ref):
path = self._source_path(ref)
os.makedirs(os.path.dirname(path), exist_ok=True)
with utils.save_file_atomic(path, "w+b") as f:
f.write(proto.SerializeToString())
def _get_source(self, ref):
path = self._source_path(ref)
source_proto = source_pb2.Source()
try:
with open(path, "r+b") as f:
source_proto.ParseFromString(f.read())
return source_proto
except FileNotFoundError as e:
raise SourceCacheError("Attempted to access unavailable source: {}".format(e)) from e
def _source_path(self, ref):
return os.path.join(self._basedir, ref)
def _pull_source(self, source_ref, remote):
uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
try:
remote.init()
response = remote.fetch_directory([uri])
if not response:
return None
self._store_source(source_ref, response.root_directory_digest)
return response.root_directory_digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details()))
return None
def _push_source(self, source_ref, remote):
uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
try:
remote.init()
source_proto = self._get_source(source_ref)
remote.push_directory([uri], source_proto.files)
return True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
return False