blob: 420f7ff23a6a10b8d496730d5f16257a3034b0fe [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module is for internal use only; no backwards-compatibility guarantees.
The classes in this file keep shared state, and organize metrics information.
Available classes:
- MetricKey - Internal key for a metric.
- MetricResult - Current status of a metric's updates/commits.
- _MetricsEnvironment - Keeps track of MetricsContainer and other metrics
information for every single execution working thread.
- MetricsContainer - Holds the metrics of a single step and a single
unit-of-commit (bundle).
- MetricUpdates - Contains updates for several metrics, keyed by MetricKey.
"""
from __future__ import absolute_import
import threading
from builtins import object
from collections import defaultdict
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker import statesampler
class MetricKey(object):
    """Key used to identify instance of metric cell.

    Metrics are internally keyed by the name of the step they're associated
    with, the name and namespace (if it is a user defined metric) of the metric,
    and any extra label metadata added by the runner specific metric collection
    service.
    """

    def __init__(self, step, metric, labels=None):
        """Initializes ``MetricKey``.

        Args:
          step: A string with the step this metric cell is part of.
          metric: A ``MetricName`` namespace+name that identifies a metric.
          labels: Extra labels that also identify the metric. A falsy value
            (e.g. ``None``) is replaced with a new empty ``dict``.
        """
        self.step = step
        self.metric = metric
        self.labels = labels if labels else dict()

    def __eq__(self, other):
        # Comparing against an unrelated type (e.g. ``None``) previously raised
        # AttributeError. Returning NotImplemented lets Python fall back to its
        # default result (False for ==, True for !=).
        if not isinstance(other, MetricKey):
            return NotImplemented
        return (self.step == other.step and
                self.metric == other.metric and
                self.labels == other.labels)

    def __ne__(self, other):
        # TODO(BEAM-5949): Needed for Python 2 compatibility.
        return not self == other

    def __hash__(self):
        # frozenset(self.labels) hashes only the label keys. This stays
        # consistent with __eq__, because equal label dicts have equal key sets.
        return hash((self.step, self.metric, frozenset(self.labels)))

    def __repr__(self):
        return 'MetricKey(step={}, metric={}, labels={})'.format(
            self.step, self.metric, self.labels)
class MetricResult(object):
    """Keeps track of the status of a metric within a single bundle.

    It contains the physical and logical updates to the metric. Physical updates
    are updates that have not necessarily been committed, but that have been
    made during pipeline execution. Logical updates are updates that have been
    committed.

    Attributes:
      key: A ``MetricKey`` that identifies the metric and bundle of this result.
      committed: The committed (logical) updates of the metric. This
        attribute's type is that of the metric type's result (e.g. int,
        DistributionResult, GaugeResult).
      attempted: The attempted (physical) updates of the metric. This
        attribute's type is that of the metric type's result (e.g. int,
        DistributionResult, GaugeResult).
    """

    def __init__(self, key, committed, attempted):
        """Initializes ``MetricResult``.

        Args:
          key: A ``MetricKey`` object.
          committed: Metric data that has been committed (e.g. logical updates)
          attempted: Metric data that has been attempted (e.g. physical updates)
        """
        self.key = key
        self.committed = committed
        self.attempted = attempted

    def __eq__(self, other):
        # Return NotImplemented for unrelated types instead of raising
        # AttributeError when reading other.key.
        if not isinstance(other, MetricResult):
            return NotImplemented
        return (self.key == other.key and
                self.committed == other.committed and
                self.attempted == other.attempted)

    def __ne__(self, other):
        # TODO(BEAM-5949): Needed for Python 2 compatibility.
        return not self == other

    def __hash__(self):
        return hash((self.key, self.committed, self.attempted))

    def __repr__(self):
        return 'MetricResult(key={}, committed={}, attempted={})'.format(
            self.key, str(self.committed), str(self.attempted))

    def __str__(self):
        return repr(self)

    @property
    def result(self):
        """Short-hand for falling back to attempted metrics if it seems that
        committed was not populated (e.g. due to not being supported on a given
        runner).

        Any falsy ``committed`` value falls back to ``attempted``. This covers
        ``None``, but also, for example, a committed counter value of ``0``.
        """
        return self.committed if self.committed else self.attempted
class _MetricsEnvironment(object):
    """Shared holder for metrics configuration and current-container lookup.

    User code should not create instances of this class. The module-level
    ``MetricsEnvironment`` instance, created right after the class, is the
    shared one used to keep track of global metrics state.
    """

    def __init__(self):
        # The lock guards writes to METRICS_SUPPORTED made through
        # set_metrics_supported().
        self._METRICS_SUPPORTED_LOCK = threading.Lock()
        self.METRICS_SUPPORTED = False

    def set_metrics_supported(self, supported):
        """Record whether metrics are supported, holding the lock while writing.

        Args:
          supported: The value to store in ``METRICS_SUPPORTED``.
        """
        lock = self._METRICS_SUPPORTED_LOCK
        with lock:
            self.METRICS_SUPPORTED = supported

    def current_container(self):
        """Returns the current MetricsContainer.

        The container is taken from the current state of the tracker that
        ``statesampler.get_current_tracker()`` reports. ``None`` is returned
        when there is no current tracker.
        """
        tracker = statesampler.get_current_tracker()
        if tracker is not None:
            return tracker.current_state().metrics_container
        return None


MetricsEnvironment = _MetricsEnvironment()
class MetricsContainer(object):
    """Holds the metrics of a single step and a single bundle.

    Cells are created lazily. The first ``get_counter`` / ``get_distribution``
    / ``get_gauge`` call for a given metric name creates the corresponding cell.
    """

    def __init__(self, step_name):
        self.step_name = step_name
        # The cell classes themselves are the zero-argument factories, which is
        # equivalent to wrapping them in a lambda.
        self.counters = defaultdict(CounterCell)
        self.distributions = defaultdict(DistributionCell)
        self.gauges = defaultdict(GaugeCell)

    def get_counter(self, metric_name):
        return self.counters[metric_name]

    def get_distribution(self, metric_name):
        return self.distributions[metric_name]

    def get_gauge(self, metric_name):
        return self.gauges[metric_name]

    def _get_updates(self, filter=None):
        """Return cumulative values of metrics filtered according to a lambda.

        This returns all the cumulative values for all metrics after filtering
        them with the filter parameter lambda function. If None is passed in,
        then cumulative values for all metrics are returned.

        Args:
          filter: Optional predicate called with each metric cell. Only cells
            for which it returns a truthy value are included. (The parameter
            name shadows the builtin but is kept for keyword callers.)

        Returns:
          A ``MetricUpdates`` whose dicts map ``MetricKey(step_name, name)`` to
          each selected cell's ``get_cumulative()`` value.
        """
        if filter is None:
            filter = lambda v: True

        def cumulative(cells):
            # The predicate may have side effects (see get_updates), so it is
            # called exactly once per cell, in iteration order.
            return {MetricKey(self.step_name, k): v.get_cumulative()
                    for k, v in cells.items()
                    if filter(v)}

        counters = cumulative(self.counters)
        distributions = cumulative(self.distributions)
        gauges = cumulative(self.gauges)
        return MetricUpdates(counters, distributions, gauges)

    def get_updates(self):
        """Return cumulative values of metrics that changed since the last commit.

        This returns all the cumulative values for all metrics only if their
        state prior to the function call was COMMITTING or DIRTY.
        """
        return self._get_updates(filter=lambda v: v.commit.before_commit())

    def get_cumulative(self):
        """Return MetricUpdates with cumulative values of all metrics in container.

        This returns all the cumulative values for all metrics regardless of
        whether they have been committed or not.
        """
        return self._get_updates()

    def to_runner_api(self):
        """Return a list of ``beam_fn_api_pb2.Metrics.User`` messages.

        The list contains counters first, then distributions, then gauges.
        """
        return (
            [beam_fn_api_pb2.Metrics.User(
                metric_name=k.to_runner_api(),
                counter_data=beam_fn_api_pb2.Metrics.User.CounterData(
                    value=v.get_cumulative()))
             for k, v in self.counters.items()] +
            [beam_fn_api_pb2.Metrics.User(
                metric_name=k.to_runner_api(),
                distribution_data=v.get_cumulative().to_runner_api())
             for k, v in self.distributions.items()] +
            [beam_fn_api_pb2.Metrics.User(
                metric_name=k.to_runner_api(),
                gauge_data=v.get_cumulative().to_runner_api())
             for k, v in self.gauges.items()]
        )

    def to_runner_api_monitoring_infos(self, transform_id):
        """Returns the user MonitoringInfos for the metrics in this container.

        Args:
          transform_id: Passed as ``ptransform`` to every monitoring info built.

        Returns:
          A dict (not a list) mapping ``monitoring_infos.to_key(mi)`` to each
          MonitoringInfo ``mi``.
        """
        all_user_metrics = []
        for k, v in self.counters.items():
            # Counter cells are converted by the cell itself. Distributions and
            # gauges below convert their get_cumulative() value instead.
            all_user_metrics.append(monitoring_infos.int64_user_counter(
                k.namespace, k.name,
                v.to_runner_api_monitoring_info(),
                ptransform=transform_id
            ))
        for k, v in self.distributions.items():
            all_user_metrics.append(monitoring_infos.int64_user_distribution(
                k.namespace, k.name,
                v.get_cumulative().to_runner_api_monitoring_info(),
                ptransform=transform_id
            ))
        for k, v in self.gauges.items():
            all_user_metrics.append(monitoring_infos.int64_user_gauge(
                k.namespace, k.name,
                v.get_cumulative().to_runner_api_monitoring_info(),
                ptransform=transform_id
            ))
        return {monitoring_infos.to_key(mi): mi for mi in all_user_metrics}

    def reset(self):
        """Call ``reset()`` on every counter, distribution and gauge cell."""
        for counter in self.counters.values():
            counter.reset()
        for distribution in self.distributions.values():
            distribution.reset()
        for gauge in self.gauges.values():
            gauge.reset()
class MetricUpdates(object):
    """Contains updates for several metrics.

    A metric update is an object containing information to update a metric.
    For Distribution metrics, it is DistributionData. For Counter metrics, it's
    an int. For Gauge metrics, it is the gauge cell's cumulative value
    (presumably GaugeData; confirm against ``GaugeCell.get_cumulative``).
    """

    def __init__(self, counters=None, distributions=None, gauges=None):
        """Create a MetricUpdates object.

        Any falsy argument, including an empty dict passed by the caller, is
        replaced by a new empty dict rather than stored as-is.

        Args:
          counters: Dictionary of MetricKey:MetricUpdate updates.
          distributions: Dictionary of MetricKey:MetricUpdate objects.
          gauges: Dictionary of MetricKey:MetricUpdate objects.
        """
        self.counters = counters or {}
        self.distributions = distributions or {}
        self.gauges = gauges or {}