blob: fdb404b67b6f4bcd81fee0d7f0e4ee84ba664474 [file] [log] [blame]
#
# Copyright (C) 2016-2018 Codethink Limited
# Copyright (C) 2017-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from typing import TYPE_CHECKING, Iterator
from ._context import Context
from .storage._casbaseddirectory import CasBasedDirectory
if TYPE_CHECKING:
from typing import List
from .source import Source
# An ElementSources object represents the combined sources of an element.
class ElementSources:
def __init__(self, context: Context):
self._context = context
self._sources = [] # type: List[Source]
self.vdir = None # Directory with staged sources
self._sourcecache = context.sourcecache # Source cache
self._is_resolved = False # Whether the source is fully resolved or not
self._cached = None # If the sources are known to be successfully cached in CAS
# the index of the last source in this element that requires previous
# sources for staging
self._last_source_requires_previous_idx = None
# add_source():
#
# Append source to this list of element sources.
#
# Args:
# source (Source): The source to add
#
def add_source(self, source):
self._sources.append(source)
# sources():
#
# A generator function to enumerate the element sources
#
# Yields:
# Source: The individual sources
#
def sources(self) -> Iterator["Source"]:
for source in self._sources:
yield source
# track():
#
# Calls track() on the Element sources
#
# Raises:
# SourceError: If one of the element sources has an error
#
# Returns:
# (list): A list of Source object ids and their new references
#
def track(self, workspace):
refs = []
for index, source in enumerate(self._sources):
old_ref = source.get_ref()
new_ref = source._track(self._sources[0:index])
refs.append((source._unique_id, new_ref))
# Complimentary warning that the new ref will be unused.
if old_ref != new_ref and workspace:
detail = (
"This source has an open workspace.\n"
+ "To start using the new reference, please close the existing workspace."
)
source.warn("Updated reference will be ignored as source has open workspace", detail=detail)
return refs
# stage():
#
# Stage the element sources to a directory
#
# Returns:
# (:class:`.storage.Directory`): A virtual directory object to stage sources into.
#
def stage(self):
# Assert sources are cached
assert self.cached()
self.vdir = CasBasedDirectory(self._context.get_cascache())
if self._sources:
# find last required source
last_required_previous_idx = self._last_source_requires_previous()
for source in self._sources[last_required_previous_idx:]:
source_dir = self._sourcecache.export(source)
self.vdir.import_files(source_dir)
return self.vdir
# fetch_done()
#
# Indicates that fetching the sources for this element has been done.
#
# Args:
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
#
def fetch_done(self, fetched_original):
self._cached = True
for source in self._sources:
source._fetch_done(fetched_original)
# push()
#
# Push the element's sources.
#
# Returns:
# (bool): True if the remote was updated, False if it already existed
# and no updated was required
#
def push(self):
pushed = False
for source in self.sources():
if self._sourcecache.push(source):
pushed = True
return pushed
# init_workspace():
#
# Initialises a new workspace from the element sources.
#
# Args:
# directory (str): Path of the workspace to init
#
def init_workspace(self, directory: str):
for source in self.sources():
source._init_workspace(directory)
# fetch():
#
# Fetch the element sources.
#
# Raises:
# SourceError: If one of the element sources has an error
#
def fetch(self, fetch_original=False):
previous_sources = []
fetch_needed = False
if self._sources and not fetch_original:
for source in self._sources:
if self._sourcecache.contains(source):
continue
# try and fetch from source cache
if not source._is_cached() and self._sourcecache.has_fetch_remotes():
if self._sourcecache.pull(source):
continue
fetch_needed = True
# We need to fetch original sources
if fetch_needed or fetch_original:
for source in self.sources():
if not source._is_cached():
source._fetch(previous_sources)
previous_sources.append(source)
self._cache_sources()
# get_unique_key():
#
# Return something which uniquely identifies the combined sources of the
# element.
#
# Returns:
# (str, list, dict): A string, list or dictionary as unique identifier
#
def get_unique_key(self):
result = []
for source in self._sources:
result.append({"key": source._get_unique_key(), "name": source._get_source_name()})
return result
# cached():
#
# Check if the element sources are cached in CAS, generating the source
# cache keys if needed.
#
# Returns:
# (bool): True if the element sources are cached
#
def cached(self):
if self._cached is not None:
return self._cached
sourcecache = self._sourcecache
# Go through sources we'll cache generating keys
for ix, source in enumerate(self._sources):
if not source._key:
if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
source._generate_key(self._sources[:ix])
else:
source._generate_key([])
# Check all sources are in source cache
for source in self._sources:
if not sourcecache.contains(source):
return False
self._cached = True
return True
# is_resolved():
#
# Get whether all sources of the element are resolved
#
# Returns:
# (bool): True if all element sources are resolved
#
def is_resolved(self):
return self._is_resolved
# cached_original():
#
# Get whether all the sources of the element have their own cached
# copy of their sources.
#
# Returns:
# (bool): True if all element sources have the original sources cached
#
def cached_original(self):
return all(source._is_cached() for source in self._sources)
# update_resolved_state():
#
# Updates source's resolved state
#
# An element's source state must be resolved before it may compute
# cache keys, because the source's ref, whether defined in yaml or
# from the workspace, is a component of the element's cache keys.
#
def update_resolved_state(self):
for source in self._sources:
if not source.is_resolved():
break
else:
self._is_resolved = True
# preflight():
#
# A internal wrapper for calling the abstract preflight() method on
# the element and its sources.
#
def preflight(self):
# Ensure that the first source does not need access to previous sources
if self._sources and self._sources[0]._requires_previous_sources():
from .element import ElementError # pylint: disable=cyclic-import
raise ElementError(
"{}: {} cannot be the first source of an element "
"as it requires access to previous sources".format(self, self._sources[0])
)
# Preflight the sources
for source in self.sources():
source._preflight()
# _cache_sources():
#
# Caches the sources into the local CAS
#
def _cache_sources(self):
if self._sources and not self.cached():
last_requires_previous = 0
# commit all other sources by themselves
for idx, source in enumerate(self._sources):
if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
self._sourcecache.commit(source, self._sources[last_requires_previous:idx])
last_requires_previous = idx
else:
self._sourcecache.commit(source, [])
# _last_source_requires_previous
#
# This is the last source that requires previous sources to be cached.
# Sources listed after this will be cached separately.
#
# Returns:
# (int): index of last source that requires previous sources
#
def _last_source_requires_previous(self):
if self._last_source_requires_previous_idx is None:
last_requires_previous = 0
for idx, source in enumerate(self._sources):
if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
last_requires_previous = idx
self._last_source_requires_previous_idx = last_requires_previous
return self._last_source_requires_previous_idx