#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" For internal use only. No backwards compatibility guarantees.
Dataflow client utility functions."""
# pytype: skip-file
import codecs
import getpass
import io
import json
import logging
import os
import pkg_resources
import re
import sys
import time
import warnings
from copy import copy
from datetime import datetime
from apitools.base.py import encoding
from apitools.base.py import exceptions
from apache_beam import version as beam_version
from apache_beam.internal.gcp.auth import get_service_credentials
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.internal.http_client import get_new_http
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
from apache_beam.io.gcp.internal.clients import storage
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow
from apache_beam.runners.dataflow.internal.names import PropertyNames
from apache_beam.runners.internal import names as shared_names
from apache_beam.runners.portability.stager import Stager
from apache_beam.transforms import DataflowDistributionCounter
from apache_beam.transforms import cy_combiners
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.environments import is_apache_beam_container
from apache_beam.utils import retry
from apache_beam.utils import proto_utils
# Environment version information. It is passed to the service during
# job submission and is used by the service to establish what features
# are expected by the workers.
_LEGACY_ENVIRONMENT_MAJOR_VERSION = '8'
_FNAPI_ENVIRONMENT_MAJOR_VERSION = '8'
_LOGGER = logging.getLogger(__name__)
_PYTHON_VERSIONS_SUPPORTED_BY_DATAFLOW = ['3.6', '3.7', '3.8']
class Step(object):
"""Wrapper for a dataflow Step protobuf."""
def __init__(self, step_kind, step_name, additional_properties=None):
self.step_kind = step_kind
self.step_name = step_name
self.proto = dataflow.Step(kind=step_kind, name=step_name)
self.proto.properties = {}
self._additional_properties = []
if additional_properties is not None:
for (n, v, t) in additional_properties:
self.add_property(n, v, t)
def add_property(self, name, value, with_type=False):
self._additional_properties.append((name, value, with_type))
self.proto.properties.additionalProperties.append(
dataflow.Step.PropertiesValue.AdditionalProperty(
key=name, value=to_json_value(value, with_type=with_type)))
def _get_outputs(self):
"""Returns a list of all output labels for a step."""
outputs = []
for p in self.proto.properties.additionalProperties:
if p.key == PropertyNames.OUTPUT_INFO:
for entry in p.value.array_value.entries:
for entry_prop in entry.object_value.properties:
if entry_prop.key == PropertyNames.OUTPUT_NAME:
outputs.append(entry_prop.value.string_value)
return outputs
def __reduce__(self):
"""Reduce hook for pickling the Step class more easily."""
return (Step, (self.step_kind, self.step_name, self._additional_properties))
def get_output(self, tag=None):
"""Returns name if it is one of the outputs or first output if name is None.
Args:
tag: tag of the output as a string or None if we want to get the
name of the first output.
Returns:
The name of the output associated with the tag or the first output
if tag was None.
Raises:
ValueError: if the tag does not exist within outputs.
"""
outputs = self._get_outputs()
if tag is None or len(outputs) == 1:
return outputs[0]
else:
if tag not in outputs:
raise ValueError('Cannot find named output: %s in %s.' % (tag, outputs))
return tag
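# Illustrative sketch (not executed): how a Step wrapper might be built and
# queried. The kind, name and output properties below are placeholder values
# for demonstration only, not taken from a real pipeline.
#
#   step = Step('ParallelRead', 'read_from_source')
#   step.add_property(
#       PropertyNames.OUTPUT_INFO, [{PropertyNames.OUTPUT_NAME: 'out'}])
#   step.get_output()  # -> 'out' (the first and only output)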
class Environment(object):
"""Wrapper for a dataflow Environment protobuf."""
def __init__(
self,
packages,
options,
environment_version,
proto_pipeline_staged_url,
proto_pipeline=None,
_sdk_image_overrides=None):
self.standard_options = options.view_as(StandardOptions)
self.google_cloud_options = options.view_as(GoogleCloudOptions)
self.worker_options = options.view_as(WorkerOptions)
self.debug_options = options.view_as(DebugOptions)
self.pipeline_url = proto_pipeline_staged_url
self.proto = dataflow.Environment()
self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
self.proto.dataset = '{}/cloud_dataflow'.format(
GoogleCloudOptions.BIGQUERY_API_SERVICE)
self.proto.tempStoragePrefix = (
self.google_cloud_options.temp_location.replace(
'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE))
if self.worker_options.worker_region:
self.proto.workerRegion = self.worker_options.worker_region
if self.worker_options.worker_zone:
self.proto.workerZone = self.worker_options.worker_zone
# User agent information.
self.proto.userAgent = dataflow.Environment.UserAgentValue()
self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
self._proto_pipeline = proto_pipeline
self._sdk_image_overrides = _sdk_image_overrides or dict()
if self.google_cloud_options.service_account_email:
self.proto.serviceAccountEmail = (
self.google_cloud_options.service_account_email)
if self.google_cloud_options.dataflow_kms_key:
self.proto.serviceKmsKeyName = self.google_cloud_options.dataflow_kms_key
self.proto.userAgent.additionalProperties.extend([
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='name', value=to_json_value(self._get_python_sdk_name())),
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='version', value=to_json_value(beam_version.__version__))
])
# Version information.
self.proto.version = dataflow.Environment.VersionValue()
_verify_interpreter_version_is_supported(options)
if self.standard_options.streaming:
job_type = 'FNAPI_STREAMING'
else:
if _use_fnapi(options):
job_type = 'FNAPI_BATCH'
else:
job_type = 'PYTHON_BATCH'
self.proto.version.additionalProperties.extend([
dataflow.Environment.VersionValue.AdditionalProperty(
key='job_type', value=to_json_value(job_type)),
dataflow.Environment.VersionValue.AdditionalProperty(
key='major', value=to_json_value(environment_version))
])
# TODO: Use enumerated type instead of strings for job types.
if job_type.startswith('FNAPI_'):
self.debug_options.experiments = self.debug_options.experiments or []
if self.debug_options.lookup_experiment(
'runner_harness_container_image') or _use_unified_worker(options):
# Default image is not used if user provides a runner harness image.
# Default runner harness image is selected by the service for unified
# worker.
pass
else:
runner_harness_override = (get_runner_harness_container_image())
if runner_harness_override:
self.debug_options.add_experiment(
'runner_harness_container_image=' + runner_harness_override)
debug_options_experiments = self.debug_options.experiments
# Add use_multiple_sdk_containers flag if it's not already present. Do not
# add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Clean up use_multiple_sdk_containers once Python SDK versions
# up to 2.4 are deprecated.
if ('use_multiple_sdk_containers' not in debug_options_experiments and
'no_use_multiple_sdk_containers' not in debug_options_experiments):
debug_options_experiments.append('use_multiple_sdk_containers')
# FlexRS
if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
dataflow.Environment.FlexResourceSchedulingGoalValueValuesEnum.
FLEXRS_COST_OPTIMIZED)
elif self.google_cloud_options.flexrs_goal == 'SPEED_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
dataflow.Environment.FlexResourceSchedulingGoalValueValuesEnum.
FLEXRS_SPEED_OPTIMIZED)
# Experiments
if self.debug_options.experiments:
for experiment in self.debug_options.experiments:
self.proto.experiments.append(experiment)
# Worker pool(s) information.
package_descriptors = []
for package in packages:
package_descriptors.append(
dataflow.Package(
location='%s/%s' % (
self.google_cloud_options.staging_location.replace(
'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
package),
name=package))
pool = dataflow.WorkerPool(
kind='local' if self.local else 'harness',
packages=package_descriptors,
taskrunnerSettings=dataflow.TaskRunnerSettings(
parallelWorkerSettings=dataflow.WorkerSettings(
baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
servicePath=self.google_cloud_options.dataflow_endpoint)))
pool.autoscalingSettings = dataflow.AutoscalingSettings()
# Set worker pool options received through command line.
if self.worker_options.num_workers:
pool.numWorkers = self.worker_options.num_workers
if self.worker_options.max_num_workers:
pool.autoscalingSettings.maxNumWorkers = (
self.worker_options.max_num_workers)
if self.worker_options.autoscaling_algorithm:
values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
pool.autoscalingSettings.algorithm = {
'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
}.get(self.worker_options.autoscaling_algorithm)
if self.worker_options.machine_type:
pool.machineType = self.worker_options.machine_type
if self.worker_options.disk_size_gb:
pool.diskSizeGb = self.worker_options.disk_size_gb
if self.worker_options.disk_type:
pool.diskType = self.worker_options.disk_type
if self.worker_options.zone:
pool.zone = self.worker_options.zone
if self.worker_options.network:
pool.network = self.worker_options.network
if self.worker_options.subnetwork:
pool.subnetwork = self.worker_options.subnetwork
pool.workerHarnessContainerImage = (
get_container_image_from_options(options))
# Setting worker pool sdk_harness_container_images option for supported
# Dataflow workers.
environments_to_use = self._get_environments_from_transforms()
if _use_unified_worker(options):
python_sdk_container_image = get_container_image_from_options(options)
# Adding container images for other SDKs that may be needed for
# cross-language pipelines.
for id, environment in environments_to_use:
if environment.urn != common_urns.environments.DOCKER.urn:
raise Exception(
'Dataflow can only execute pipeline steps in Docker environments.'
' Received %r.' % environment)
environment_payload = proto_utils.parse_Bytes(
environment.payload, beam_runner_api_pb2.DockerPayload)
container_image_url = environment_payload.container_image
container_image = dataflow.SdkHarnessContainerImage()
container_image.containerImage = container_image_url
# Currently we only set the following to True for the Python SDK.
# TODO: set this correctly for remote environments that might be Python.
container_image.useSingleCorePerContainer = (
container_image_url == python_sdk_container_image)
container_image.environmentId = id
pool.sdkHarnessContainerImages.append(container_image)
if self.debug_options.number_of_worker_harness_threads:
pool.numThreadsPerWorker = (
self.debug_options.number_of_worker_harness_threads)
if self.worker_options.use_public_ips is not None:
if self.worker_options.use_public_ips:
pool.ipConfiguration = (
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
else:
pool.ipConfiguration = (
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE
)
if self.standard_options.streaming:
# Use separate data disk for streaming.
disk = dataflow.Disk()
if self.local:
disk.diskType = 'local'
if self.worker_options.disk_type:
disk.diskType = self.worker_options.disk_type
pool.dataDisks.append(disk)
self.proto.workerPools.append(pool)
sdk_pipeline_options = options.get_all_options()
if sdk_pipeline_options:
self.proto.sdkPipelineOptions = (
dataflow.Environment.SdkPipelineOptionsValue())
options_dict = {
k: v
for k, v in sdk_pipeline_options.items() if v is not None
}
options_dict["pipelineUrl"] = proto_pipeline_staged_url
self.proto.sdkPipelineOptions.additionalProperties.append(
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='options', value=to_json_value(options_dict)))
dd = DisplayData.create_from_options(options)
items = [item.get_dict() for item in dd.items]
self.proto.sdkPipelineOptions.additionalProperties.append(
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='display_data', value=to_json_value(items)))
if self.google_cloud_options.dataflow_service_options:
for option in self.google_cloud_options.dataflow_service_options:
self.proto.serviceOptions.append(option)
if self.google_cloud_options.enable_hot_key_logging:
self.proto.debugOptions = dataflow.DebugOptions(enableHotKeyLogging=True)
def _get_environments_from_transforms(self):
if not self._proto_pipeline:
return []
environment_ids = set(
transform.environment_id
for transform in self._proto_pipeline.components.transforms.values()
if transform.environment_id)
return [(id, self._proto_pipeline.components.environments[id])
for id in environment_ids]
def _get_python_sdk_name(self):
python_version = '%d.%d' % (sys.version_info[0], sys.version_info[1])
return 'Apache Beam Python %s SDK' % python_version
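# Illustrative sketch (not executed): constructing an Environment proto from
# pipeline options. The project, bucket and region values are placeholders,
# not defaults.
#
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(
#       project='my-project', temp_location='gs://my-bucket/tmp',
#       staging_location='gs://my-bucket/staging', region='us-central1')
#   env = Environment(
#       packages=[], options=options, environment_version='8',
#       proto_pipeline_staged_url='gs://my-bucket/staging/pipeline.pb')
#   # env.proto is the dataflow.Environment message sent with the job.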
class Job(object):
"""Wrapper for a dataflow Job protobuf."""
def __str__(self):
def encode_shortstrings(input_buffer, errors='strict'):
"""Encoder (from Unicode) that suppresses long base64 strings."""
original_len = len(input_buffer)
if original_len > 150:
if self.base64_str_re.match(input_buffer):
input_buffer = '<string of %d bytes>' % original_len
input_buffer = input_buffer.encode('ascii', errors=errors)
else:
matched = self.coder_str_re.match(input_buffer)
if matched:
input_buffer = '%s<string of %d bytes>' % (
matched.group(1), matched.end(2) - matched.start(2))
input_buffer = input_buffer.encode('ascii', errors=errors)
return input_buffer, original_len
def decode_shortstrings(input_buffer, errors='strict'):
"""Decoder (to Unicode) that suppresses long base64 strings."""
shortened, length = encode_shortstrings(input_buffer, errors)
return str(shortened), length
def shortstrings_registerer(encoding_name):
if encoding_name == 'shortstrings':
return codecs.CodecInfo(
name='shortstrings',
encode=encode_shortstrings,
decode=decode_shortstrings)
return None
codecs.register(shortstrings_registerer)
# Use json "dump string" method to get readable formatting;
# further modify it to not output too-long strings, aimed at the
# 10,000+ character hex-encoded "serialized_fn" values.
return json.dumps(
json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'),
indent=2,
sort_keys=True)
@staticmethod
def _build_default_job_name(user_name):
"""Generates a default name for a job.
user_name is lowercased, and any characters outside of [-a-z0-9]
are removed. If necessary, the user_name is truncated to shorten
the job name to 63 characters."""
user_name = re.sub('[^-a-z0-9]', '', user_name.lower())
date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
app_user_name = 'beamapp-{}'.format(user_name)
job_name = '{}-{}'.format(app_user_name, date_component)
if len(job_name) > 63:
job_name = '{}-{}'.format(
app_user_name[:-(len(job_name) - 63)], date_component)
return job_name
@staticmethod
def default_job_name(job_name):
if job_name is None:
job_name = Job._build_default_job_name(getpass.getuser())
return job_name
def __init__(self, options, proto_pipeline):
self.options = options
self.proto_pipeline = proto_pipeline
self.google_cloud_options = options.view_as(GoogleCloudOptions)
if not self.google_cloud_options.job_name:
self.google_cloud_options.job_name = self.default_job_name(
self.google_cloud_options.job_name)
required_google_cloud_options = ['project', 'job_name', 'temp_location']
missing = [
option for option in required_google_cloud_options
if not getattr(self.google_cloud_options, option)
]
if missing:
raise ValueError(
'Missing required configuration parameters: %s' % missing)
if not self.google_cloud_options.staging_location:
_LOGGER.info(
'Defaulting to the temp_location as staging_location: %s',
self.google_cloud_options.temp_location)
(
self.google_cloud_options.staging_location
) = self.google_cloud_options.temp_location
# Make the staging and temp locations job name and time specific. This is
# needed to avoid clashes between job submissions using the same staging
# area or team members using same job names. This method is not entirely
foolproof since two job submissions with the same name can happen at exactly
# the same time. However the window is extremely small given that
# time.time() has at least microseconds granularity. We add the suffix only
# for GCS staging locations where the potential for such clashes is high.
if self.google_cloud_options.staging_location.startswith('gs://'):
path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
self.google_cloud_options.staging_location = FileSystems.join(
self.google_cloud_options.staging_location, path_suffix)
self.google_cloud_options.temp_location = FileSystems.join(
self.google_cloud_options.temp_location, path_suffix)
self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
if self.options.view_as(StandardOptions).streaming:
self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
else:
self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
if self.google_cloud_options.update:
self.proto.replaceJobId = self.job_id_for_name(self.proto.name)
if self.google_cloud_options.transform_name_mapping:
self.proto.transformNameMapping = (
dataflow.Job.TransformNameMappingValue())
for key, value in self.google_cloud_options.transform_name_mapping.items():
self.proto.transformNameMapping.additionalProperties.append(
dataflow.Job.TransformNameMappingValue.AdditionalProperty(
key=key, value=value))
if self.google_cloud_options.create_from_snapshot:
self.proto.createdFromSnapshotId = (
self.google_cloud_options.create_from_snapshot)
# Labels.
if self.google_cloud_options.labels:
self.proto.labels = dataflow.Job.LabelsValue()
for label in self.google_cloud_options.labels:
parts = label.split('=', 1)
key = parts[0]
value = parts[1] if len(parts) > 1 else ''
self.proto.labels.additionalProperties.append(
dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
def job_id_for_name(self, job_name):
return DataflowApplicationClient(
self.google_cloud_options).job_id_for_name(job_name)
def json(self):
return encoding.MessageToJson(self.proto)
def __reduce__(self):
"""Reduce hook for pickling the Job class more easily."""
return (Job, (self.options, ))
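# Illustrative example (not executed): default job name generation. The user
# name and timestamp shown are examples only.
#
#   Job._build_default_job_name('John_Doe')
#   # -> 'beamapp-johndoe-0310000153-074212'
#   # (lowercased, characters outside [-a-z0-9] removed, and the result is
#   #  truncated to at most 63 characters)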
class DataflowApplicationClient(object):
"""A Dataflow API client used by application code to create and query jobs."""
def __init__(self, options):
"""Initializes a Dataflow API client object."""
self.standard_options = options.view_as(StandardOptions)
self.google_cloud_options = options.view_as(GoogleCloudOptions)
if _use_fnapi(options):
self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
else:
self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION
if self.google_cloud_options.no_auth:
credentials = None
else:
credentials = get_service_credentials()
http_client = get_new_http()
self._client = dataflow.DataflowV1b3(
url=self.google_cloud_options.dataflow_endpoint,
credentials=credentials,
get_credentials=(not self.google_cloud_options.no_auth),
http=http_client,
response_encoding=get_response_encoding())
self._storage_client = storage.StorageV1(
url='https://www.googleapis.com/storage/v1',
credentials=credentials,
get_credentials=(not self.google_cloud_options.no_auth),
http=http_client,
response_encoding=get_response_encoding())
self._sdk_image_overrides = self._get_sdk_image_overrides(options)
def _get_sdk_image_overrides(self, pipeline_options):
worker_options = pipeline_options.view_as(WorkerOptions)
sdk_overrides = worker_options.sdk_harness_container_image_overrides
return (
dict(s.split(',', 1)
for s in sdk_overrides) if sdk_overrides else dict())
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def _gcs_file_copy(self, from_path, to_path):
to_folder, to_name = os.path.split(to_path)
total_size = os.path.getsize(from_path)
with open(from_path, 'rb') as f:
self.stage_file(to_folder, to_name, f, total_size=total_size)
def _stage_resources(self, pipeline, options):
google_cloud_options = options.view_as(GoogleCloudOptions)
if google_cloud_options.staging_location is None:
raise RuntimeError('The --staging_location option must be specified.')
if google_cloud_options.temp_location is None:
raise RuntimeError('The --temp_location option must be specified.')
resources = []
hashes = {}
for _, env in sorted(pipeline.components.environments.items(),
key=lambda kv: kv[0]):
for dep in env.dependencies:
if dep.type_urn != common_urns.artifact_types.FILE.urn:
raise RuntimeError('unsupported artifact type %s' % dep.type_urn)
if dep.role_urn != common_urns.artifact_roles.STAGING_TO.urn:
raise RuntimeError('unsupported role type %s' % dep.role_urn)
type_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
dep.type_payload)
role_payload = (
beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
dep.role_payload))
if type_payload.sha256 and type_payload.sha256 in hashes:
_LOGGER.info(
'Found duplicated artifact: %s (%s)',
type_payload.path,
type_payload.sha256)
staged_name = hashes[type_payload.sha256]
dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
staged_name=staged_name).SerializeToString()
else:
staged_name = role_payload.staged_name
resources.append((type_payload.path, staged_name))
hashes[type_payload.sha256] = staged_name
if FileSystems.get_scheme(
google_cloud_options.staging_location) == GCSFileSystem.scheme():
dep.type_urn = common_urns.artifact_types.URL.urn
dep.type_payload = beam_runner_api_pb2.ArtifactUrlPayload(
url=FileSystems.join(
google_cloud_options.staging_location, staged_name),
sha256=type_payload.sha256).SerializeToString()
else:
dep.type_payload = beam_runner_api_pb2.ArtifactFilePayload(
path=FileSystems.join(
google_cloud_options.staging_location, staged_name),
sha256=type_payload.sha256).SerializeToString()
resource_stager = _LegacyDataflowStager(self)
staged_resources = resource_stager.stage_job_resources(
resources, staging_location=google_cloud_options.staging_location)
return staged_resources
def stage_file(
self,
gcs_or_local_path,
file_name,
stream,
mime_type='application/octet-stream',
total_size=None):
"""Stages a file at a GCS or local path with stream-supplied contents."""
if not gcs_or_local_path.startswith('gs://'):
local_path = FileSystems.join(gcs_or_local_path, file_name)
_LOGGER.info('Staging file locally to %s', local_path)
with open(local_path, 'wb') as f:
f.write(stream.read())
return
gcs_location = FileSystems.join(gcs_or_local_path, file_name)
bucket, name = gcs_location[5:].split('/', 1)
request = storage.StorageObjectsInsertRequest(bucket=bucket, name=name)
start_time = time.time()
_LOGGER.info('Starting GCS upload to %s...', gcs_location)
upload = storage.Upload(stream, mime_type, total_size)
try:
response = self._storage_client.objects.Insert(request, upload=upload)
except exceptions.HttpError as e:
reportable_errors = {
403: 'access denied',
404: 'bucket not found',
}
if e.status_code in reportable_errors:
raise IOError((
'Could not upload to GCS path %s: %s. Please verify '
'that credentials are valid and that you have write '
'access to the specified path.') %
(gcs_or_local_path, reportable_errors[e.status_code]))
raise
_LOGGER.info(
'Completed GCS upload to %s in %s seconds.',
gcs_location,
int(time.time() - start_time))
return response
@retry.no_retries # Using no_retries marks this as an integration point.
def create_job(self, job):
"""Creates job description. May stage and/or submit for remote execution."""
self.create_job_description(job)
# Stage and submit the job when necessary
dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
template_location = (
job.options.view_as(GoogleCloudOptions).template_location)
if job.options.view_as(DebugOptions).lookup_experiment('upload_graph'):
# For Runner V2, also set portable job submission.
if _use_unified_worker(job.options):
job.options.view_as(DebugOptions).add_experiment(
'use_portable_job_submission')
self.stage_file(
job.options.view_as(GoogleCloudOptions).staging_location,
"dataflow_graph.json",
io.BytesIO(job.json().encode('utf-8')))
del job.proto.steps[:]
job.proto.stepsLocation = FileSystems.join(
job.options.view_as(GoogleCloudOptions).staging_location,
"dataflow_graph.json")
# template file generation should be placed immediately before the
# conditional API call.
job_location = template_location or dataflow_job_file
if job_location:
gcs_or_local_path = os.path.dirname(job_location)
file_name = os.path.basename(job_location)
self.stage_file(
gcs_or_local_path, file_name, io.BytesIO(job.json().encode('utf-8')))
if not template_location:
return self.submit_job_description(job)
_LOGGER.info(
'A template was just created at location %s', template_location)
return None
@staticmethod
def _update_container_image_for_dataflow(beam_container_image_url):
# By default Dataflow pipelines use containers hosted in Dataflow GCR
# instead of Docker Hub.
image_suffix = beam_container_image_url.rsplit('/', 1)[1]
return names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + image_suffix
@staticmethod
def _apply_sdk_environment_overrides(
proto_pipeline, sdk_overrides, pipeline_options):
# Updates container image URLs for Dataflow.
# For a given container image URL:
# * If a matching override has been provided, that override is used.
# * For improved performance, external Apache Beam container images that
#   are not explicitly overridden are updated to use GCR copies instead
#   of being downloaded directly from Docker Hub.
current_sdk_container_image = get_container_image_from_options(
pipeline_options)
for environment in proto_pipeline.components.environments.values():
docker_payload = proto_utils.parse_Bytes(
environment.payload, beam_runner_api_pb2.DockerPayload)
overridden = False
new_container_image = docker_payload.container_image
for pattern, override in sdk_overrides.items():
new_container_image = re.sub(pattern, override, new_container_image)
if new_container_image != docker_payload.container_image:
overridden = True
# Container of the current (Python) SDK is overridden separately, hence
# not updated here.
if (is_apache_beam_container(new_container_image) and not overridden and
new_container_image != current_sdk_container_image):
new_container_image = (
DataflowApplicationClient._update_container_image_for_dataflow(
docker_payload.container_image))
if not new_container_image:
raise ValueError(
'SDK Docker container image has to be a non-empty string')
new_payload = copy(docker_payload)
new_payload.container_image = new_container_image
environment.payload = new_payload.SerializeToString()
def create_job_description(self, job):
"""Creates a job described by the workflow proto."""
DataflowApplicationClient._apply_sdk_environment_overrides(
job.proto_pipeline, self._sdk_image_overrides, job.options)
# Stage other resources for the SDK harness
resources = self._stage_resources(job.proto_pipeline, job.options)
# Stage proto pipeline.
self.stage_file(
job.google_cloud_options.staging_location,
shared_names.STAGED_PIPELINE_FILENAME,
io.BytesIO(job.proto_pipeline.SerializeToString()))
job.proto.environment = Environment(
proto_pipeline_staged_url=FileSystems.join(
job.google_cloud_options.staging_location,
shared_names.STAGED_PIPELINE_FILENAME),
packages=resources,
options=job.options,
environment_version=self.environment_version,
proto_pipeline=job.proto_pipeline,
_sdk_image_overrides=self._sdk_image_overrides).proto
_LOGGER.debug('JOB: %s', job)
@retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
def get_job_metrics(self, job_id):
request = dataflow.DataflowProjectsLocationsJobsGetMetricsRequest()
request.jobId = job_id
request.location = self.google_cloud_options.region
request.projectId = self.google_cloud_options.project
try:
response = self._client.projects_locations_jobs.GetMetrics(request)
except exceptions.BadStatusCodeError as e:
_LOGGER.error(
'HTTP status %d. Unable to query metrics', e.response.status)
raise
return response
@retry.with_exponential_backoff(num_retries=3)
def submit_job_description(self, job):
"""Creates and excutes a job request."""
request = dataflow.DataflowProjectsLocationsJobsCreateRequest()
request.projectId = self.google_cloud_options.project
request.location = self.google_cloud_options.region
request.job = job.proto
try:
response = self._client.projects_locations_jobs.Create(request)
except exceptions.BadStatusCodeError as e:
_LOGGER.error(
'HTTP status %d trying to create job'
' at dataflow service endpoint %s',
e.response.status,
self.google_cloud_options.dataflow_endpoint)
_LOGGER.fatal('details of server error: %s', e)
raise
_LOGGER.info('Create job: %s', response)
# The response is a Job proto with the id for the new job.
_LOGGER.info('Created job with id: [%s]', response.id)
_LOGGER.info('Submitted job: %s', response.id)
_LOGGER.info(
'To access the Dataflow monitoring console, please navigate to '
'https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s',
self.google_cloud_options.region,
response.id,
self.google_cloud_options.project)
return response
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
def modify_job_state(self, job_id, new_state):
"""Modify the run state of the job.
Args:
job_id: The id of the job.
new_state: A string representing the new desired state. It could be set to
either 'JOB_STATE_DONE', 'JOB_STATE_CANCELLED' or 'JOB_STATE_DRAINING'.
Returns:
True if the job was modified successfully.
"""
if new_state == 'JOB_STATE_DONE':
new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DONE
elif new_state == 'JOB_STATE_CANCELLED':
new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_CANCELLED
elif new_state == 'JOB_STATE_DRAINING':
new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DRAINING
else:
# Other states could only be set by the service.
return False
request = dataflow.DataflowProjectsLocationsJobsUpdateRequest()
request.jobId = job_id
request.projectId = self.google_cloud_options.project
request.location = self.google_cloud_options.region
request.job = dataflow.Job(requestedState=new_state)
self._client.projects_locations_jobs.Update(request)
return True
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_notfound_filter)
def get_job(self, job_id):
"""Gets the job status for a submitted job.
Args:
job_id: A string representing the job_id for the workflow as returned
by a create_job() request.
Returns:
A Job proto. See below for interesting fields.
The Job proto returned from a get_job() request contains some interesting
fields:
currentState: An object representing the current state of the job. The
string representation of the object (str() result) has the following
possible values: JOB_STATE_UNKNOWN, JOB_STATE_STOPPED,
JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED,
JOB_STATE_CANCELLED.
createTime: UTC time when the job was created
(e.g. '2015-03-10T00:01:53.074Z')
currentStateTime: UTC time for the current state of the job.
"""
request = dataflow.DataflowProjectsLocationsJobsGetRequest()
request.jobId = job_id
request.projectId = self.google_cloud_options.project
request.location = self.google_cloud_options.region
response = self._client.projects_locations_jobs.Get(request)
return response
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_notfound_filter)
def list_messages(
self,
job_id,
start_time=None,
end_time=None,
page_token=None,
minimum_importance=None):
"""List messages associated with the execution of a job.
Args:
job_id: A string representing the job_id for the workflow as returned
by a create_job() request.
start_time: If specified, only messages generated after the start time
will be returned, otherwise all messages since job started will be
returned. The value is a string representing UTC time
(e.g., '2015-08-18T21:03:50.644Z')
end_time: If specified, only messages generated before the end time
will be returned, otherwise all messages up to current time will be
returned. The value is a string representing UTC time
(e.g., '2015-08-18T21:03:50.644Z')
page_token: A string to be used as next page token if the list call
returned paginated results.
minimum_importance: Filter for messages based on importance. The possible
string values in increasing order of importance are: JOB_MESSAGE_DEBUG,
JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, JOB_MESSAGE_WARNING,
JOB_MESSAGE_ERROR. For example, a filter set on warning will allow only
warnings and errors and exclude all others.
Returns:
A tuple consisting of a list of JobMessage instances and a
next page token string.
Raises:
RuntimeError: if an unexpected value for the message_importance argument
is used.
The JobMessage objects returned by the call contain the following fields:
id: A unique string identifier for the message.
time: A string representing the UTC time of the message
(e.g., '2015-08-18T21:03:50.644Z')
messageImportance: An enumeration value for the message importance. The
value, when converted to a string, will be one of the following:
JOB_MESSAGE_DEBUG, JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC,
JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
messageText: A message string.
"""
request = dataflow.DataflowProjectsLocationsJobsMessagesListRequest(
jobId=job_id,
location=self.google_cloud_options.region,
projectId=self.google_cloud_options.project)
if page_token is not None:
request.pageToken = page_token
if start_time is not None:
request.startTime = start_time
if end_time is not None:
request.endTime = end_time
if minimum_importance is not None:
if minimum_importance == 'JOB_MESSAGE_DEBUG':
request.minimumImportance = (
dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
MinimumImportanceValueValuesEnum.JOB_MESSAGE_DEBUG)
elif minimum_importance == 'JOB_MESSAGE_DETAILED':
request.minimumImportance = (
dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
MinimumImportanceValueValuesEnum.JOB_MESSAGE_DETAILED)
elif minimum_importance == 'JOB_MESSAGE_BASIC':
request.minimumImportance = (
dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
MinimumImportanceValueValuesEnum.JOB_MESSAGE_BASIC)
elif minimum_importance == 'JOB_MESSAGE_WARNING':
request.minimumImportance = (
dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
MinimumImportanceValueValuesEnum.JOB_MESSAGE_WARNING)
elif minimum_importance == 'JOB_MESSAGE_ERROR':
request.minimumImportance = (
dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
MinimumImportanceValueValuesEnum.JOB_MESSAGE_ERROR)
else:
raise RuntimeError(
'Unexpected value for minimum_importance argument: %r' %
minimum_importance)
response = self._client.projects_locations_jobs_messages.List(request)
return response.jobMessages, response.nextPageToken
def job_id_for_name(self, job_name):
token = None
while True:
request = dataflow.DataflowProjectsLocationsJobsListRequest(
projectId=self.google_cloud_options.project,
location=self.google_cloud_options.region,
pageToken=token)
response = self._client.projects_locations_jobs.List(request)
for job in response.jobs:
if (job.name == job_name and job.currentState in [
dataflow.Job.CurrentStateValueValuesEnum.JOB_STATE_RUNNING,
dataflow.Job.CurrentStateValueValuesEnum.JOB_STATE_DRAINING
]):
return job.id
token = response.nextPageToken
if token is None:
raise ValueError("No running job found with name '%s'" % job_name)
class MetricUpdateTranslators(object):
"""Translators between accumulators and dataflow metric updates."""
@staticmethod
def translate_boolean(accumulator, metric_update_proto):
metric_update_proto.boolean = accumulator.value
@staticmethod
def translate_scalar_mean_int(accumulator, metric_update_proto):
if accumulator.count:
metric_update_proto.integerMean = dataflow.IntegerMean()
metric_update_proto.integerMean.sum = to_split_int(accumulator.sum)
metric_update_proto.integerMean.count = to_split_int(accumulator.count)
else:
metric_update_proto.nameAndKind.kind = None
@staticmethod
def translate_scalar_mean_float(accumulator, metric_update_proto):
if accumulator.count:
metric_update_proto.floatingPointMean = dataflow.FloatingPointMean()
metric_update_proto.floatingPointMean.sum = accumulator.sum
metric_update_proto.floatingPointMean.count = to_split_int(
accumulator.count)
else:
metric_update_proto.nameAndKind.kind = None
@staticmethod
def translate_scalar_counter_int(accumulator, metric_update_proto):
metric_update_proto.integer = to_split_int(accumulator.value)
@staticmethod
def translate_scalar_counter_float(accumulator, metric_update_proto):
metric_update_proto.floatingPoint = accumulator.value
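# Illustrative sketch (not executed): translating an accumulator into a
# Dataflow counter update, assuming `accumulator` exposes a `.value`
# attribute as the cy_combiners accumulators do.
#
#   update = dataflow.CounterUpdate()
#   MetricUpdateTranslators.translate_scalar_counter_int(accumulator, update)
#   # update.integer is now a SplitInt64 holding accumulator.value.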
class _LegacyDataflowStager(Stager):
def __init__(self, dataflow_application_client):
super(_LegacyDataflowStager, self).__init__()
self._dataflow_application_client = dataflow_application_client
def stage_artifact(self, local_path_to_artifact, artifact_name):
self._dataflow_application_client._gcs_file_copy(
local_path_to_artifact, artifact_name)
def commit_manifest(self):
pass
@staticmethod
def get_sdk_package_name():
"""For internal use only; no backwards-compatibility guarantees.
Returns the PyPI package name to be staged to Google Cloud Dataflow.
"""
return shared_names.BEAM_PACKAGE_NAME
def to_split_int(n):
res = dataflow.SplitInt64()
res.lowBits = n & 0xffffffff
res.highBits = n >> 32
return res
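# Illustrative example (not executed): splitting a Python int into the
# 32-bit halves used by the Dataflow SplitInt64 message.
#
#   split = to_split_int(5000000000)
#   # split.lowBits == 705032704, split.highBits == 1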
def translate_distribution(distribution_update, metric_update_proto):
"""Translate metrics DistributionUpdate to dataflow distribution update.
Args:
distribution_update: Instance of DistributionData,
DistributionInt64Accumulator or DataflowDistributionCounter.
metric_update_proto: The metric update proto to populate.
"""
dist_update_proto = dataflow.DistributionUpdate()
dist_update_proto.min = to_split_int(distribution_update.min)
dist_update_proto.max = to_split_int(distribution_update.max)
dist_update_proto.count = to_split_int(distribution_update.count)
dist_update_proto.sum = to_split_int(distribution_update.sum)
# DataflowDistributionCounter needs to translate histogram
if isinstance(distribution_update, DataflowDistributionCounter):
dist_update_proto.histogram = dataflow.Histogram()
distribution_update.translate_to_histogram(dist_update_proto.histogram)
metric_update_proto.distribution = dist_update_proto
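# Illustrative sketch (not executed): reporting a distribution metric. The
# accumulator below is a DataflowDistributionCounter, so a histogram is
# attached in addition to min/max/count/sum.
#
#   counter = DataflowDistributionCounter()
#   counter.add_input(5)
#   update = dataflow.CounterUpdate()
#   translate_distribution(counter, update)
#   # update.distribution now carries the summary statistics and histogram.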
def translate_value(value, metric_update_proto):
metric_update_proto.integer = to_split_int(value)
def translate_mean(accumulator, metric_update):
if accumulator.count:
metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
metric_update.meanCount = to_json_value(accumulator.count, with_type=True)
else:
# A denominator of 0 will raise an error in the service.
# What it means is we have nothing to report yet, so don't.
metric_update.kind = None
def _use_fnapi(pipeline_options):
standard_options = pipeline_options.view_as(StandardOptions)
debug_options = pipeline_options.view_as(DebugOptions)
return standard_options.streaming or (
debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
def _use_unified_worker(pipeline_options):
debug_options = pipeline_options.view_as(DebugOptions)
use_unified_worker_flag = 'use_unified_worker'
use_runner_v2_flag = 'use_runner_v2'
if (debug_options.lookup_experiment(use_runner_v2_flag) and
not debug_options.lookup_experiment(use_unified_worker_flag)):
debug_options.add_experiment(use_unified_worker_flag)
return debug_options.lookup_experiment(use_unified_worker_flag)
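# Illustrative example (not executed): experiments that imply FnAPI and the
# unified worker. PipelineOptions comes from
# apache_beam.options.pipeline_options; the flags shown are placeholders.
#
#   _use_fnapi(PipelineOptions(['--streaming']))  # -> True
#   _use_unified_worker(PipelineOptions(['--experiments=use_runner_v2']))
#   # -> True ('use_unified_worker' is added automatically)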
def _get_container_image_tag():
base_version = pkg_resources.parse_version(
beam_version.__version__).base_version
if base_version != beam_version.__version__:
warnings.warn(
"A non-standard version of Beam SDK detected: %s. "
"Dataflow runner will use container image tag %s. "
"This use case is not supported." %
(beam_version.__version__, base_version))
return base_version
def get_container_image_from_options(pipeline_options):
"""For internal use only; no backwards-compatibility guarantees.
Args:
pipeline_options (PipelineOptions): A container for pipeline options.
Returns:
str: Container image for remote execution.
"""
worker_options = pipeline_options.view_as(WorkerOptions)
if worker_options.sdk_container_image:
return worker_options.sdk_container_image
use_fnapi = _use_fnapi(pipeline_options)
# TODO(tvalentyn): Use enumerated type instead of strings for job types.
if use_fnapi:
fnapi_suffix = '-fnapi'
else:
fnapi_suffix = ''
version_suffix = '%s%s' % (sys.version_info[0:2])
image_name = '{repository}/python{version_suffix}{fnapi_suffix}'.format(
repository=names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
version_suffix=version_suffix,
fnapi_suffix=fnapi_suffix)
image_tag = _get_required_container_version(use_fnapi)
return image_name + ':' + image_tag
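# Illustrative example (not executed): the image name derived for a batch,
# non-FnAPI pipeline on Python 3.8 with no --sdk_container_image override.
# The repository and tag shown are assumptions for illustration.
#
#   get_container_image_from_options(options)
#   # -> '<DATAFLOW_CONTAINER_IMAGE_REPOSITORY>/python38:<image tag>'
#   # e.g. 'gcr.io/cloud-dataflow/v1beta3/python38:2.28.0'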
def _get_required_container_version(use_fnapi):
"""For internal use only; no backwards-compatibility guarantees.
Args:
use_fnapi (bool): True, if pipeline is using FnAPI, False otherwise.
Returns:
str: The tag of worker container images in GCR that corresponds to
current version of the SDK.
"""
if 'dev' in beam_version.__version__:
if use_fnapi:
return names.BEAM_FNAPI_CONTAINER_VERSION
else:
return names.BEAM_CONTAINER_VERSION
else:
return _get_container_image_tag()
def get_runner_harness_container_image():
"""For internal use only; no backwards-compatibility guarantees.
Returns:
str: The runner harness container image to be used by default for the
current SDK version, or None if the runner harness container image
bundled with the service should be used.
"""
# Pin runner harness for released versions of the SDK.
if 'dev' not in beam_version.__version__:
return (
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
_get_container_image_tag())
# Don't pin runner harness for dev versions so that we can notice
# potential incompatibility between runner and sdk harnesses.
return None
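# Illustrative example (not executed): for a released SDK the runner harness
# image is pinned to the matching tag; for dev versions None is returned and
# the service default is used. The repository and versions are assumptions.
#
#   get_runner_harness_container_image()
#   # released 2.28.0 -> 'gcr.io/cloud-dataflow/v1beta3/harness:2.28.0'
#   # 2.29.0.dev      -> None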
def get_response_encoding():
"""Encoding to use to decode HTTP response from Google APIs."""
return 'utf8'
def _verify_interpreter_version_is_supported(pipeline_options):
if ('%s.%s' %
(sys.version_info[0],
sys.version_info[1]) in _PYTHON_VERSIONS_SUPPORTED_BY_DATAFLOW):
return
if 'dev' in beam_version.__version__:
return
debug_options = pipeline_options.view_as(DebugOptions)
if (debug_options.experiments and
'use_unsupported_python_version' in debug_options.experiments):
return
raise Exception(
'Dataflow runner currently supports Python versions %s, got %s.\n'
'To ignore this requirement and start a job '
'using an unsupported version of Python interpreter, pass '
'--experiment use_unsupported_python_version pipeline option.' %
(_PYTHON_VERSIONS_SUPPORTED_BY_DATAFLOW, sys.version))
# To enable a counter on the service, add it to this dictionary.
# This is required for the legacy Python Dataflow runner, as portability
# does not communicate to the service via Python code, but instead via a
# runner harness (in C++ or Java).
# TODO(BEAM-7050): Remove this antipattern; legacy Dataflow Python
# pipelines will break whenever a new cy_combiner type is used.
structured_counter_translations = {
cy_combiners.CountCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.SumInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MinInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.MIN,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MaxInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.MAX,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MeanInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
MetricUpdateTranslators.translate_scalar_mean_int),
cy_combiners.SumFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MinFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.MIN,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MaxFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.MAX,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MeanFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
MetricUpdateTranslators.translate_scalar_mean_float),
cy_combiners.AllCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.AND,
MetricUpdateTranslators.translate_boolean),
cy_combiners.AnyCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.OR,
MetricUpdateTranslators.translate_boolean),
cy_combiners.DataflowDistributionCounterFn: (
dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
translate_distribution),
cy_combiners.DistributionInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
translate_distribution),
}
counter_translations = {
cy_combiners.CountCombineFn: (
dataflow.NameAndKind.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.SumInt64Fn: (
dataflow.NameAndKind.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MinInt64Fn: (
dataflow.NameAndKind.KindValueValuesEnum.MIN,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MaxInt64Fn: (
dataflow.NameAndKind.KindValueValuesEnum.MAX,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MeanInt64Fn: (
dataflow.NameAndKind.KindValueValuesEnum.MEAN,
MetricUpdateTranslators.translate_scalar_mean_int),
cy_combiners.SumFloatFn: (
dataflow.NameAndKind.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MinFloatFn: (
dataflow.NameAndKind.KindValueValuesEnum.MIN,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MaxFloatFn: (
dataflow.NameAndKind.KindValueValuesEnum.MAX,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MeanFloatFn: (
dataflow.NameAndKind.KindValueValuesEnum.MEAN,
MetricUpdateTranslators.translate_scalar_mean_float),
cy_combiners.AllCombineFn: (
dataflow.NameAndKind.KindValueValuesEnum.AND,
MetricUpdateTranslators.translate_boolean),
cy_combiners.AnyCombineFn: (
dataflow.NameAndKind.KindValueValuesEnum.OR,
MetricUpdateTranslators.translate_boolean),
cy_combiners.DataflowDistributionCounterFn: (
dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
translate_distribution),
cy_combiners.DistributionInt64Fn: (
dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
translate_distribution),
}