| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| This file contains internal metric cell classes. A metric cell is used to |
| accumulate in-memory changes to a metric. It represents a specific metric |
| in a single context. |
| |
| For internal use only. No backwards compatibility guarantees. |
| """ |
| |
| # pytype: skip-file |
| |
| from typing import TYPE_CHECKING |
| from typing import Optional |
| |
| from apache_beam.metrics.cells import MetricAggregator |
| from apache_beam.metrics.cells import MetricCell |
| from apache_beam.metrics.cells import MetricCellFactory |
| from apache_beam.utils.histogram import Histogram |
| |
| if TYPE_CHECKING: |
| from apache_beam.utils.histogram import BucketType |
| |
| |
class HistogramCell(MetricCell):
    """For internal use only; no backwards-compatibility guarantees.

    In-memory state of a single histogram metric within one context.

    A distinct cell exists for every context in every bundle; the per-bundle
    cells are merged afterwards through ``combine``.

    Recorded values go straight to the wrapped histogram object, so ``update``
    is only as thread safe as that histogram is. ``reset`` rebinds
    ``self.data`` without taking any lock.
    """

    def __init__(self, bucket_type):
        self._bucket_type = bucket_type
        self.data = self._empty_data()

    def _empty_data(self):
        # Fresh, empty HistogramData for this cell's bucket type.
        return HistogramAggregator(self._bucket_type).identity_element()

    def reset(self):
        """Discard everything recorded so far."""
        self.data = self._empty_data()

    def combine(self, other):
        # type: (HistogramCell) -> HistogramCell
        """Return a new cell holding the merge of this cell and ``other``."""
        merged = HistogramCell(self._bucket_type)
        merged.data = self.data.combine(other.data)
        return merged

    def update(self, value):
        """Record ``value`` into the underlying histogram."""
        live_histogram = self.data.histogram
        live_histogram.record(value)

    def get_cumulative(self):
        # type: () -> HistogramData
        """Return the cumulative HistogramData for this cell."""
        return self.data.get_cumulative()

    def to_runner_api_monitoring_info(self, name, transform_id):
        # Histograms are, for now, worker-local and internal only; this is a
        # placeholder until runners are able to report Histogram metrics.
        return None
| |
| |
class HistogramCellFactory(MetricCellFactory):
    """Builds ``HistogramCell`` objects that all use one bucket type.

    Factories compare equal, and hash identically, exactly when their bucket
    types are equal.
    """

    def __init__(self, bucket_type):
        self._bucket_type = bucket_type

    def __call__(self):
        return HistogramCell(self._bucket_type)

    def __eq__(self, other):
        return (isinstance(other, HistogramCellFactory)
                and self._bucket_type == other._bucket_type)

    def __hash__(self):
        return hash(self._bucket_type)
| |
| |
class HistogramResult(object):
    """Read-only view over a histogram metric's ``HistogramData``.

    Percentile accessors delegate to the wrapped ``data.histogram``.
    """

    def __init__(self, data):
        # type: (HistogramData) -> None
        self.data = data

    def __eq__(self, other):
        if not isinstance(other, HistogramResult):
            return False
        return self.data == other.data

    def __hash__(self):
        return hash(self.data)

    def __repr__(self):
        percentile_info = self.data.histogram.get_percentile_info()
        return '<HistogramResult({})>'.format(percentile_info)

    @property
    def p99(self):
        """Value of the wrapped histogram's ``p99()``."""
        return self.data.histogram.p99()

    @property
    def p95(self):
        """Value of the wrapped histogram's ``p95()``."""
        return self.data.histogram.p95()

    @property
    def p90(self):
        """Value of the wrapped histogram's ``p90()``."""
        return self.data.histogram.p90()
| |
| |
class HistogramData(object):
    """For internal use only; no backwards-compatibility guarantees.

    The data structure that holds data about a histogram metric.

    This object is not thread safe, so it's not supposed to be modified
    outside the HistogramCell.
    """

    def __init__(self, histogram):
        # presumably an apache_beam.utils.histogram.Histogram; it must provide
        # combine(), get_percentile_info(), __eq__ and __hash__ -- TODO confirm
        self.histogram = histogram

    def __eq__(self, other):
        # Guard the attribute access: comparing with an unrelated object
        # (e.g. None) must yield False rather than raise AttributeError.
        # Python 3 derives __ne__ from this, so `!=` is fixed as well.
        if not isinstance(other, HistogramData):
            return False
        return self.histogram == other.histogram

    def __hash__(self):
        return hash(self.histogram)

    def __repr__(self):
        return 'HistogramData({})'.format(self.histogram.get_percentile_info())

    def get_cumulative(self):
        # type: () -> HistogramData
        """Return a HistogramData wrapping this object's histogram.

        NOTE(review): the result wraps the *same* histogram instance, not a
        copy, so values recorded into this data afterwards are also visible
        through the returned object. Confirm callers do not expect an
        independent snapshot.
        """
        return HistogramData(self.histogram)

    def combine(self, other):
        # type: (Optional[HistogramData]) -> HistogramData
        """Merge with ``other``; a ``None`` argument returns ``self`` unchanged."""
        if other is None:
            return self
        return HistogramData(self.histogram.combine(other.histogram))
| |
| |
class HistogramAggregator(MetricAggregator):
    """For internal use only; no backwards-compatibility guarantees.

    Merges histogram metric values while a pipeline executes.

    The values passed to ``combine`` and ``result`` are expected to be
    ``HistogramData`` objects.
    """

    def __init__(self, bucket_type):
        # type: (BucketType) -> None
        self._bucket_type = bucket_type

    def identity_element(self):
        # type: () -> HistogramData
        """Return HistogramData around a new, empty Histogram."""
        empty_histogram = Histogram(self._bucket_type)
        return HistogramData(empty_histogram)

    def combine(self, x, y):
        # type: (HistogramData, HistogramData) -> HistogramData
        """Merge ``y`` into ``x`` by delegating to ``HistogramData.combine``."""
        return x.combine(y)

    def result(self, x):
        # type: (HistogramData) -> HistogramResult
        """Wrap the cumulative view of ``x`` in a HistogramResult."""
        cumulative = x.get_cumulative()
        return HistogramResult(cumulative)