blob: 989917648b1fc6c8bc189a7ffd692cab32afd717 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Tests corresponding to the DataflowRunner implementation of MetricsResult,
the DataflowMetrics class.
"""
from __future__ import absolute_import
import types
import unittest
from builtins import object
import mock
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metricbase import MetricName
from apache_beam.runners.dataflow import dataflow_metrics
from apache_beam.testing import metric_result_matchers
from apache_beam.testing.metric_result_matchers import MetricResultMatcher
class DictToObject(object):
"""Translate from a dict(list()) structure to an object structure"""
def __init__(self, data):
for name, value in data.items():
setattr(self, name, self._wrap(value))
def _wrap(self, value):
if isinstance(value, (tuple, list, set, frozenset)):
return type(value)([self._wrap(v) for v in value])
return DictToObject(value) if isinstance(value, dict) else value
class TestDataflowMetrics(unittest.TestCase):
# TODO(BEAM-6734): Write a dump tool to generate this fake data, or
# somehow make this easier to maintain.
ONLY_COUNTERS_LIST = {"metrics": [
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"},
{"key": "tentative",
"value": "true"}]
},
"name": "words",
"origin": "user"
},
"scalar": {"integer_value": 26185},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"}]
},
"name": "words",
"origin": "user"
},
"scalar": {"integer_value": 26181},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"},
{"key": "tentative",
"value": "true"}]
},
"name": "empty_lines",
"origin": "user"
},
"scalar": {"integer_value": 1080},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"}]
},
"name": "empty_lines",
"origin": "user"
},
"scalar": {"integer_value": 1080},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
]}
STRUCTURED_COUNTER_LIST = {"metrics": [
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"},
{"key": "tentative",
"value": "true"}]
},
"name": "word_lengths",
"origin": "user"
},
"scalar": {"integer_value": 109475},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"}]
},
"name": "word_lengths",
"origin": "user"
},
"scalar": {"integer_value": 109475},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"},
{"key": "tentative",
"value": "true"}]
},
"name": "word_length_dist",
"origin": "user"
},
"scalar": None,
"distribution": {
"object_value": {
"properties": [
{"key": "min", "value":
{"integer_value": 2}},
{"key": "max", "value":
{"integer_value": 16}},
{"key": "count", "value":
{"integer_value": 2}},
{"key": "mean", "value":
{"integer_value": 9}},
{"key": "sum", "value":
{"integer_value": 18}},]
}
},
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
"value": "__main__.WordExtractingDoFn"},
{"key": "step",
"value": "s2"}]
},
"name": "word_length_dist",
"origin": "user"
},
"scalar": None,
"distribution": {
"object_value": {
"properties": [
{"key": "min", "value":
{"integer_value": 2}},
{"key": "max", "value":
{"integer_value": 16}},
{"key": "count", "value":
{"integer_value": 2}},
{"key": "mean", "value":
{"integer_value": 9}},
{"key": "sum", "value":
{"integer_value": 18}},
]
}
},
"updateTime": "2017-03-22T18:47:06.402Z"
},
]}
SYSTEM_COUNTERS_LIST = {"metrics": [
# ElementCount
{"name": {"context":
{"additionalProperties": [
{"key": "original_name",
"value": "ToIsmRecordForMultimap-out0-ElementCount"},
{"key": "output_user_name",
"value": "ToIsmRecordForMultimap-out0"}
]
},
"name": "ElementCount",
"origin": "dataflow/v1b3"
},
"scalar": {"integer_value": 42},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "original_name",
"value": "ToIsmRecordForMultimap-out0-ElementCount"},
{"key": "output_user_name",
"value": "ToIsmRecordForMultimap-out0"},
{"key": "tentative",
"value": "true"}
]
},
"name": "ElementCount",
"origin": "dataflow/v1b3"
},
"scalar": {"integer_value": 42},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
# MeanByteCount
{"name": {"context":
{"additionalProperties": [
{"key": "original_name",
"value": "Read-out0-MeanByteCount"},
{"key": "output_user_name",
"value": "GroupByKey/Read-out0"}
]
},
"name": "MeanByteCount",
"origin": "dataflow/v1b3"
},
"scalar": {"integer_value": 31},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "original_name",
"value": "Read-out0-MeanByteCount"},
{"key": "output_user_name",
"value": "GroupByKey/Read-out0"},
{"key": "tentative",
"value": "true"}
]
},
"name": "MeanByteCount",
"origin": "dataflow/v1b3"
},
"scalar": {"integer_value": 31},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
# ExecutionTime
{"name": {"context":
{"additionalProperties": [
{"key": "step",
"value": "write/Write/Write"},
]
},
"name": "ExecutionTime_ProcessElement",
"origin": "dataflow/v1b3"
},
"scalar": {"integer_value": 1000},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
{"additionalProperties": [
{"key": "step",
"value": "write/Write/Write"},
{"key": "tentative",
"value": "true"}
]
},
"name": "ExecutionTime_ProcessElement",
"origin": "dataflow/v1b3"
},
"scalar": {"integer_value": 1000},
"distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
]}
def setup_mock_client_result(self, counter_list=None):
mock_client = mock.Mock()
mock_query_result = DictToObject(counter_list)
mock_client.get_job_metrics.return_value = mock_query_result
mock_job_result = mock.Mock()
mock_job_result.job_id.return_value = 1
mock_job_result.is_in_terminal_state.return_value = False
return mock_client, mock_job_result
def test_cache_functions(self):
mock_client, mock_job_result = self.setup_mock_client_result(
self.STRUCTURED_COUNTER_LIST)
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
# At first creation, we should always query dataflow.
self.assertTrue(dm._cached_metrics is None)
# Right after querying, we still query again.
dm.query()
self.assertTrue(dm._cached_metrics is None)
# The job has ended. The query should not run again after this.
mock_job_result.is_in_terminal_state.return_value = True
dm.query()
self.assertTrue(dm._cached_metrics)
def test_query_structured_metrics(self):
mock_client, mock_job_result = self.setup_mock_client_result(
self.STRUCTURED_COUNTER_LIST)
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm)
query_result = dm.query()
expected_counters = [
MetricResult(
MetricKey('split',
MetricName('__main__.WordExtractingDoFn', 'word_lengths'),
),
109475, 109475),
]
self.assertEqual(query_result['counters'], expected_counters)
expected_distributions = [
MetricResult(
MetricKey('split',
MetricName('__main__.WordExtractingDoFn',
'word_length_dist'),
),
DistributionResult(DistributionData(
18, 2, 2, 16)),
DistributionResult(DistributionData(
18, 2, 2, 16))),
]
self.assertEqual(query_result['distributions'], expected_distributions)
def test_query_counters(self):
mock_client, mock_job_result = self.setup_mock_client_result(
self.ONLY_COUNTERS_LIST)
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm)
query_result = dm.query()
expected_counters = [
MetricResult(
MetricKey('split',
MetricName('__main__.WordExtractingDoFn', 'empty_lines')
),
1080, 1080),
MetricResult(
MetricKey('split',
MetricName('__main__.WordExtractingDoFn', 'words')),
26181, 26185),
]
self.assertEqual(sorted(query_result['counters'],
key=lambda x: x.key.metric.name),
sorted(expected_counters,
key=lambda x: x.key.metric.name))
def test_system_counters_set_labels_and_step_name(self):
mock_client, mock_job_result = self.setup_mock_client_result(
self.SYSTEM_COUNTERS_LIST)
test_object = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
all_metrics = test_object.all_metrics()
matchers = [
MetricResultMatcher(
name='ElementCount',
labels={
'original_name' : 'ToIsmRecordForMultimap-out0-ElementCount',
'output_user_name' : 'ToIsmRecordForMultimap-out0'
},
attempted=42,
committed=42
),
MetricResultMatcher(
name='MeanByteCount',
labels={
'original_name' : 'Read-out0-MeanByteCount',
'output_user_name' : 'GroupByKey/Read-out0'
},
attempted=31,
committed=31
),
MetricResultMatcher(
name='ExecutionTime_ProcessElement',
step='write/Write/Write',
attempted=1000,
committed=1000
)
]
errors = metric_result_matchers.verify_all(all_metrics, matchers)
self.assertFalse(errors, errors)
if __name__ == '__main__':
unittest.main()