blob: b958995373d7bf86a4bdb456d80d042ad293fdc1 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""SdkContainerImageBuilder builds the portable SDK container with dependencies.
It copies the right boot dependencies, namely: apache beam sdk, python packages
from requirements.txt, python packages from extra_packages.txt, workflow
tarball, into the latest public python sdk container image, and run the
dependencies installation in advance with the boot program in setup only mode
to build the new image.
"""
import json
import logging
import os
import posixpath
import shutil
import subprocess
import sys
import tarfile
import tempfile
import time
import uuid
from typing import Type
from google.protobuf.json_format import MessageToJson
from apache_beam import version as beam_version
from apache_beam.internal.gcp.auth import get_service_credentials
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions # pylint: disable=unused-import
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability.stager import Stager
from apache_beam.utils import plugin
# Directory inside the container image where staged artifacts are copied.
ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
# File name of the JSON manifest enumerating the staged artifacts.
ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
# Boot program of the SDK container; invoked with --setup_only during the
# image build so dependency installation happens ahead of worker startup.
SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
# Dockerfile for the prebuilt image: copy all staged files into the artifact
# dir, then run the boot program in setup-only mode against the manifest.
DOCKERFILE_TEMPLATE = (
    """FROM {base_image}
RUN mkdir -p {workdir}
COPY ./* {workdir}/
RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
""")
# Name of the folder (and tarball stem) used as the docker build context.
SOURCE_FOLDER = 'source'

_LOGGER = logging.getLogger(__name__)
class SdkContainerImageBuilder(plugin.BeamPlugin):
  """Base class for building a prebuilt Python SDK container image.

  Subclasses implement :meth:`_invoke_docker_build_and_push` to perform the
  actual build/push (e.g. local docker, Google Cloud Build) and are selected
  at runtime through the ``--prebuild_sdk_container_engine`` option via
  :meth:`build_container_image`.
  """
  def __init__(self, options):
    self._options = options
    self._docker_registry_push_url = self._options.view_as(
        SetupOptions).docker_registry_push_url
    # Dev versions have no released container image; fall back to 'latest'.
    version = (
        beam_version.__version__
        if 'dev' not in beam_version.__version__ else 'latest')
    self._base_image = (
        self._options.view_as(WorkerOptions).sdk_container_image or
        'apache/beam_python%s.%s_sdk:%s' %
        (sys.version_info[0], sys.version_info[1], version))
    # Populated in _build with the temp dir used as the docker build context.
    self._temp_src_dir = None

  def _build(self):
    """Stages dependencies and builds the image; returns its full name."""
    container_image_tag = str(uuid.uuid4())
    container_image_name = posixpath.join(
        self._docker_registry_push_url or '',
        'beam_python_prebuilt_sdk:%s' % container_image_tag)
    with tempfile.TemporaryDirectory() as temp_folder:
      self._temp_src_dir = temp_folder
      self._prepare_dependencies()
      self._invoke_docker_build_and_push(container_image_name)
    return container_image_name

  def _prepare_dependencies(self):
    """Stages job resources, a Dockerfile and the artifact manifest into
    the temp source dir that serves as the docker build context."""
    with tempfile.TemporaryDirectory() as tmp:
      artifacts = Stager.create_job_resources(
          self._options, tmp, log_submission_env_dependencies=False)
      resources = Stager.extract_staging_tuple_iter(artifacts)
      # make a copy of the staged artifacts into the temp source folder.
      file_names = []
      for path, name, _ in resources:
        shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
        file_names.append(name)
      with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
        file.write(
            DOCKERFILE_TEMPLATE.format(
                base_image=self._base_image,
                workdir=ARTIFACTS_CONTAINER_DIR,
                manifest_file=ARTIFACTS_MANIFEST_FILE,
                entrypoint=SDK_CONTAINER_ENTRYPOINT))
      self._generate_artifacts_manifests_json_file(
          file_names, self._temp_src_dir)

  def _invoke_docker_build_and_push(self, container_image_name):
    """Builds (and optionally pushes) the image; implemented by subclasses."""
    raise NotImplementedError

  @classmethod
  def _builder_key(cls) -> str:
    """Key a subclass is registered under for engine selection."""
    return f'{cls.__module__}.{cls.__name__}'

  @staticmethod
  def _generate_artifacts_manifests_json_file(file_names, temp_dir):
    """Writes a JSON array of ArtifactInformation messages describing the
    staged files, consumed by the boot program in --setup_only mode."""
    infos = []
    for name in file_names:
      info = beam_runner_api_pb2.ArtifactInformation(
          type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
              path=name).SerializeToString(),
      )
      # MessageToJson already returns a serialized JSON object; wrapping it
      # in json.dumps again would double-encode each entry into a quoted
      # string and the manifest would hold strings instead of objects.
      infos.append(MessageToJson(info))
    with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
      file.write('[\n' + ',\n'.join(infos) + '\n]')

  @classmethod
  def build_container_image(cls, pipeline_options: PipelineOptions) -> str:
    """Builds the prebuilt SDK container image for the given pipeline
    options with the configured build engine and returns its name."""
    setup_options = pipeline_options.view_as(SetupOptions)
    container_build_engine = setup_options.prebuild_sdk_container_engine
    builder_cls = cls._get_subclass_by_key(container_build_engine)
    builder = builder_cls(pipeline_options)
    return builder._build()

  @classmethod
  def _get_subclass_by_key(cls, key: str) -> Type['SdkContainerImageBuilder']:
    """Returns the unique registered subclass whose builder key matches.

    Raises:
      ValueError: if no subclass matches ``key``, or more than one does.
    """
    available_builders = [
        subclass for subclass in cls.get_all_subclasses()
        if subclass._builder_key() == key
    ]
    if not available_builders:
      available_builder_keys = [
          subclass._builder_key() for subclass in cls.get_all_subclasses()
      ]
      raise ValueError(
          f'Cannot find SDK builder type {key} in '
          f'{available_builder_keys}')
    elif len(available_builders) > 1:
      raise ValueError(f'Found multiple builders under key {key}')
    return available_builders[0]
class _SdkContainerImageLocalBuilder(SdkContainerImageBuilder):
  """Builds the prebuilt SDK container image with a local docker daemon."""
  @classmethod
  def _builder_key(cls):
    # Key matched against the --prebuild_sdk_container_engine option.
    return 'local_docker'

  def _invoke_docker_build_and_push(self, container_image_name):
    """Runs `docker build` on the prepared context and, when a registry
    push URL is configured, `docker push` for the resulting image.

    Raises:
      RuntimeError: if the docker build or push subprocess fails.
    """
    try:
      _LOGGER.info("Building sdk container, this may take a few minutes...")
      now = time.time()
      subprocess.run(['docker', 'build', '.', '-t', container_image_name],
                     check=True,
                     cwd=self._temp_src_dir)
    except subprocess.CalledProcessError as err:
      # Output is not captured (it streams to the console), so err.stderr
      # is always None; point at the console output and chain the cause
      # instead of interpolating a None stderr attribute.
      raise RuntimeError(
          'Failed to build sdk container with local docker, '
          'see console output for errors.') from err
    else:
      # Lazy %-style args so formatting only happens if INFO is enabled.
      _LOGGER.info(
          "Successfully built %s in %.2f seconds",
          container_image_name,
          time.time() - now)

    if self._docker_registry_push_url:
      _LOGGER.info("Pushing prebuilt sdk container...")
      try:
        subprocess.run(['docker', 'push', container_image_name], check=True)
      except subprocess.CalledProcessError as err:
        raise RuntimeError(
            'Failed to push prebuilt sdk container %s, '
            'see console output for errors.' % container_image_name) from err
      _LOGGER.info(
          "Successfully pushed %s in %.2f seconds",
          container_image_name,
          time.time() - now)
    else:
      _LOGGER.info(
          "no --docker_registry_push_url option is specified in pipeline "
          "options, specify it if the new image is intended to be "
          "pushed to a registry.")
class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
  """SdkContainerCloudBuilder builds the sdk container image with google
  cloud build."""
  def __init__(self, options):
    super().__init__(options)
    self._google_cloud_options = options.view_as(GoogleCloudOptions)
    # Translate the --cloud_build_machine_type flag into the client enum
    # (None when the flag is unset).
    self._cloud_build_machine_type = self._get_cloud_build_machine_type_enum(
        options.view_as(SetupOptions).cloud_build_machine_type)
    if self._google_cloud_options.no_auth:
      credentials = None
    else:
      credentials = get_service_credentials(options)
    # Imported lazily so GCP client libraries are only required when this
    # builder is actually selected.
    from apache_beam.io.gcp.gcsio import create_storage_client
    self._storage_client = create_storage_client(
        options, not self._google_cloud_options.no_auth)
    from google.cloud.devtools.cloudbuild_v1.services import cloud_build
    self._cloudbuild_client = cloud_build.CloudBuildClient(
        credentials=credentials)
    # Default to the project's GCR path when no push registry was given.
    if not self._docker_registry_push_url:
      self._docker_registry_push_url = (
          'gcr.io/%s/prebuilt_beam_sdk' % self._google_cloud_options.project)

  @classmethod
  def _builder_key(cls):
    # Key matched against the --prebuild_sdk_container_engine option.
    return 'cloud_build'

  def _invoke_docker_build_and_push(self, container_image_name):
    """Builds and pushes the image remotely with Google Cloud Build.

    Uploads the prepared build context to GCS as a tarball, submits a
    Cloud Build job that runs buildah to build and push the image, then
    polls the build until it reaches a terminal state.

    Raises:
      RuntimeError: if the Cloud Build job does not finish with SUCCESS.
    """
    from google.cloud.devtools.cloudbuild_v1 import types as cloud_build_types
    project_id = self._google_cloud_options.project
    temp_location = self._google_cloud_options.temp_location
    # google cloud build service expects all the build source file to be
    # compressed into a tarball.
    tarball_path = os.path.join(self._temp_src_dir, '%s.tgz' % SOURCE_FOLDER)
    self._make_tarfile(tarball_path, self._temp_src_dir)
    _LOGGER.info(
        "Compressed source files for building sdk container at %s" %
        tarball_path)

    # The image tag (a uuid from _build) also keys the GCS source object.
    container_image_tag = container_image_name.split(':')[-1]
    gcs_location = os.path.join(
        temp_location, '%s-%s.tgz' % (SOURCE_FOLDER, container_image_tag))
    self._upload_to_gcs(tarball_path, gcs_location)

    build = cloud_build_types.Build()
    if self._cloud_build_machine_type:
      build.options = cloud_build_types.BuildOptions()
      # NOTE(review): proto-plus message fields are normally snake_case
      # (machine_type); confirm 'machineType' resolves on the client
      # version in use.
      build.options.machineType = self._cloud_build_machine_type
    build.steps = []
    step = cloud_build_types.BuildStep()
    step.name = 'quay.io/buildah/stable:latest'
    step.entrypoint = 'sh'
    step.args = [
        '-c',
        # The --storage-driver=vfs option is used to run buildah in a
        # rootless environment.
        (
            'buildah bud --storage-driver=vfs -t {0} . && '
            'buildah push --storage-driver=vfs {0} docker://{0}'
        ).format(container_image_name),
    ]
    step.dir = SOURCE_FOLDER
    build.steps.append(step)

    source = cloud_build_types.Source()
    storage_source = cloud_build_types.StorageSource()
    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
    # NOTE(review): os.path.join with a single argument is a no-op here.
    storage_source.bucket = os.path.join(gcs_bucket)
    storage_source.object_ = gcs_object
    source.storage_source = storage_source
    build.source = source
    # TODO(zyichi): make timeout configurable
    build.timeout = '7200s'

    now = time.time()
    # operation = client.create_build(project_id=project_id, build=build)
    request = cloud_build_types.CreateBuildRequest(
        project_id=project_id, build=build)
    build = self._cloudbuild_client.create_build(request)
    build_id, log_url = self._get_cloud_build_id_and_log_url(build.metadata)
    _LOGGER.info(
        'Building sdk container with Google Cloud Build, this may '
        'take a few minutes, you may check build log at %s' % log_url)

    # block until build finish, if build fails exception will be raised and
    # stops the job submission.
    response = self._cloudbuild_client.get_build(
        request=cloud_build_types.GetBuildRequest(
            id=build_id, project_id=project_id))
    # Poll every 10s while the build is still in a non-terminal state.
    while response.status in [cloud_build_types.Build.Status.QUEUED,
                              cloud_build_types.Build.Status.PENDING,
                              cloud_build_types.Build.Status.WORKING]:
      time.sleep(10)
      response = self._cloudbuild_client.get_build(
          cloud_build_types.GetBuildRequest(id=build_id, project_id=project_id))
    if response.status != cloud_build_types.Build.Status.SUCCESS:
      raise RuntimeError(
          'Failed to build python sdk container image on google cloud build, '
          'please check build log for error.')

    _LOGGER.info(
        "Python SDK container pre-build finished in %.2f seconds" %
        (time.time() - now))
    _LOGGER.info(
        "Python SDK container built and pushed as %s." % container_image_name)

  def _upload_to_gcs(self, local_file_path, gcs_location):
    """Uploads local_file_path to the ``gs://`` URL gcs_location.

    Raises:
      IOError: when the upload fails with Forbidden/NotFound, with a hint
        about credentials and write access; other errors are re-raised.
    """
    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
    _LOGGER.info('Starting GCS upload to %s...', gcs_location)
    from google.cloud import storage
    from google.cloud.exceptions import Forbidden
    from google.cloud.exceptions import NotFound
    try:
      bucket = self._storage_client.get_bucket(bucket_name)
      blob = bucket.get_blob(blob_name)
      if not blob:
        # Object does not exist yet; create a new blob handle to upload to.
        blob = storage.Blob(name=blob_name, bucket=bucket)
      blob.upload_from_filename(local_file_path)
    except Exception as e:
      if isinstance(e, (Forbidden, NotFound)):
        raise IOError((
            'Could not upload to GCS path %s: %s. Please verify '
            'that credentials are valid and that you have write '
            'access to the specified path.') % (gcs_location, e.message))
      raise
    _LOGGER.info('Completed GCS upload to %s.', gcs_location)

  def _get_cloud_build_id_and_log_url(self, metadata):
    """Extracts (build_id, log_url) from create_build operation metadata.

    Supports both the modern metadata shape (a 'build' submessage) and the
    legacy additionalProperties representation of older clients. Either
    value may come back as None if absent from the metadata.
    """
    # google-cloud-build 3.35+
    if getattr(metadata, 'build', None):
      build = metadata.build
      return (build.id, build.log_url)
    # Fallback for older clients that use additionalProperties.
    id = None  # NOTE: shadows the builtin 'id' within this method.
    log_url = None
    additional_props = getattr(metadata, 'additionalProperties', None)
    if additional_props:
      for item in additional_props:
        if item.key == 'build':
          for field in item.value.object_value.properties:
            if field.key == 'logUrl':
              log_url = field.value.string_value
            if field.key == 'id':
              id = field.value.string_value
    return id, log_url

  @staticmethod
  def _get_gcs_bucket_and_name(gcs_location):
    # Strip the 'gs://' prefix and split into (bucket, object_path).
    return gcs_location[5:].split('/', 1)

  @staticmethod
  def _make_tarfile(output_filename, source_dir):
    # Pack the build context into a gzipped tarball rooted at SOURCE_FOLDER,
    # matching the BuildStep's working dir.
    with tarfile.open(output_filename, "w:gz") as tar:
      tar.add(source_dir, arcname=SOURCE_FOLDER)

  @staticmethod
  def _get_cloud_build_machine_type_enum(machine_type: str):
    """Maps a --cloud_build_machine_type flag value (case-insensitive) to
    the Cloud Build MachineType enum.

    Returns None for a falsy input.

    Raises:
      ValueError: for an unrecognized machine type string.
    """
    from google.cloud.devtools.cloudbuild_v1 import types as cloud_build_types
    if not machine_type:
      return None
    mappings = {
        'n1-highcpu-8': cloud_build_types.BuildOptions.MachineType.N1_HIGHCPU_8,
        'n1-highcpu-32': cloud_build_types.BuildOptions.MachineType.
        N1_HIGHCPU_32,
        'e2-highcpu-8': cloud_build_types.BuildOptions.MachineType.E2_HIGHCPU_8,
        'e2-highcpu-32': cloud_build_types.BuildOptions.MachineType.
        E2_HIGHCPU_32
    }
    if machine_type.lower() in mappings:
      return mappings[machine_type.lower()]
    else:
      raise ValueError(
          'Unknown Cloud Build Machine Type option, please specify one of '
          '[n1-highcpu-8, n1-highcpu-32, e2-highcpu-8, e2-highcpu-32].')