| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # This script runs Change Point Analysis using a config file. The config |
| # file holds the parameters required to fetch data and to run the change |
| # point analysis. Change Point Analysis is used to find performance |
| # regressions in benchmark/load/performance tests. |
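| # |
| # A sketch of what one parsed config entry might look like once |
| # read_test_config() loads the YAML file. The test name and values below |
| # are illustrative, and only the keys referenced in this script are shown: |
| # |
| # { |
| #     'my_test_name': { |
| #         'metrics_table': 'my_metrics_table', |
| #         'metric_name': 'runtime_sec', |
| #         'min_runs_between_change_points': 3, |
| #         'num_runs_in_change_point_window': 7, |
| #         'labels': ['perf-alert'], |
| #         'test_description': 'Example performance test.', |
| #     } |
| # } |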
| |
| import argparse |
| import logging |
| import os |
| import uuid |
| from datetime import datetime |
| from datetime import timezone |
| from typing import Any |
| from typing import Dict |
| from typing import Optional |
| |
| import pandas as pd |
| |
| from apache_beam.testing.analyzers import constants |
| from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData |
| from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert |
| from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data |
| from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index |
| from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data |
| from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window |
| from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert |
| from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query |
| from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config |
| from apache_beam.testing.analyzers.perf_analysis_utils import validate_config |
| from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher |
| |
| |
| def run_change_point_analysis( |
| params: Dict[str, Any], |
| test_name: str, |
| big_query_metrics_fetcher: BigQueryMetricsFetcher) -> bool: |
| """ |
| Args: |
| params: Dict containing parameters to run change point analysis. |
| test_name: Name of the current test. |
| big_query_metrics_fetcher: BigQuery metrics fetcher used to fetch data for |
| change point analysis. |
| Returns: |
| bool indicating whether a change point was observed and an alert was |
| filed on GitHub. |
| """ |
| logging.info("Running change point analysis for test %s" % test_name) |
| if not validate_config(params.keys()): |
| raise ValueError( |
| f"Please make sure all these keys {constants._PERF_TEST_KEYS} " |
| f"are specified for the {test_name}") |
| |
| metric_name = params['metric_name'] |
| |
| min_runs_between_change_points = ( |
| constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS) |
| if 'min_runs_between_change_points' in params: |
| min_runs_between_change_points = params['min_runs_between_change_points'] |
| |
| num_runs_in_change_point_window = ( |
| constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW) |
| if 'num_runs_in_change_point_window' in params: |
| num_runs_in_change_point_window = params['num_runs_in_change_point_window'] |
| |
| metric_values, timestamps = fetch_metric_data( |
| params=params, |
| big_query_metrics_fetcher=big_query_metrics_fetcher |
| ) |
| |
| change_point_index = find_latest_change_point_index( |
| metric_values=metric_values) |
| if change_point_index is None: |
| logging.info("No change point detected for the test %s", test_name) |
| return False |
| # Since the timestamps are ordered in ascending order and |
| # num_runs_in_change_point_window refers to the latest runs, |
| # latest_change_point_run helps determine whether the change point |
| # index is recent with respect to num_runs_in_change_point_window. |
| latest_change_point_run = len(timestamps) - 1 - change_point_index |
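| # Worked example: with 10 runs and change_point_index == 7, |
| # latest_change_point_run == 10 - 1 - 7 == 2, i.e. the change point occurred |
| # two runs before the most recent run; whether that counts as recent depends |
| # on the configured num_runs_in_change_point_window. |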
| if not is_change_point_in_valid_window(num_runs_in_change_point_window, |
| latest_change_point_run): |
| logging.info( |
| 'Performance regression/improvement found for the test %s ' |
| 'on metric %s. Since the change point run %s ' |
| 'lies outside the num_runs_in_change_point_window distance of %s, ' |
| 'an alert is not raised.', |
| test_name, |
| metric_name, |
| latest_change_point_run + 1, |
| num_runs_in_change_point_window) |
| return False |
| |
| is_alert = True |
| last_reported_issue_number = None |
| issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}' |
| existing_issue_data = get_existing_issues_data( |
| table_name=issue_metadata_table_name, |
| big_query_metrics_fetcher=big_query_metrics_fetcher) |
| |
| if existing_issue_data is not None: |
| existing_issue_timestamps = existing_issue_data[ |
| constants._CHANGE_POINT_TIMESTAMP_LABEL].tolist() |
| last_reported_issue_number = existing_issue_data[ |
| constants._ISSUE_NUMBER].tolist()[0] |
| # convert numpy.int64 to int |
| last_reported_issue_number = last_reported_issue_number.item() |
| |
| is_alert = is_perf_alert( |
| previous_change_point_timestamps=existing_issue_timestamps, |
| change_point_index=change_point_index, |
| timestamps=timestamps, |
| min_runs_between_change_points=min_runs_between_change_points) |
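| # Illustrative outcome of the check above: if an issue was already filed |
| # for a change point that lies within min_runs_between_change_points runs |
| # of the current change point, is_perf_alert is expected to return False |
| # and no duplicate GitHub issue is filed. |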
| if is_alert: |
| issue_number, issue_url = create_performance_alert( |
| metric_name, test_name, timestamps, |
| metric_values, change_point_index, |
| params.get('labels', None), |
| last_reported_issue_number, |
| test_description=params.get('test_description', None), |
| ) |
| |
| issue_metadata = GitHubIssueMetaData( |
| issue_timestamp=pd.Timestamp(datetime.now(timezone.utc)), |
| # BQ doesn't allow '.' in table name |
| test_name=test_name.replace('.', '_'), |
| metric_name=metric_name, |
| test_id=uuid.uuid4().hex, |
| change_point=metric_values[change_point_index], |
| issue_number=issue_number, |
| issue_url=issue_url, |
| change_point_timestamp=timestamps[change_point_index]) |
| |
| publish_issue_metadata_to_big_query( |
| issue_metadata=issue_metadata, table_name=issue_metadata_table_name) |
| |
| return is_alert |
| |
| |
| def run(config_file_path: Optional[str] = None) -> None: |
| """ |
| run is the entry point for running change point analysis on test metric |
| data, which is read from the config file. If a performance |
| regression/improvement is observed for a test, an alert |
| is filed as a GitHub issue. |
| |
| If config_file_path is None, then the run method uses the default |
| config file to read the required perf test parameters. |
| |
| Please take a look at the README for more information on the parameters |
| defined in the config file. |
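| |
| A minimal sketch of calling run directly (the config path here is |
| hypothetical): |
| |
| run(config_file_path='/tmp/my_tests_config.yaml') |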
| |
| """ |
| if config_file_path is None: |
| config_file_path = os.path.join( |
| os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml') |
| |
| tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path) |
| |
| big_query_metrics_fetcher = BigQueryMetricsFetcher() |
| |
| for test_name, params in tests_config.items(): |
| run_change_point_analysis( |
| params=params, |
| test_name=test_name, |
| big_query_metrics_fetcher=big_query_metrics_fetcher) |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.INFO) |
| |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| '--config_file_path', |
| default=None, |
| type=str, |
| help='Path to the config file that contains data to run the Change Point ' |
| 'Analysis. The default file used will be ' |
| 'apache_beam/testing/analyzers/tests_config.yaml. ' |
| 'If you would like to use Change Point Analysis for finding ' |
| 'performance regressions in the tests, ' |
| 'please provide a .yaml file with the same structure as the ' |
| 'above mentioned file.') |
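| # Example invocation (the module path is assumed from the package layout; |
| # the config path is hypothetical): |
| # |
| #     python -m apache_beam.testing.analyzers.perf_analysis \ |
| #         --config_file_path=/path/to/my_tests_config.yaml |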
| known_args, unknown_args = parser.parse_known_args() |
| |
| if unknown_args: |
| logging.warning('Discarding unknown arguments: %s', unknown_args) |
| |
| run(known_args.config_file_path) |