blob: e4fd440531192b8b5dfc91e665fc443649524c45 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
DirectRunner implementation of MetricResults. It is in charge not only of
responding to queries of current metrics, but also of keeping the common
state consistent.
"""
# pytype: skip-file
import threading
from collections import defaultdict
from apache_beam.metrics.cells import CounterAggregator
from apache_beam.metrics.cells import DistributionAggregator
from apache_beam.metrics.cells import GaugeAggregator
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults
class DirectMetrics(MetricResults):
def __init__(self):
self._counters = defaultdict(lambda: DirectMetric(CounterAggregator()))
self._distributions = defaultdict(
lambda: DirectMetric(DistributionAggregator()))
self._gauges = defaultdict(lambda: DirectMetric(GaugeAggregator()))
def _apply_operation(self, bundle, updates, op):
for k, v in updates.counters.items():
op(self._counters[k], bundle, v)
for k, v in updates.distributions.items():
op(self._distributions[k], bundle, v)
for k, v in updates.gauges.items():
op(self._gauges[k], bundle, v)
def commit_logical(self, bundle, updates):
op = lambda obj, bundle, update: obj.commit_logical(bundle, update)
self._apply_operation(bundle, updates, op)
def commit_physical(self, bundle, updates):
op = lambda obj, bundle, update: obj.commit_physical(bundle, update)
self._apply_operation(bundle, updates, op)
def update_physical(self, bundle, updates):
op = lambda obj, bundle, update: obj.update_physical(bundle, update)
self._apply_operation(bundle, updates, op)
def query(self, filter=None):
counters = [
MetricResult(
MetricKey(k.step, k.metric),
v.extract_committed(),
v.extract_latest_attempted()) for k,
v in self._counters.items() if self.matches(filter, k)
]
distributions = [
MetricResult(
MetricKey(k.step, k.metric),
v.extract_committed(),
v.extract_latest_attempted()) for k,
v in self._distributions.items() if self.matches(filter, k)
]
gauges = [
MetricResult(
MetricKey(k.step, k.metric),
v.extract_committed(),
v.extract_latest_attempted()) for k,
v in self._gauges.items() if self.matches(filter, k)
]
return {
self.COUNTERS: counters,
self.DISTRIBUTIONS: distributions,
self.GAUGES: gauges
}
class DirectMetric(object):
""" Keeps a consistent state for a single metric.
It keeps track of the metric's physical and logical updates.
It's thread safe.
"""
def __init__(self, aggregator):
self.aggregator = aggregator
self._attempted_lock = threading.Lock()
self.finished_attempted = aggregator.identity_element()
self.inflight_attempted = {}
self._committed_lock = threading.Lock()
self.finished_committed = aggregator.identity_element()
def commit_logical(self, bundle, update):
with self._committed_lock:
self.finished_committed = self.aggregator.combine(
update, self.finished_committed)
def commit_physical(self, bundle, update):
with self._attempted_lock:
self.inflight_attempted[bundle] = update
self.finished_attempted = self.aggregator.combine(
update, self.finished_attempted)
del self.inflight_attempted[bundle]
def update_physical(self, bundle, update):
self.inflight_attempted[bundle] = update
def extract_committed(self):
return self.aggregator.result(self.finished_committed)
def extract_latest_attempted(self):
res = self.finished_attempted
for _, u in self.inflight_attempted.items():
res = self.aggregator.combine(res, u)
return self.aggregator.result(res)