Make elements keep track of their built artifact size
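
The artifact cache now keeps an estimate of its own size: ArtifactCache
gains get_approximate_cache_size(), an abstract calculate_cache_size()
for subclasses to implement, and private helpers that fold sizes
reported by finished jobs back into the estimate. Elements record the
pre-commit size of their assembled artifact, and the scheduler passes
both that size and the recalculated cache size from child jobs back to
the main process.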
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 4aa7ec5..3541f24 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -78,6 +78,9 @@
     def __init__(self, context):
         self.context = context
         self.extractdir = os.path.join(context.artifactdir, 'extract')
+        self.max_size = context.cache_quota
+        self.estimated_size = None
+
         self.global_remote_specs = []
         self.project_remote_specs = {}
 
@@ -179,6 +182,35 @@
                                   (str(provenance)))
         return cache_specs
 
+    # get_approximate_cache_size()
+    #
+    # A cheap method that returns an approximate upper bound on the
+    # artifact cache size.
+    #
+    # The size reported here will normally be larger than the real
+    # cache size, since it is derived from the pre-commit artifact
+    # size; for very small artifacts in certain caches, however,
+    # additional overhead could make it slightly smaller than the
+    # actual size.
+    #
+    # Nonetheless, in practice this is safe to treat as an upper
+    # bound on the cache size.
+    #
+    # If a cache has built-in constant-time size reporting, the
+    # subclass should override this method with a more accurate
+    # implementation.
+    #
+    # Returns:
+    #     (int): An approximation of the artifact cache size.
+    #
+    def get_approximate_cache_size(self):
+        # If we don't currently have an estimate, figure out the real
+        # cache size.
+        if self.estimated_size is None:
+            self.estimated_size = self.calculate_cache_size()
+
+        return self.estimated_size
+
     ################################################
     # Abstract methods for subclasses to implement #
     ################################################
@@ -328,6 +360,20 @@
         raise ImplError("Cache '{kind}' does not implement link_key()"
                         .format(kind=type(self).__name__))
 
+    # calculate_cache_size()
+    #
+    # Return the real artifact cache size.
+    #
+    # Implementations should also refresh self.estimated_size
+    # whenever they calculate the real size.
+    #
+    # Returns:
+    #     (int): The size of the artifact cache.
+    #
+    def calculate_cache_size(self):
+        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
+                        .format(kind=type(self).__name__))
+
     ################################################
     #               Local Private Methods          #
     ################################################
@@ -369,6 +415,30 @@
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
 
+    # _add_artifact_size()
+    #
+    # Since we cannot track the cache size across processes, the main
+    # process calls this whenever a child process that added something
+    # to the cache finishes, adding the reported size to estimated_size.
+    #
+    # Args:
+    #     artifact_size (int): The size of the committed artifact.
+    #
+    def _add_artifact_size(self, artifact_size):
+        if self.estimated_size is None:
+            self.estimated_size = self.calculate_cache_size()
+
+        self.estimated_size += artifact_size
+
+    # _set_cache_size()
+    #
+    # Similarly to the above method, when we calculate the actual size
+    # in a child process we cannot update it directly; instead we pass
+    # the value back to the main process and record it here.
+    #
+    def _set_cache_size(self, cache_size):
+        self.estimated_size = cache_size
+
 
 # _configured_remote_artifact_cache_specs():
 #
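
To make the intended contract concrete, here is a small standalone
sketch of the pattern the hunks above introduce. CacheStub and its
cachedir argument are illustrative stand-ins rather than BuildStream
API; a real subclass would implement calculate_cache_size() against
its own storage backend.

    import os

    class CacheStub:
        def __init__(self, cachedir):
            self.cachedir = cachedir
            self.estimated_size = None

        # Cheap upper bound: reuse the last estimate if we have one,
        # mirroring ArtifactCache.get_approximate_cache_size() above.
        def get_approximate_cache_size(self):
            if self.estimated_size is None:
                self.estimated_size = self.calculate_cache_size()
            return self.estimated_size

        # The expensive, accurate calculation that subclasses must
        # provide; here a simple walk summing file sizes, which also
        # refreshes the estimate as the docstring above requests.
        def calculate_cache_size(self):
            size = 0
            for root, _, files in os.walk(self.cachedir):
                for name in files:
                    size += os.path.getsize(os.path.join(root, name))
            self.estimated_size = size
            return size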
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index e5a0eca..605b2d1 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -210,8 +210,13 @@
         data = {}
 
         workspace = self._element._get_workspace()
+        artifact_size = self._element._get_artifact_size()
+        cache_size = self._element._get_artifact_cache().calculate_cache_size()
 
         if workspace is not None:
             data['workspace'] = workspace.to_dict()
+        if artifact_size is not None:
+            data['artifact_size'] = artifact_size
+        data['cache_size'] = cache_size
 
         return data
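
For reference, the payload child_process_data() now returns for a built
element looks roughly like this; the values are illustrative, and the
'workspace' entry is present only for workspaced elements:

    data = {
        'workspace': {},            # workspace.to_dict(), when present
        'artifact_size': 4096,      # pre-commit size of the artifact
        'cache_size': 1073741824,   # recalculated in the child process
    }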
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 8ca3ac0..ac20d37 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -300,6 +300,8 @@
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
         self._update_workspaces(element, job)
+        if job.child_data:
+            element._get_artifact_cache()._set_cache_size(job.child_data.get('cache_size'))
 
         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
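
The round trip, in miniature: a child job recalculates the cache size
and reports it through its result payload, and the main process records
it, since the child cannot mutate the main process's state directly.
Continuing the CacheStub sketch above, with child_data standing in for
a finished job's result dictionary:

    cache = CacheStub('/tmp/cache')  # illustrative path
    child_data = {'artifact_size': 4096, 'cache_size': 1073741824}

    # Main-process side, mirroring the queue.py hunk: the real cache
    # records this via _set_cache_size(); the stub assigns directly.
    cache.estimated_size = child_data.get('cache_size')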
diff --git a/buildstream/element.py b/buildstream/element.py
index 0f0bf49..140c824 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -225,6 +225,7 @@
         self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
         self.__tainted = None                   # Whether the artifact is tainted and should not be shared
         self.__required = False                 # Whether the artifact is required in the current session
+        self.__artifact_size = None             # The size of data committed to the artifact cache
 
         # hash tables of loaded artifact metadata, hashed by key
         self.__metadata_keys = {}                     # Strong and weak keys for this key
@@ -1397,6 +1398,16 @@
             workspace.clear_running_files()
             self._get_context().get_workspaces().save_config()
 
+            # We also need to update the required artifacts, since
+            # workspaced dependencies do not have a fixed cache key
+            # when the build starts.
+            #
+            # This does *not* cause a race condition, because
+            # _assemble_done is called before a cleanup job may be
+            # launched.
+            #
+            self.__artifacts.append_required_artifacts([self])
+
     # _assemble():
     #
     # Internal method for running the entire build phase.
@@ -1524,6 +1535,7 @@
                 }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 
                 with self.timed_activity("Caching artifact"):
+                    self.__artifact_size = utils._get_dir_size(assembledir)
                     self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
 
             # Finally cleanup the build dir
@@ -1763,6 +1775,32 @@
         workspaces = self._get_context().get_workspaces()
         return workspaces.get_workspace(self._get_full_name())
 
+    # _get_artifact_size()
+    #
+    # Get the size of the artifact produced by this element in the
+    # current pipeline; if this element has not been assembled or
+    # pulled, this will be None.
+    #
+    # Note that this is the size of the artifact *before* committing
+    # it to the cache, so the size on disk may differ. It serves as
+    # an approximate guide for when to do a proper size calculation.
+    #
+    # Returns:
+    #     (int|None): The size of the artifact
+    #
+    def _get_artifact_size(self):
+        return self.__artifact_size
+
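+    # _get_artifact_cache()
+    #
+    # Get the artifact cache used by this element.
+    #
+    # Returns:
+    #     (ArtifactCache): The artifact cache
+    #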
+    def _get_artifact_cache(self):
+        return self.__artifacts
+
     # _write_script():
     #
     # Writes a script to the given directory.
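
A note on utils._get_dir_size(), used in the _assemble() hunk above to
record the pre-commit artifact size: its real implementation lives in
utils.py and is not part of this patch, but the general idea is a
recursive walk along these lines (a sketch only; details such as
symlink handling may differ):

    import os

    def get_dir_size(path):
        # Sum the sizes of regular files under path, skipping symlinks
        # so that link targets are not counted twice.
        total = 0
        for root, _, files in os.walk(path):
            for name in files:
                filepath = os.path.join(root, name)
                if not os.path.islink(filepath):
                    total += os.path.getsize(filepath)
        return total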