Merge pull request #1814 from apache/abderrahim/blocking-activity
plugin: don't try to pickle the exceptions thrown by blocking_activity
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index d36da67..2e261cb 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -97,22 +97,36 @@
if etag is not None:
request.add_header("If-None-Match", etag)
- with contextlib.closing(opener.open(request)) as response:
- info = response.info()
+ try:
+ with contextlib.closing(opener.open(request)) as response:
+ info = response.info()
- # some servers don't honor the 'If-None-Match' header
- if etag and info["ETag"] == etag:
- return None, None
+ # some servers don't honor the 'If-None-Match' header
+ if etag and info["ETag"] == etag:
+ return None, None, None
- etag = info["ETag"]
+ etag = info["ETag"]
- filename = info.get_filename(default_name)
- filename = os.path.basename(filename)
- local_file = os.path.join(directory, filename)
- with open(local_file, "wb") as dest:
- shutil.copyfileobj(response, dest)
+ filename = info.get_filename(default_name)
+ filename = os.path.basename(filename)
+ local_file = os.path.join(directory, filename)
+ with open(local_file, "wb") as dest:
+ shutil.copyfileobj(response, dest)
- return local_file, etag
+ except urllib.error.HTTPError as e:
+ if e.code == 304:
+ # 304 Not Modified.
+ # Because we use etag only for matching ref, currently specified ref is what
+ # we would have downloaded.
+ return None, None, None
+
+ return None, None, str(e)
+ except (urllib.error.URLError, urllib.error.ContentTooShortError, OSError, ValueError) as e:
+ # ValueError is handled here in case urllib rejects an unknown url type.
+ # NOTE(review): the Request is created before this try block - confirm.
+ return None, None, str(e)
+
+ return local_file, etag, None
class DownloadableFileSource(Source):
@@ -206,50 +220,39 @@
def _ensure_mirror(self, activity_name: str):
# Downloads from the url and caches it according to its sha256sum.
- try:
- with self.tempdir() as td:
- # We do not use etag in case what we have in cache is
- # not matching ref in order to be able to recover from
- # corrupted download.
- if self.ref and not self.is_cached():
- # Do not re-download the file if the ETag matches.
- etag = self._get_etag(self.ref)
- else:
- etag = None
+ with self.tempdir() as td:
+ # We do not use etag in case what we have in cache is
+ # not matching ref in order to be able to recover from
+ # corrupted download.
+ if self.ref and not self.is_cached():
+ # Do not re-download the file if the ETag matches.
+ etag = self._get_etag(self.ref)
+ else:
+ etag = None
- local_file, new_etag = self.blocking_activity(
- _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
- )
+ local_file, new_etag, error = self.blocking_activity(
+ _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
+ )
- if local_file is None:
- return self.ref
+ if error:
+ raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, error), temporary=True)
- # Make sure url-specific mirror dir exists.
- if not os.path.isdir(self._mirror_dir):
- os.makedirs(self._mirror_dir)
-
- # Store by sha256sum
- sha256 = utils.sha256sum(local_file)
- # Even if the file already exists, move the new file over.
- # In case the old file was corrupted somehow.
- os.rename(local_file, self._get_mirror_file(sha256))
-
- if new_etag:
- self._store_etag(sha256, new_etag)
- return sha256
-
- except urllib.error.HTTPError as e:
- if e.code == 304:
- # 304 Not Modified.
- # Because we use etag only for matching ref, currently specified ref is what
- # we would have downloaded.
+ if local_file is None:
return self.ref
- raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, e), temporary=True) from e
- except (urllib.error.URLError, urllib.error.ContentTooShortError, OSError, ValueError) as e:
- # Note that urllib.request.Request in the try block may throw a
- # ValueError for unknown url types, so we handle it here.
- raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, e), temporary=True) from e
+ # Make sure url-specific mirror dir exists.
+ if not os.path.isdir(self._mirror_dir):
+ os.makedirs(self._mirror_dir)
+
+ # Store by sha256sum
+ sha256 = utils.sha256sum(local_file)
+ # Even if the file already exists, move the new file over.
+ # In case the old file was corrupted somehow.
+ os.rename(local_file, self._get_mirror_file(sha256))
+
+ if new_etag:
+ self._store_etag(sha256, new_etag)
+ return sha256
def _get_mirror_file(self, sha=None):
if sha is not None:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index e1e7b1f..14e35b9 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -123,7 +123,6 @@
import multiprocessing.popen_forkserver # type: ignore
import os
-import pickle
import queue
import signal
import subprocess
@@ -184,13 +183,8 @@
try:
result = target(*args)
result_queue.put((None, result))
- except Exception as exc: # pylint: disable=broad-except
- try:
- # Here we send the result again, just in case it was a PickleError
- # in which case the same exception would be thrown down
- result_queue.put((exc, result))
- except pickle.PickleError:
- result_queue.put((traceback.format_exc(), None))
+ except Exception: # pylint: disable=broad-except
+ result_queue.put((traceback.format_exc(), None))
class Plugin:
@@ -606,7 +600,8 @@
in order to avoid starving the scheduler.
The function, its arguments and return value must all be pickleable,
- as it will be run in another process.
+ as it will be run in another process. The function should not raise
+ an exception.
This should be used whenever there is a potential for a blocking
syscall to not return in a reasonable (<1s) amount of time.
@@ -676,12 +671,7 @@
raise PluginError("Background process didn't exit after 15 seconds and got killed.")
if err is not None:
- if isinstance(err, str):
- # This was a pickle error, this is a bug
- raise PluginError(
- "An error happened while returning the result from a blocking activity", detail=err
- )
- raise err
+ raise PluginError("An error happened while running a blocking activity", detail=err)
return result