blob: cb18dc785eca95e258d1608f20707995195c0c38 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import unittest
from builtins import object
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.metrics.metricbase import MetricName
from apache_beam.runners.worker import statesampler
from apache_beam.utils import counters
class NameTest(unittest.TestCase):
def test_basic_metric_name(self):
name = MetricName('namespace1', 'name1')
self.assertEqual(name.namespace, 'namespace1')
self.assertEqual(name.name, 'name1')
self.assertEqual(name, MetricName('namespace1', 'name1'))
key = MetricKey('step1', name)
self.assertEqual(key.step, 'step1')
self.assertEqual(key.metric.namespace, 'namespace1')
self.assertEqual(key.metric.name, 'name1')
self.assertEqual(key, MetricKey('step1', MetricName('namespace1', 'name1')))
class MetricResultsTest(unittest.TestCase):
def test_metric_filter_namespace_matching(self):
filter = MetricsFilter().with_namespace('ns1')
name = MetricName('ns1', 'name1')
key = MetricKey('step1', name)
self.assertTrue(MetricResults.matches(filter, key))
def test_metric_filter_name_matching(self):
filter = MetricsFilter().with_name('name1').with_namespace('ns1')
name = MetricName('ns1', 'name1')
key = MetricKey('step1', name)
self.assertTrue(MetricResults.matches(filter, key))
filter = MetricsFilter().with_name('name1')
name = MetricName('ns1', 'name1')
key = MetricKey('step1', name)
self.assertTrue(MetricResults.matches(filter, key))
def test_metric_filter_step_matching(self):
name = MetricName('ns1', 'name1')
filter = MetricsFilter().with_step('Step1')
key = MetricKey('Step1', name)
self.assertTrue(MetricResults.matches(filter, key))
key = MetricKey('Step10', name)
self.assertFalse(MetricResults.matches(filter, key))
key = MetricKey('Step10/Step1', name)
self.assertTrue(MetricResults.matches(filter, key))
key = MetricKey('Top1/Outer1/Inner1', name)
filter = MetricsFilter().with_step('Top1/Outer1/Inner1')
self.assertTrue(MetricResults.matches(filter, key))
filter = MetricsFilter().with_step('Top1/Outer1')
self.assertTrue(MetricResults.matches(filter, key))
filter = MetricsFilter().with_step('Outer1/Inner1')
self.assertTrue(MetricResults.matches(filter, key))
filter = MetricsFilter().with_step('Top1/Inner1')
self.assertFalse(MetricResults.matches(filter, key))
class MetricsTest(unittest.TestCase):
def test_get_namespace_class(self):
class MyClass(object):
pass
self.assertEqual('{}.{}'.format(MyClass.__module__, MyClass.__name__),
Metrics.get_namespace(MyClass))
def test_get_namespace_string(self):
namespace = 'MyNamespace'
self.assertEqual(namespace, Metrics.get_namespace(namespace))
def test_get_namespace_error(self):
with self.assertRaises(ValueError):
Metrics.get_namespace(object())
def test_counter_empty_name(self):
with self.assertRaises(ValueError):
Metrics.counter("namespace", "")
def test_counter_empty_namespace(self):
with self.assertRaises(ValueError):
Metrics.counter("", "names")
def test_distribution_empty_name(self):
with self.assertRaises(ValueError):
Metrics.distribution("namespace", "")
def test_distribution_empty_namespace(self):
with self.assertRaises(ValueError):
Metrics.distribution("", "names")
def test_create_counter_distribution(self):
sampler = statesampler.StateSampler('', counters.CounterFactory())
statesampler.set_current_tracker(sampler)
state1 = sampler.scoped_state('mystep', 'myState',
metrics_container=MetricsContainer('mystep'))
try:
sampler.start()
with state1:
counter_ns = 'aCounterNamespace'
distro_ns = 'aDistributionNamespace'
name = 'a_name'
counter = Metrics.counter(counter_ns, name)
distro = Metrics.distribution(distro_ns, name)
counter.inc(10)
counter.dec(3)
distro.update(10)
distro.update(2)
self.assertTrue(isinstance(counter, Metrics.DelegatingCounter))
self.assertTrue(isinstance(distro, Metrics.DelegatingDistribution))
del distro
del counter
container = MetricsEnvironment.current_container()
self.assertEqual(
container.get_counter(
MetricName(counter_ns, name)).get_cumulative(),
7)
self.assertEqual(
container.get_distribution(
MetricName(distro_ns, name)).get_cumulative(),
DistributionData(12, 2, 2, 10))
finally:
sampler.stop()
if __name__ == '__main__':
unittest.main()