# blob: 855f54c840263aea3be373b7bf4fac60d9b443da [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
from apache_beam.metrics.cells import CellCommitState
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.execution import ScopedMetricsContainer
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metricbase import MetricName
class TestMetricsContainer(unittest.TestCase):
    """Tests for MetricsContainer cells, scoping and update retrieval."""

    def test_create_new_counter(self):
        """get_counter registers a counter that was not present before."""
        mc = MetricsContainer('astep')
        # A fresh MetricName is built for every lookup on purpose, so the
        # checks rely on value equality rather than object identity.
        self.assertNotIn(MetricName('namespace', 'name'), mc.counters)
        mc.get_counter(MetricName('namespace', 'name'))
        self.assertIn(MetricName('namespace', 'name'), mc.counters)

    def test_scoped_container(self):
        """Nested scopes send increments to the innermost container."""
        c1 = MetricsContainer('mystep')
        c2 = MetricsContainer('myinternalstep')
        with ScopedMetricsContainer(c1):
            self.assertEqual(c1, MetricsEnvironment.current_container())
            counter = Metrics.counter('ns', 'name')
            counter.inc(2)

            with ScopedMetricsContainer(c2):
                self.assertEqual(
                    c2, MetricsEnvironment.current_container())
                counter = Metrics.counter('ns', 'name')
                counter.inc(3)
                # items() is materialised with list() because a dict view
                # never compares equal to a list on Python 3.
                self.assertEqual(
                    list(c2.get_cumulative().counters.items()),
                    [(MetricKey('myinternalstep',
                                MetricName('ns', 'name')), 3)])

            # Leaving the inner scope must make c1 current again.
            self.assertEqual(c1, MetricsEnvironment.current_container())
            counter = Metrics.counter('ns', 'name')
            counter.inc(4)
            # 2 + 4: the increment of 3 was recorded in c2, not in c1.
            self.assertEqual(
                list(c1.get_cumulative().counters.items()),
                [(MetricKey('mystep', MetricName('ns', 'name')), 6)])

    def test_add_to_counter(self):
        """A second get_counter call sees the earlier increment."""
        mc = MetricsContainer('astep')
        counter = mc.get_counter(MetricName('namespace', 'name'))
        counter.inc()
        counter = mc.get_counter(MetricName('namespace', 'name'))
        self.assertEqual(counter.value, 1)

    def test_get_cumulative_or_updates(self):
        """get_updates omits committed cells; get_cumulative has all."""
        mc = MetricsContainer('astep')

        clean_values = []
        dirty_values = []
        for i in range(1, 11):
            metric_name = MetricName('namespace', 'name{}'.format(i))
            counter = mc.get_counter(metric_name)
            distribution = mc.get_distribution(metric_name)
            counter.inc(i)
            distribution.update(i)

            if i % 2 == 0:
                # Even-numbered cells are never committed, so they stay
                # DIRTY; odd-numbered cells are committed below and
                # become CLEAN.
                dirty_values.append(i)
                continue

            # Assert: Counter/Distribution is DIRTY or COMMITTING
            # (not CLEAN).
            self.assertEqual(distribution.commit.before_commit(), True)
            self.assertEqual(counter.commit.before_commit(), True)
            distribution.commit.after_commit()
            counter.commit.after_commit()
            # Assert: Counter/Distribution has been committed, therefore
            # it is CLEAN.
            self.assertEqual(counter.commit.state, CellCommitState.CLEAN)
            self.assertEqual(
                distribution.commit.state, CellCommitState.CLEAN)
            clean_values.append(i)

        # Retrieve NON-COMMITTED updates.
        logical = mc.get_updates()
        self.assertEqual(len(logical.counters), 5)
        self.assertEqual(len(logical.distributions), 5)
        self.assertEqual(set(dirty_values),
                         set(logical.counters.values()))

        # Retrieve ALL updates.
        cumulative = mc.get_cumulative()
        self.assertEqual(len(cumulative.counters), 10)
        self.assertEqual(len(cumulative.distributions), 10)
        self.assertEqual(set(dirty_values + clean_values),
                         set(cumulative.counters.values()))
class TestMetricsEnvironment(unittest.TestCase):
    """Tests for how MetricsEnvironment selects the current container."""

    def test_uses_right_container(self):
        """One counter reports to whichever container is current."""
        c1 = MetricsContainer('step1')
        c2 = MetricsContainer('step2')
        counter = Metrics.counter('ns', 'name')

        MetricsEnvironment.set_current_container(c1)
        try:
            counter.inc()
            MetricsEnvironment.set_current_container(c2)
            counter.inc(3)
        finally:
            # Always release the global container so that a failure above
            # does not leak state into other tests.
            # NOTE(review): set_current_container is called twice but
            # unset only once; if the environment keeps a stack, c1 may
            # stay current after this test -- confirm.
            MetricsEnvironment.unset_current_container()

        # items() is materialised with list() because a dict view never
        # compares equal to a list on Python 3.
        self.assertEqual(
            list(c1.get_cumulative().counters.items()),
            [(MetricKey('step1', MetricName('ns', 'name')), 1)])
        self.assertEqual(
            list(c2.get_cumulative().counters.items()),
            [(MetricKey('step2', MetricName('ns', 'name')), 3)])

    def test_no_container(self):
        """current_container returns None when no container is set."""
        self.assertIsNone(MetricsEnvironment.current_container())
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()