# blob: a730f2cfc9bbaeb9980d81a819b3e55a4fcce400 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
# pytype: skip-file
import re
from apache_beam.io.gcp import gce_metadata_util
# Conservative, ASCII-only check for a cloud label value: 1 to 63 characters
# drawn from lowercase letters, digits, underscores and dashes.
_VALID_CLOUD_LABEL_PATTERN = re.compile(r'^[a-z0-9\_\-]{1,63}$')
def _sanitize_value(value):
"""Sanitizes a value into a valid BigQuery label value."""
return re.sub(r'[^\w-]+', '', value.lower().replace('/', '-'))[0:63]
def _is_valid_cloud_label_value(label_value):
  """Returns True if label_value is a valid cloud label string.

  This function can return False in cases where the label value is valid.
  However, it will not return True in a case where the label value is invalid.
  This is because a stricter set of allowed characters is used in this
  validator, because foreign language characters are not accepted.
  Thus, this should not be used as a generic validator for all cloud labels.

  See Also:
    https://cloud.google.com/compute/docs/labeling-resources

  Args:
    label_value: The label value to validate.

  Returns:
    True if the label value is a valid cloud label string, False otherwise.
  """
  # fullmatch (rather than match) rejects a value that ends in a newline,
  # which a `$`-anchored pattern would otherwise accept. bool() keeps the
  # documented True/False contract instead of leaking a Match object or None.
  return bool(_VALID_CLOUD_LABEL_PATTERN.fullmatch(label_value))
def create_bigquery_io_metadata(step_name=None):
  """Builds a BigQueryIOMetadata for the current execution environment.

  The Dataflow job id is looked up through the GCE metadata utility, so the
  resulting metadata depends on which runner is being used.

  Args:
    step_name: Optional step name. It is sanitized before being used as a
      label value and is left out if the sanitized result is not valid.

  Returns:
    A BigQueryIOMetadata carrying only the labels that passed validation.
  """
  labels = {}

  # A job id coming back from the GCE metadata means this program is running
  # on a Dataflow GCE VM. The id is only used once it has been validated, as
  # a bad label must not fail the BQ job.
  job_id = gce_metadata_util.fetch_dataflow_job_id()
  if job_id and _is_valid_cloud_label_value(job_id):
    labels['beam_job_id'] = job_id

  if step_name:
    sanitized_step = _sanitize_value(step_name)
    if _is_valid_cloud_label_value(sanitized_step):
      labels['step_name'] = sanitized_step

  return BigQueryIOMetadata(**labels)
class BigQueryIOMetadata(object):
  """Holds BigQueryIO metadata that can be attached as BQ job labels.

  Prefer the create_bigquery_io_metadata factory over direct construction;
  the factory requests the metadata based on which runner is being used.
  """
  def __init__(self, beam_job_id=None, step_name=None):
    """Stores the optional job id and step name label values."""
    self.step_name = step_name
    self.beam_job_id = beam_job_id

  def add_additional_bq_job_labels(self, job_labels=None):
    """Adds this metadata's labels to job_labels without overriding any.

    Args:
      job_labels: Optional dict of existing labels. A falsy value (None or
        an empty dict) is replaced by a new dict; otherwise the dict that
        was passed in is updated in place.

    Returns:
      The dict of labels, with 'beam_job_id' and 'step_name' added when they
      are set on this object and not already present.
    """
    if not job_labels:
      job_labels = {}
    # The label key and the attribute holding its value share the same name.
    for label_key in ('beam_job_id', 'step_name'):
      label_value = getattr(self, label_key)
      if label_value and label_key not in job_labels:
        job_labels[label_key] = label_value
    return job_labels