# Copyright (C) 2017 Codethink Limited
# Copyright (C) 2018 Bloomberg Finance LP
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# Authors:
# Sam Thursfield <>
# Chandan Singh <>
# This plugin was originally developed in the
# repository and was copied from 192aa75a62161b655b051d5b69f17d9233cf4dfe
docker - stage files from Docker images
.. code:: yaml
# Specify the docker source kind
kind: docker
# Specify the registry endpoint, defaults to Docker Hub (optional)
# Image path (required)
image: library/alpine
# Image tag to follow (optional)
track: latest
# Specify the digest of the exact image to use (required)
ref: 6c9f6f68a131ec6381da82f2bff978083ed7f4f7991d931bfa767b7965ebc94b
# Some images are built for multiple platforms. When tracking a tag, we
# will choose which image to use based on these settings. Default values
# are chosen based on the output of `uname -m` and `uname -s`, but you
# can override them.
#architecture: arm64
#os: linux
Note that Docker images may contain device nodes. BuildStream elements cannot
contain device nodes so those will be dropped. Any regular files in the /dev
directory will also be dropped.
See `built-in functionality documentation
<>`_ for
details on common configuration options for sources.
import hashlib
import json
import os
import platform
import shutil
import tarfile
import urllib.parse
import requests
from buildstream import Source, SourceError
from buildstream.utils import (
def parse_bearer_authorization_challenge(text):
    """Parse a ``Www-Authenticate: Bearer ...`` challenge into a dict.

    The challenge is a comma-separated list of ``key="value"`` parameters,
    e.g. ``Bearer realm="https://auth.example/token",service="registry"``.
    Quoted values may themselves contain commas or equals signs (a scope such
    as ``repository:foo/bar:pull,push``, or a realm URL with a query string),
    so each parameter is matched as a whole token instead of splitting the
    text on those characters.

    Args:
       text (str): The value of the Www-Authenticate header

    Raises:
       SourceError: if the challenge does not use the Bearer scheme

    Returns:
       (dict): Parameter names mapped to their unquoted values
    """
    # Imported locally so this function does not rely on the module header.
    import re

    scheme = "Bearer "
    if not text.startswith(scheme):
        raise SourceError("Unexpected Www-Authenticate response: {}".format(text))

    # key = "quoted value" (group 2)  or  key = bare-token (group 3)
    param = re.compile(r'([^\s=,"]+)\s*=\s*(?:"([^"]*)"|([^\s,"]*))')
    return {key: quoted or bare for key, quoted, bare in param.findall(text[len(scheme) :])}
def default_architecture():
    """Return the Docker architecture name for the machine we are running on.

    Known ``platform.machine()`` values are translated to the names Docker
    uses; any other value is passed through unchanged.
    """
    docker_names = {"x86_64": "amd64", "aarch64": "arm64"}
    host = platform.machine()
    return docker_names.get(host, host)
def default_os():
    """Return the lower-cased OS name reported by ``platform.system()``."""
    system_name = platform.system()
    return system_name.lower()
# Like urllib.parse.urljoin(), but takes any number of path components and
# appends them to the URL one after another.
def urljoin(url, *args):
    joined = url
    for component in args:
        # A trailing slash on the base makes urljoin() append the component
        # instead of replacing the last path segment.
        base = joined if joined.endswith("/") else joined + "/"
        joined = urllib.parse.urljoin(base, component.lstrip("/"))
    return joined
# DockerManifestError
#
# Raised if something goes wrong while querying an image manifest from a remote
# registry.
#
# Args:
#    message (str): Description of the problem
#    manifest (str): Text of the manifest involved, when one is available
#
class DockerManifestError(SourceError):
    def __init__(self, message, manifest=None):
        # Hand the message to the base class; without this the description
        # given by the raiser never reaches the user.
        super().__init__(message)
        self.manifest = manifest
class DockerRegistryV2Client:
    """Client bound to a single registry endpoint.

    Args:
       endpoint (str): Base URL of the registry
       api_timeout (int): Timeout handed to every HTTP request
    """

    def __init__(self, endpoint, api_timeout=3):
        # No bearer token is held until one has been requested.
        self.token = None
        self.api_timeout = api_timeout
        self.endpoint = endpoint
def _request(self, subpath, extra_headers=None, stream=False, _reauthorized=False):
    """Issue a GET request for *subpath* below the registry's ``v2`` path.

    If the registry answers "unauthorized", the bearer challenge from the
    Www-Authenticate header is used to obtain a token and the request is
    repeated exactly once.

    Args:
       subpath (str): Path below ``<endpoint>/v2``
       extra_headers (dict): Additional request headers, e.g. ``Accept``
       stream (bool): Passed to requests; stream the body instead of loading it
       _reauthorized (bool): Internal; set on the retry so we never loop

    Raises:
       requests.RequestException, if network errors occur

    Returns:
       (requests.Response): The response, whatever its status code
    """
    headers = {"content-type": "application/json"}
    # Caller-supplied headers have to reach the server, otherwise the
    # registry never sees e.g. the Accept header.
    if extra_headers:
        headers.update(extra_headers)
    if self.token:
        headers["Authorization"] = "Bearer {}".format(self.token)

    url = urljoin(self.endpoint, "v2", subpath)
    response = requests.get(url, headers=headers, stream=stream, timeout=self.api_timeout)

    if response.status_code == requests.codes["unauthorized"] and not _reauthorized:
        # This request requires (re)authorization.
        auth_challenge = response.headers["Www-Authenticate"]
        auth_vars = parse_bearer_authorization_challenge(auth_challenge)
        self._auth(auth_vars["realm"], auth_vars["service"], auth_vars["scope"])
        # The rejected response is of no further use; release its connection.
        response.close()
        # Retry with the same options, including 'stream', so that the caller
        # gets the kind of response it asked for.
        return self._request(subpath, extra_headers=extra_headers, stream=stream, _reauthorized=True)

    return response
def _auth(self, realm, service, scope):
    """Obtain a bearer token in response to a Www-Authenticate challenge.

    The token is requested from the 'realm' (endpoint) named in the
    challenge and stored in ``self.token``.

    Args:
       realm (str): URL of the token endpoint
       service (str): Service name from the challenge
       scope (str): Access scope from the challenge

    Raises:
       requests.RequestException, if network errors occur or the token
       endpoint answers with an HTTP error status
       KeyError, if the answer carries no token
    """
    # Let requests build the query string so that the values are URL-encoded.
    response = requests.get(realm, params={"service": service, "scope": scope}, timeout=self.api_timeout)
    # Surface a refusal as an HTTP error instead of a confusing KeyError below.
    response.raise_for_status()
    body = response.json()
    # Token servers may name the field 'token', 'access_token', or both.
    self.token = body.get("token") or body["access_token"]
# digest():
#
# Calculate a Docker-compatible digest of an arbitrary string of bytes.
#
# Args:
#    content (bytes): Content to hash
#
# Returns:
#    (str) A Docker-compatible digest of 'content'
#
# NOTE(review): other code in this file calls this as self.digest(...) and
# self.client.digest(...) although it takes no 'self'; presumably it is
# meant to carry a @staticmethod decorator - confirm it is present.
def digest(content):
    # The content must actually be fed into the hash; hashing nothing would
    # give every input the digest of the empty string.
    return "sha256:" + hashlib.sha256(content).hexdigest()
# manifest():
#
# Fetches the image manifest for a given image from the remote registry.
#
# If this is a "fat" (multiplatform) image, the 'architecture' and 'os_'
# parameters control which of the available images is chosen.
#
# The manifest is returned verbatim, so you need to parse it yourself
# with json.loads() to get at its contents. The verbatim text can be used
# to recalculate the content digest, just encode it and pass to .digest().
# If we returned only the parsed JSON data you wouldn't be able to do this.
#
# Args:
#    image_path (str): Relative path to the image, e.g. library/alpine
#    reference (str): Either a tag name (such as 'latest') or the content
#                     digest of an exact version of the image.
#    architecture (str): Architecture name (amd64, arm64, etc.)
#    os_ (str): OS name (e.g. linux)
#
# Raises:
#    requests.RequestException, if network errors occur
#    DockerManifestError, if the answer is not valid JSON, has an unsupported
#    or missing schemaVersion, lacks or mismatches the Docker-Content-Digest
#    header, offers no image for the requested platform, or has an
#    unsupported media type
#
# Returns:
#    (str, str): A tuple of the manifest content as text, and its content hash
#
# NOTE(review): this copy of the method looks truncated - the parameter list,
# the contents of 'accept_types', the 'try:' matching the 'except' below,
# several closing parentheses and the arguments of the recursive call are not
# present. Restore them from version control before relying on this text.
def manifest(
# pylint: disable=too-many-locals
accept_types = [
manifest_url = urljoin(image_path, "manifests", urllib.parse.quote(reference))
response = self._request(manifest_url, extra_headers={"Accept": ",".join(accept_types)})
manifest = json.loads(response.text)
except json.JSONDecodeError as e:
raise DockerManifestError(
"Server did not return a valid manifest: {}".format(e),
) from e
schema_version = manifest.get("schemaVersion")
if schema_version == 1:
raise DockerManifestError("Schema version 1 is unsupported.", manifest=response.text)
if schema_version is None:
raise DockerManifestError(
"Manifest did not include the schemaVersion key.",
# Recompute the digest over the exact bytes received and compare it with the
# digest the server announced in its response header.
our_digest = self.digest(response.text.encode("utf8"))
their_digest = response.headers.get("Docker-Content-Digest")
if not their_digest:
raise DockerManifestError(
"Server did not set the Docker-Content-Digest header.",
if our_digest != their_digest:
raise DockerManifestError(
"Server returned a non-matching content digest. "
"Our digest: {}, their digest: {}".format(our_digest, their_digest),
if manifest["mediaType"] == "application/vnd.docker.distribution.manifest.list.v2+json":
# This is a "fat manifest", we need to narrow down to a specific
# architecture.
for sub in manifest["manifests"]:
# NOTE(review): the OS entry is only tested for truthiness here, it is
# not compared with 'os_' - confirm whether that is intended.
if sub["platform"]["architecture"] == architecture and sub["platform"]["os"]:
sub_digest = sub["digest"]
return self.manifest(
raise DockerManifestError(
"No images found for architecture {}, OS {}".format(architecture, os_),
elif manifest["mediaType"] == "application/vnd.docker.distribution.manifest.v2+json":
return response.text, our_digest
raise DockerManifestError(
"Unsupported manifest type {}".format(manifest["mediaType"]),
# blob():
#
# Fetch a blob from the remote registry. This is used for getting each
# layer of an image in tar.gz format.
#
# Raises:
#    requests.RequestException, if network errors occur or the registry
#    answers with an HTTP error status
#
# Args:
#    image_path (str): Relative path to the image, e.g. library/alpine
#    blob_digest (str): Content hash of the blob.
#    download_to (str): Path to a file where the content will be written.
#
def blob(self, image_path, blob_digest, download_to):
    blob_url = urljoin(image_path, "blobs", urllib.parse.quote(blob_digest))

    # The body is streamed; leaving the with-block closes the response and
    # releases its connection even if copying fails.
    with self._request(blob_url, stream=True) as response:
        # Fail on an HTTP error instead of saving the error body as the blob.
        response.raise_for_status()
        with save_file_atomic(download_to, "wb") as f:
            shutil.copyfileobj(response.raw, f)
class ReadableTarInfo(tarfile.TarInfo):
    """
    The goal is to override `TarFile`'s `extractall` semantics by ensuring that on extraction, the
    files are readable by the owner of the file. This is done by overriding the accessor for the
    `mode` attribute in `TarInfo`, the class that encapsulates the internal meta-data of the tarball,
    so that the owner-read bit is always set.
    """

    # The mode attribute is not declared as a property and so
    # this trips up the static type checker, mark this as "ignore"
    @property  # type: ignore
    def mode(self):
        # ensure file is readable by owner
        return self.__permission | 0o400

    # Register the writer as the property's setter; a second plain method
    # named 'mode' would replace the property and drop the read-bit logic.
    @mode.setter
    def mode(self, permission):
        self.__permission = permission
# DockerSource
#
# Source implementation behind the 'docker' source kind: it tracks, fetches
# and stages Docker images. This is the class returned by setup().
class DockerSource(Source):
# pylint: disable=too-many-instance-attributes
# Docker identifies images by a content digest calculated from the image's
# manifest. This corresponds well with the concept of a 'ref' in
# BuildStream. However, Docker theoretically supports multiple hash
# methods while BuildStream does not. Right now every Docker registry
# uses sha256 so let's ignore that issue for the time being.
def _digest_to_ref(digest):
if digest.startswith("sha256:"):
return digest[len("sha256:") :]
method = digest.split(":")[0]
raise SourceError("Unsupported digest method: {}".format(method))
def _ref_to_digest(ref):
return "sha256:" + ref
def configure(self, node):
    """Read the source configuration from *node*.

    Sets the image, registry URL (both as written and translated), digest,
    tag, architecture and OS on the instance and creates the registry client.

    Raises:
       SourceError: if the deprecated 'url' key is used, or if neither
                    'ref' nor 'track' is given
    """
    # url is deprecated, but accept it as a valid key so that we can raise
    # a nicer warning. 'architecture' and 'os' are read below, so they must
    # be accepted here as well.
    node.validate_keys(
        ["registry-url", "image", "ref", "track", "architecture", "os", "url"] + Source.COMMON_CONFIG_KEYS
    )

    if "url" in node:
        raise SourceError(
            "{}: 'url' parameter is now deprecated, " "use 'registry-url' and 'image' instead.".format(self)
        )

    self.image = node.get_str("image")
    self.original_registry_url = node.get_str("registry-url", _DOCKER_HUB_URL)
    self.registry_url = self.translate_url(self.original_registry_url)

    # Only fall back to "no digest" when no ref was given at all.
    if "ref" in node:
        self.digest = self._ref_to_digest(node.get_str("ref"))
    else:
        self.digest = None
    self.tag = node.get_str("track", "") or None

    # Empty or absent values fall back to what the host reports.
    self.architecture = node.get_str("architecture", "") or default_architecture()
    self.os = node.get_str("os", "") or default_os()

    if not (self.digest or self.tag):
        raise SourceError("{}: Must specify either 'ref' or 'track' parameters".format(self))

    self.client = DockerRegistryV2Client(self.registry_url)

    self.manifest = None
def preflight(self):
    """Perform no checks.

    NOTE(review): the body of this method was missing; presumably it is a
    deliberate no-op - confirm against version control.
    """
    return None
def get_unique_key(self):
    # The registry URL enters the key as written in the configuration,
    # not in its translated form.
    key = [self.original_registry_url, self.image]
    key.append(self.digest)
    return key
def get_ref(self):
    # Without a digest there is no ref to report.
    if self.digest is None:
        return None
    return self._digest_to_ref(self.digest)
def set_ref(self, ref, node):
    # Record the ref in the node first, then keep our own copy in digest form.
    node["ref"] = ref
    digest = self._ref_to_digest(ref)
    self.digest = digest
def track(self):
    """Resolve the tracked tag to the ref of the image it currently names.

    Raises:
       DockerManifestError: if the registry's manifest is unusable
       SourceError: if the manifest could not be downloaded

    Returns:
       (str): The new ref, or None when no tag is being tracked
    """
    # pylint: disable=arguments-differ

    # If the tracking ref is not specified it's not an error, just silently return
    if not self.tag:
        return None

    with self.timed_activity(
        "Fetching image manifest for image: '{}:{}' from: {}".format(self.image, self.tag, self.registry_url)
    ):
        try:
            # Pass the configured platform on, so that an 'architecture' or
            # 'os' override decides which image of a multi-platform tag
            # is chosen.
            _, digest = self.client.manifest(self.image, self.tag, architecture=self.architecture, os_=self.os)
        except DockerManifestError as e:
            self.log("Problem downloading manifest", detail=e.manifest)
            # Logging alone would leave 'digest' unbound below.
            raise
        except (OSError, requests.RequestException) as e:
            raise SourceError(e) from e

    return self._digest_to_ref(digest)
def is_resolved(self):
    # Resolved means that an exact image digest is known.
    unresolved = self.digest is None
    return not unresolved
def is_cached(self):
    """Tell whether the manifest and every layer blob are in the mirror.

    Returns:
       (bool): True if the manifest loads and each layer blob passes
               verification; False if the manifest or a blob is missing or
               fails verification
    """
    mirror_dir = self.get_mirror_directory()

    try:
        manifest = self._load_manifest()

        for layer in manifest["layers"]:
            layer_digest = layer["digest"]
            blob_path = os.path.join(mirror_dir, layer_digest + ".tar.gz")
            self._verify_blob(blob_path, expected_digest=layer_digest)
    except (FileNotFoundError, SourceError):
        # Either nothing was fetched yet, the manifest was fetched but some
        # layer blob was not, or a file on disk is corrupt.
        return False

    return True
def _load_manifest(self):
manifest_file = os.path.join(self.get_mirror_directory(), self.digest + ".manifest.json")
with open(manifest_file, "rb") as f:
text =
real_digest = self.client.digest(text)
if real_digest != self.digest:
raise SourceError("Manifest {} is corrupt; got content hash of {}".format(manifest_file, real_digest))
return json.loads(text.decode("utf-8"))
def _save_manifest(self, text, path):
    """Write the manifest *text* into the directory *path*.

    The file is named after ``self.digest`` so that it can be found again
    when the manifest is loaded.

    Args:
       text (str): Manifest content exactly as received
       path (str): Directory to write the manifest file into
    """
    manifest_file = os.path.join(path, self.digest + ".manifest.json")
    with save_file_atomic(manifest_file, "wb") as f:
        # Store the UTF-8 bytes: the digest is calculated over that encoding
        # and the loader decodes the file as UTF-8.
        f.write(text.encode("utf-8"))
def _verify_blob(path, expected_digest):
    # Compare the sha256 of the file at 'path' with the digest we expect
    # and raise SourceError when they differ.
    #
    # NOTE(review): called as self._verify_blob(...) elsewhere although it takes
    # no 'self'; presumably a @staticmethod - confirm the decorator is present.
    actual_digest = "sha256:" + sha256sum(path)
    if actual_digest == expected_digest:
        return
    raise SourceError("Blob {} is corrupt; got content hash of {}.".format(path, actual_digest))
# fetch():
#
# Collects the manifest and every layer blob of the image in a temporary
# directory, verifying each blob against the digest listed in the manifest.
# The files are handed over to the mirror directory only at the very end.
#
# Raises:
#    SourceError, on network or OS errors, on a manifest whose digest differs
#    from the requested one, on an unsupported layer media type, or on a blob
#    that fails verification
#
# NOTE(review): this copy of the method looks truncated - the 'try:' lines
# matching the 'except' clauses, some closing parentheses, whatever follows
# the two 'if os.path.exists(...)'/'self.log(...)' lines, and the call that
# consumes the two paths in the final loop are not present. Restore them from
# version control before relying on this text.
def fetch(self):
# pylint: disable=arguments-differ
with self.timed_activity(
"Fetching image {}:{} with digest {}".format(self.image, self.tag, self.digest),
with self.tempdir() as tmpdir:
# move all files to a tmpdir
# Prefer a manifest that is already stored locally
manifest = self._load_manifest()
except FileNotFoundError as e:
# No stored manifest: ask the registry for the one matching our digest
manifest_text, digest = self.client.manifest(self.image, self.digest)
except requests.RequestException as ee:
raise SourceError(ee) from ee
# The registry must hand back exactly the manifest that was asked for
if digest != self.digest:
raise SourceError(
"Requested image {}, got manifest with digest {}".format(self.digest, digest)
) from e
self._save_manifest(manifest_text, tmpdir)
manifest = json.loads(manifest_text)
except DockerManifestError as e:
self.log("Unexpected manifest", detail=e.manifest)
except (OSError, requests.RequestException) as e:
raise SourceError(e) from e
for layer in manifest["layers"]:
# Only gzip-compressed tar layers are handled
if layer["mediaType"] != "application/vnd.docker.image.rootfs.diff.tar.gzip":
raise SourceError("Unsupported layer type: {}".format(layer["mediaType"]))
layer_digest = layer["digest"]
blob_path = os.path.join(tmpdir, layer_digest + ".tar.gz")
if not os.path.exists(blob_path):
self.client.blob(self.image, layer_digest, download_to=blob_path)
except (OSError, requests.RequestException) as e:
if os.path.exists(blob_path):
raise SourceError(e) from e
self._verify_blob(blob_path, expected_digest=layer_digest)
# Only if all sources are successfully fetched, move files to staging directory
# As both the manifest and blobs are content addressable, we can optimize space by having
# a flat mirror directory. We check one-by-one if there is any need to copy a file out of the tmpdir.
for fetched_file in os.listdir(tmpdir):
os.path.join(tmpdir, fetched_file),
os.path.join(self.get_mirror_directory(), fetched_file),
# stage():
#
# Stages the image into 'directory' by walking the layers in manifest order:
# each layer blob is verified, the files named by the layer's whiteouts are
# dealt with, and the layer's remaining members are extracted into a
# temporary directory and linked into 'directory'.
#
# Args:
#    directory (str): Path to stage the image content into
#
# Raises:
#    SourceError, if the manifest cannot be loaded or a layer cannot be staged
#
# NOTE(review): this copy of the method looks truncated - the 'try:' lines
# matching the 'except' clauses, the left-hand side of the tuple assignment,
# the statement that acts on each whiteout path and the expression opening
# the tarball are not present. Restore them from version control before
# relying on this text.
def stage(self, directory):
mirror_dir = self.get_mirror_directory()
manifest = self._load_manifest()
except (OSError, SourceError) as e:
raise SourceError("Unable to load manifest: {}".format(e)) from e
for layer in manifest["layers"]:
layer_digest = layer["digest"]
blob_path = os.path.join(mirror_dir, layer_digest + ".tar.gz")
# Re-check the blob on disk before unpacking anything from it
self._verify_blob(blob_path, expected_digest=layer_digest)
) = self._get_extract_and_remove_files(blob_path)
# remove files associated with whiteouts
for white_out_file in white_out_fileset:
white_out_file = os.path.join(directory, white_out_file)
# extract files for the current layer
with, tarinfo=ReadableTarInfo) as tar:
# Extract into a scratch directory first, then link the result into place
with self.tempdir() as td:
tar.extractall(path=td, members=extract_fileset)
link_files(td, directory)
except (OSError, SourceError, tarfile.TarError) as e:
raise SourceError("{}: Error staging source: {}".format(self, e)) from e
def _get_extract_and_remove_files(layer_tar_path):
"""Return the set of files to remove and extract for a given layer
:param layer_tar_path: The path where a layer has been extracted
:return: Tuple of filesets
- extract_fileset: files to extract into staging directory
- delete_fileset: files to remove from staging directory as the current layer
contains a whiteout corresponding to a staged file in the previous layers
def strip_wh(white_out_file):
"""Strip the prefixing .wh. for given file
:param white_out_file: path of file
:return: path without white-out prefix
# whiteout files have the syntax of `*/.wh.*`
file_name = os.path.basename(white_out_file)
path = os.path.join(os.path.dirname(white_out_file), file_name.split(".wh.")[1])
return path
def is_regular_file(info):
"""Check if file is a non-device file
:param info: tar member metadata
:return: if the file is a non-device file
return not ("dev/") or info.isdev())
with as tar:
extract_fileset = []
delete_fileset = []
for member in tar.getmembers():
if os.path.basename(".wh."):
elif is_regular_file(member):
return extract_fileset, delete_fileset
# Plugin entry point: hands back the class implementing this source kind.
def setup():
    plugin_class = DockerSource
    return plugin_class