| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
DataflowRunner implementation of MetricResults. It is in charge of
responding to queries of current metrics by fetching them from the
Dataflow service.
| """ |
| |
| from __future__ import absolute_import |
| |
| import argparse |
| import logging |
| import numbers |
| import sys |
| from collections import defaultdict |
| |
| from future.utils import iteritems |
| |
| from apache_beam.metrics.cells import DistributionData |
| from apache_beam.metrics.cells import DistributionResult |
| from apache_beam.metrics.execution import MetricKey |
| from apache_beam.metrics.execution import MetricResult |
| from apache_beam.metrics.metric import MetricResults |
| from apache_beam.metrics.metricbase import MetricName |
| from apache_beam.options.pipeline_options import GoogleCloudOptions |
| from apache_beam.options.pipeline_options import PipelineOptions |
| |
| |
| def _get_match(proto, filter_fn): |
| """Finds and returns the first element that matches a query. |
| |
| If no element matches the query, it throws ValueError. |
| If more than one element matches the query, it returns only the first. |
| """ |
| query = [elm for elm in proto if filter_fn(elm)] |
| if len(query) == 0: |
| raise ValueError('Could not find element') |
| elif len(query) > 1: |
| raise ValueError('Too many matches') |
| |
| return query[0] |
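
# Illustrative example of _get_match semantics (objects hypothetical): given a
# repeated proto field props = [KV(key='step'), KV(key='namespace')],
#   _get_match(props, lambda x: x.key == 'step')
# returns the single 'step' element, while a missing or duplicated key raises
# ValueError.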
| |
| |
| # V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels. |
| STEP_LABEL = 'step' |
| STRUCTURED_NAME_LABELS = set([ |
| 'execution_step', 'original_name', 'output_user_name']) |
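# For illustration (values hypothetical), a structured metric name from the
# service may carry context properties such as:
#   step: 's2', namespace: 'my.pkg.MyDoFn', original_name: 'my_counter',
#   output_user_name: 'split-out0', tentative: 'true'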
| |
| |
| class DataflowMetrics(MetricResults): |
| """Implementation of MetricResults class for the Dataflow runner.""" |
| |
| def __init__(self, dataflow_client=None, job_result=None, job_graph=None): |
| """Initialize the Dataflow metrics object. |
| |
| Args: |
| dataflow_client: apiclient.DataflowApplicationClient to interact with the |
| dataflow service. |
| job_result: DataflowPipelineResult with the state and id information of |
| the job. |
| job_graph: apiclient.Job instance to be able to translate between internal |
| step names (e.g. "s2"), and user step names (e.g. "split"). |
| """ |
| super(DataflowMetrics, self).__init__() |
| self._dataflow_client = dataflow_client |
| self.job_result = job_result |
| self._queried_after_termination = False |
| self._cached_metrics = None |
| self._job_graph = job_graph |
| |
| @staticmethod |
| def _is_counter(metric_result): |
| return isinstance(metric_result.attempted, numbers.Number) |
| |
| @staticmethod |
| def _is_distribution(metric_result): |
| return isinstance(metric_result.attempted, DistributionResult) |
| |
| def _translate_step_name(self, internal_name): |
| """Translate between internal step names (e.g. "s1") and user step names.""" |
| if not self._job_graph: |
| raise ValueError('Could not translate the internal step name.') |
| |
| try: |
| step = _get_match(self._job_graph.proto.steps, |
| lambda x: x.name == internal_name) |
| user_step_name = _get_match( |
| step.properties.additionalProperties, |
| lambda x: x.key == 'user_name').value.string_value |
| except ValueError: |
| raise ValueError('Could not translate the internal step name.') |
| return user_step_name |
| |
| def _get_metric_key(self, metric): |
| """Populate the MetricKey object for a queried metric result.""" |
| step = "" |
| name = metric.name.name # Always extract a name |
| labels = dict() |
    try:  # Try to extract the user step name.
      # If a ValueError is raised within this try-block, it is because of
      # one of the following:
      # 1. The step name could not be translated. This only happens with an
      #    improperly formatted job graph (unlikely), or when the step name is
      #    not an internal step name (only for unstructured-named metrics).
      # 2. The [step] or [namespace] property could not be unpacked, which
      #    should only happen for unstructured names.
| step = _get_match(metric.name.context.additionalProperties, |
| lambda x: x.key == STEP_LABEL).value |
| step = self._translate_step_name(step) |
| except ValueError: |
| pass |
| |
| namespace = "dataflow/v1b3" # Try to extract namespace or add a default. |
| try: |
| namespace = _get_match(metric.name.context.additionalProperties, |
| lambda x: x.key == 'namespace').value |
| except ValueError: |
| pass |
| |
| for kv in metric.name.context.additionalProperties: |
| if kv.key in STRUCTURED_NAME_LABELS: |
| labels[kv.key] = kv.value |
    # Pack everything besides the namespace and name into the labels as well,
    # including the unmodified step names, so that consumers have access to
    # the exact values that come from the Dataflow service.
| return MetricKey(step, MetricName(namespace, name), labels=labels) |
| |
| def _populate_metrics(self, response, result, user_metrics=False): |
| """Move metrics from response to results as MetricResults.""" |
| if user_metrics: |
| metrics = [metric |
| for metric in response.metrics |
| if metric.name.origin == 'user'] |
| else: |
| metrics = [metric |
| for metric in response.metrics |
| if metric.name.origin == 'dataflow/v1b3'] |
| |
| # Get the tentative/committed versions of every metric together. |
    metrics_by_name = defaultdict(dict)
| for metric in metrics: |
| if (metric.name.name.endswith('[MIN]') or |
| metric.name.name.endswith('[MAX]') or |
| metric.name.name.endswith('[MEAN]') or |
| metric.name.name.endswith('[COUNT]')): |
| # The Dataflow Service presents distribution metrics in two ways: |
| # One way is as a single distribution object with all its fields, and |
| # another way is as four different scalar metrics labeled as [MIN], |
| # [MAX], [COUNT], [MEAN]. |
| # TODO(pabloem) remove these when distributions are not being broken up |
| # in the service. |
| # The second way is only useful for the UI, and should be ignored. |
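        # E.g. a distribution named 'latency' (name purely illustrative) may
        # also surface as the scalar metrics 'latency[MIN]', 'latency[MAX]',
        # 'latency[MEAN]' and 'latency[COUNT]'.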
| continue |
| is_tentative = [prop |
| for prop in metric.name.context.additionalProperties |
| if prop.key == 'tentative' and prop.value == 'true'] |
| tentative_or_committed = 'tentative' if is_tentative else 'committed' |
| |
| metric_key = self._get_metric_key(metric) |
| if metric_key is None: |
| continue |
| metrics_by_name[metric_key][tentative_or_committed] = metric |
| |
| # Now we create the MetricResult elements. |
| for metric_key, metric in iteritems(metrics_by_name): |
      attempted = self._get_metric_value(metric.get('tentative'))
      committed = self._get_metric_value(metric.get('committed'))
| result.append(MetricResult(metric_key, |
| attempted=attempted, |
| committed=committed)) |
| |
| def _get_metric_value(self, metric): |
| """Get a metric result object from a MetricUpdate from Dataflow API.""" |
| if metric is None: |
| return None |
| |
| if metric.scalar is not None: |
| return metric.scalar.integer_value |
| elif metric.distribution is not None: |
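      # The distribution is encoded as an object value whose integer
      # properties are count, min, max and sum; e.g. (hypothetically)
      # count=10, min=1, max=9, sum=50. Each field is unpacked via _get_match.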
| dist_count = _get_match(metric.distribution.object_value.properties, |
| lambda x: x.key == 'count').value.integer_value |
| dist_min = _get_match(metric.distribution.object_value.properties, |
| lambda x: x.key == 'min').value.integer_value |
| dist_max = _get_match(metric.distribution.object_value.properties, |
| lambda x: x.key == 'max').value.integer_value |
| dist_sum = _get_match(metric.distribution.object_value.properties, |
| lambda x: x.key == 'sum').value.integer_value |
| return DistributionResult( |
| DistributionData( |
| dist_sum, dist_count, dist_min, dist_max)) |
| else: |
| return None |
| |
| def _get_metrics_from_dataflow(self, job_id=None): |
| """Return cached metrics or query the dataflow service.""" |
| if not job_id: |
| try: |
| job_id = self.job_result.job_id() |
| except AttributeError: |
| job_id = None |
| if not job_id: |
| raise ValueError('Can not query metrics. Job id is unknown.') |
| |
| if self._cached_metrics: |
| return self._cached_metrics |
| |
| job_metrics = self._dataflow_client.get_job_metrics(job_id) |
    # If we can determine that the job has terminated,
    # then the metrics will no longer change and we can cache them.
| if self.job_result and self.job_result.is_in_terminal_state(): |
| self._cached_metrics = job_metrics |
| return job_metrics |
| |
| def all_metrics(self, job_id=None): |
| """Return all user and system metrics from the dataflow service.""" |
| metric_results = [] |
| response = self._get_metrics_from_dataflow(job_id=job_id) |
| self._populate_metrics(response, metric_results, user_metrics=True) |
| self._populate_metrics(response, metric_results, user_metrics=False) |
| return metric_results |
| |
| def query(self, filter=None): |
| metric_results = [] |
| response = self._get_metrics_from_dataflow() |
| self._populate_metrics(response, metric_results, user_metrics=True) |
| return {self.COUNTERS: [elm for elm in metric_results |
| if self.matches(filter, elm.key) |
| and DataflowMetrics._is_counter(elm)], |
| self.DISTRIBUTIONS: [elm for elm in metric_results |
| if self.matches(filter, elm.key) |
| and DataflowMetrics._is_distribution(elm)], |
| self.GAUGES: []} # TODO(pabloem): Add Gauge support for dataflow. |
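

# Usage sketch (illustrative; assumes a pipeline run on the DataflowRunner):
# a DataflowMetrics instance is normally obtained through
# DataflowPipelineResult.metrics() rather than constructed directly, e.g.:
#
#   from apache_beam.metrics.metric import MetricsFilter
#   result = pipeline.run()
#   result.wait_until_finish()
#   counters = result.metrics().query(
#       MetricsFilter().with_name('my_counter'))['counters']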
| |
| |
| def main(argv): |
| """Print the metric results for a the dataflow --job_id and --project. |
| |
| Instead of running an entire pipeline which takes several minutes, use this |
| main method to display MetricResults for a specific --job_id and --project |
| which takes only a few seconds. |
| """ |
| # TODO(BEAM-6833): The MetricResults do not show translated step names as the |
| # job_graph is not provided to DataflowMetrics. |
| # Import here to avoid adding the dependency for local running scenarios. |
| try: |
| # pylint: disable=wrong-import-order, wrong-import-position |
| from apache_beam.runners.dataflow.internal import apiclient |
| except ImportError: |
| raise ImportError( |
| 'Google Cloud Dataflow runner not available, ' |
| 'please install apache_beam[gcp]') |
| if argv[0] == __file__: |
| argv = argv[1:] |
| parser = argparse.ArgumentParser() |
| parser.add_argument('-j', '--job_id', type=str, |
| help='The job id to query metrics for.') |
| parser.add_argument('-p', '--project', type=str, |
| help='The project name to query metrics for.') |
| flags = parser.parse_args(argv) |
| |
| # Get a Dataflow API client and set its project and job_id in the options. |
| options = PipelineOptions() |
| gcloud_options = options.view_as(GoogleCloudOptions) |
| gcloud_options.project = flags.project |
| dataflow_client = apiclient.DataflowApplicationClient(options) |
| df_metrics = DataflowMetrics(dataflow_client) |
| all_metrics = df_metrics.all_metrics(job_id=flags.job_id) |
| logging.info('Printing all MetricResults for %s in %s', |
| flags.job_id, flags.project) |
| for metric_result in all_metrics: |
| logging.info(metric_result) |
| |
| |
| if __name__ == '__main__': |
| logging.getLogger().setLevel(logging.INFO) |
| main(sys.argv) |
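
# Example invocation (assumes apache_beam[gcp] is installed, gcloud credentials
# are configured, and this module is importable as
# apache_beam.runners.dataflow.dataflow_metrics):
#
#   python -m apache_beam.runners.dataflow.dataflow_metrics \
#       --job_id <JOB_ID> --project <PROJECT>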