blob: 2d91e201d4283c114abb3809b16bb01497b29b68 [file]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import logging
from dataclasses import asdict
from dataclasses import dataclass
from statistics import median
from typing import Optional
from typing import Union
import pandas as pd
import yaml
from google.api_core import exceptions
from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
from apache_beam.testing.analyzers import constants
from apache_beam.testing.load_tests import load_test_metrics_utils
from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
# pylint: disable=ungrouped-imports
try:
from google.cloud import bigquery
except ImportError:
bigquery = None # type: ignore
@dataclass(frozen=True)
class GitHubIssueMetaData:
  """
  This class holds metadata that needs to be published to the
  BigQuery when a GitHub issue is created on a performance
  alert.
  """
  # Timestamp at which the GitHub issue was filed.
  issue_timestamp: pd.Timestamp
  # Timestamp of the metric run identified as the change point.
  change_point_timestamp: pd.Timestamp
  # Name of the test the alert was raised for.
  test_name: str
  # Name of the metric the change point was detected on.
  metric_name: str
  # GitHub issue number of the created/updated issue.
  issue_number: int
  # URL of the GitHub issue.
  issue_url: str
  # Unique id of the test configuration (see TestConfigContainer.test_id).
  test_id: str
  # Metric value observed at the change point.
  change_point: float
@dataclass
class ChangePointConfig:
  """
  This class holds the change point configuration parameters.
  """
  # Minimum number of runs required between two reported change points;
  # closer change points are treated as duplicates (siblings).
  min_runs_between_change_points: int = (
      constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
  # Number of most recent runs in which a change point is considered
  # actionable for alerting.
  num_runs_in_change_point_window: int = (
      constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)
  # Multiplier on the median absolute deviation used to decide whether a
  # detected change point is statistically significant.
  median_abs_deviation_threshold: int = (
      constants._DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD)
@dataclass
class TestConfigContainer:
  """
  Holds the configuration describing one test whose metric is analyzed
  for change points.
  """
  # Name of the metric to analyze.
  metric_name: str
  # GCP project hosting the metrics tables.
  project: str
  # BigQuery dataset containing the metrics table.
  metrics_dataset: str
  # BigQuery table with the test's metric rows.
  metrics_table: str
  test_id: str  # unique id for each test config.
  # Human readable description used when filing alerts.
  test_description: str
  # Optional display name of the test.
  test_name: Optional[str] = None
  # Optional GitHub labels to apply to created issues.
  labels: Optional[list[str]] = None
@dataclass
class MetricContainer:
  """
  This class holds the metric values and timestamps for a given metric.

  Args:
    values: List of metric values.
    timestamps: List of pandas timestamps corresponding to the metric values.
  """
  values: list[Union[int, float]]
  timestamps: list[pd.Timestamp]

  def sort_by_timestamp(self, in_place=True):
    """
    Sorts the metric values and timestamps in ascending order wrt timestamps.

    Args:
      in_place: If True, sort this container's values and timestamps in
        place and return None. If False, leave this container unmodified
        and return a new, sorted MetricContainer.
    """
    # zip(*...) on an empty sequence raises ValueError; an empty container
    # is already trivially sorted.
    if not self.timestamps:
      return None if in_place else MetricContainer(values=[], timestamps=[])
    # Sort exactly once (the original computed the same sort twice on the
    # in-place path) and convert back to lists so the fields keep their
    # declared list types instead of becoming tuples.
    timestamps, values = zip(*sorted(zip(self.timestamps, self.values)))
    if not in_place:
      return MetricContainer(values=list(values), timestamps=list(timestamps))
    self.timestamps, self.values = list(timestamps), list(values)
def is_change_point_in_valid_window(
    num_runs_in_change_point_window: int, latest_change_point_run: int) -> bool:
  """
  Returns True when the latest change point falls inside the alerting
  window, i.e. fewer runs than the window size have happened since it
  was observed.
  """
  return latest_change_point_run < num_runs_in_change_point_window
def get_existing_issues_data(table_name: str) -> Optional[pd.DataFrame]:
  """
  Finds the most recent GitHub issues created for the given test.

  Queries the per-test BigQuery issue-metadata table and returns up to the
  10 most recently created rows, newest first.

  Args:
    table_name: Name of the per-test issue-metadata table in
      constants._BQ_PROJECT_NAME.constants._BQ_DATASET.

  Returns:
    A DataFrame with the most recent issue rows, or None when no table
    exists for this test (i.e. this is the first detected regression for
    the test+metric).

  Raises:
    ImportError: If the BigQuery client dependencies are not installed.
  """
  query = f"""
  SELECT * FROM {constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{table_name}
  ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC
  LIMIT 10
  """
  try:
    if bigquery is None:
      raise ImportError('Bigquery dependencies are not installed.')
    client = bigquery.Client()
    query_job = client.query(query=query)
    existing_issue_data = query_job.result().to_dataframe()
  except exceptions.NotFound:
    # If no table found, that means this is first performance regression
    # on the current test+metric.
    return None
  return existing_issue_data
def is_sibling_change_point(
    previous_change_point_timestamps: list[pd.Timestamp],
    change_point_index: int,
    timestamps: list[pd.Timestamp],
    min_runs_between_change_points: int,
    test_id: str,
) -> bool:
  """
  Decides whether the current change point is a duplicate of an already
  reported one.

  Sibling change points are change points that are close to each other.
  A window of min_runs_between_change_points runs on each side of the
  observed change point is compared against previously reported change
  point timestamps. timestamps are expected to be in ascending order.

  Returns:
    False when the observed change point duplicates a previously reported
    one (an alert should not be raised); True otherwise.
  """
  # Clamp the sibling window to the available data on both sides.
  window_start_idx = max(0, change_point_index - min_runs_between_change_points)
  window_end_idx = min(
      change_point_index + min_runs_between_change_points, len(timestamps) - 1)
  window_start = timestamps[window_start_idx]
  window_end = timestamps[window_end_idx]
  # Any previously reported change point falling inside the window means
  # the current one was already reported in the past.
  for reported_timestamp in previous_change_point_timestamps:
    if window_start <= reported_timestamp <= window_end:
      logging.info(
          'Performance regression/improvement found for the test ID: %s. '
          'Since the change point timestamp %s '
          'lies within the sibling change point window: %s, '
          'alert is not raised.' % (
              test_id,
              reported_timestamp.strftime('%Y-%m-%d %H:%M:%S'),
              (
                  window_start.strftime('%Y-%m-%d %H:%M:%S'),
                  window_end.strftime('%Y-%m-%d %H:%M:%S'))))
      return False
  return True
def read_test_config(config_file_path: str) -> dict:
  """
  Reads the config file in which the data required to
  run the change point analysis is specified.
  """
  with open(config_file_path, 'r') as config_file:
    return yaml.safe_load(config_file)
def validate_config(keys):
  """
  Returns True when every required perf-test configuration key is present
  in keys.
  """
  return all(required_key in keys for required_key in constants._PERF_TEST_KEYS)
def find_change_points(metric_values: list[Union[float, int]]):
  """
  Runs the e-divisive change point detection algorithm on metric_values
  and returns its result (candidate change point indices).
  """
  return e_divisive(metric_values)
def find_latest_change_point_index(
    metric_values: list[Union[float, int]],
    median_abs_deviation_threshold: int = 2):
  """
  Finds the most recent significant change point in metric_values.

  Args:
    metric_values: Metric values used to run change point analysis.
    median_abs_deviation_threshold: Multiplier on the segments' median
      absolute deviation used to decide significance.

  Returns:
    int: Right most change point index observed on metric_values, or None
    when no significant, non-edge change point exists.
  """
  candidate_indices = find_change_points(metric_values)
  # Reduce noise in the change point analysis by filtering out the change
  # points that are not significant enough.
  candidate_indices = filter_change_points_by_median_threshold(
      metric_values,
      candidate_indices,
      threshold=0.1,
      median_abs_deviation_threshold=median_abs_deviation_threshold)
  if not candidate_indices:
    return None
  # Only the latest (right-most) change point is considered for alerting.
  change_point_index = max(candidate_indices)
  # Remove the change points that are at the edges of the data.
  # https://github.com/apache/beam/issues/28757
  # Remove this workaround once we have a good solution to deal
  # with the edge change points.
  if is_edge_change_point(change_point_index,
                          len(metric_values),
                          constants._EDGE_SEGMENT_SIZE):
    logging.info(
        'The change point %s is located at the edge of the data with an edge '
        'segment size of %s. This change point will be ignored for now, '
        'awaiting additional data. Should the change point persist after '
        'gathering more data, an alert will be raised.' %
        (change_point_index, constants._EDGE_SEGMENT_SIZE))
    return None
  return change_point_index
def publish_issue_metadata_to_big_query(
    issue_metadata,
    table_name,
    project=constants._BQ_PROJECT_NAME,
):
  """
  Publishes issue_metadata to BigQuery with the given table name.

  Args:
    issue_metadata: A GitHubIssueMetaData dataclass instance; it is
      converted to a dict via asdict() before publishing.
    table_name: Target table within constants._BQ_DATASET.
    project: GCP project to publish to; defaults to
      constants._BQ_PROJECT_NAME.
  """
  bq_metrics_publisher = BigQueryMetricsPublisher(
      project_name=project,
      dataset=constants._BQ_DATASET,
      table=table_name,
      bq_schema=constants._SCHEMA)
  bq_metrics_publisher.publish([asdict(issue_metadata)])
  logging.info(
      'GitHub metadata is published to Big Query Dataset %s'
      ', table %s' % (constants._BQ_DATASET, table_name))
def create_performance_alert(
    test_config_container: TestConfigContainer,
    metric_container: MetricContainer,
    change_point_index: int,
    existing_issue_number: Optional[int],
) -> tuple[int, str]:
  """
  Creates performance alert on GitHub issues and returns GitHub issue
  number and issue URL.

  Args:
    test_config_container: Config of the test the alert is raised for;
      its test_id, metric_name and labels feed the issue title/labels.
    metric_container: Metric values/timestamps used to render the issue
      description.
    change_point_index: Index of the detected change point within
      metric_container.
    existing_issue_number: Issue to comment on/update when one was already
      filed for this test, or None to open a new issue.
  """
  # avoid circular imports
  # pylint: disable=wrong-import-order, wrong-import-position
  from apache_beam.testing.analyzers import github_issues_utils

  description = github_issues_utils.get_issue_description(
      test_config_container=test_config_container,
      metric_container=metric_container,
      change_point_index=change_point_index,
      max_results_to_display=(
          constants._NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION))

  issue_number, issue_url = github_issues_utils.report_change_point_on_issues(
      title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format(
          test_config_container.test_id, test_config_container.metric_name
      ),
      description=description,
      labels=test_config_container.labels,
      existing_issue_number=existing_issue_number)

  logging.info(
      'Performance regression/improvement is alerted on issue #%s. Link '
      ': %s' % (issue_number, issue_url))
  return issue_number, issue_url
def filter_change_points_by_median_threshold(
    data: list[Union[int, float]],
    change_points: list[int],
    threshold: float = 0.05,
    median_abs_deviation_threshold: int = 2,
):
  """
  Returns the subset of change_points that are statistically significant.

  A change point at index idx splits data into data[:idx] and data[idx:].
  The change is kept only when the absolute difference between the two
  segments' medians exceeds median_abs_deviation_threshold times the sum
  of the segments' median absolute deviations (MAD). This filters out
  changes that are not statistically significant, making detection robust
  to noise and outliers.

  Args:
    data: Metric values the change points were detected on.
    change_points: Candidate change point indices into data.
    threshold: Unused. Kept for backward compatibility with existing
      callers; significance is decided solely by the MAD-based test.
    median_abs_deviation_threshold: Multiplier applied to the combined MAD
      of the two segments.

  Returns:
    The significant indices from change_points, in their original order.
  """
  del threshold  # Unused; retained for interface compatibility.
  valid_change_points = []
  for idx in change_points:
    # Splitting at either end would leave an empty segment; skip.
    if idx == 0 or idx == len(data):
      continue
    left_segment = data[:idx]
    right_segment = data[idx:]
    left_value = median(left_segment)
    right_value = median(right_segment)
    # MAD (Median Absolute Deviation) is a robust measure of variability.
    # A low MAD indicates that the data points are tightly clustered around
    # the median, while a high MAD suggests greater spread.
    left_mad = median([abs(x - left_value) for x in left_segment])
    right_mad = median([abs(x - right_value) for x in right_segment])
    # Keep the change point only when the shift in median dominates the
    # combined spread of both segments.
    if abs(right_value - left_value) > (median_abs_deviation_threshold *
                                        (left_mad + right_mad)):
      valid_change_points.append(idx)
  return valid_change_points
def is_edge_change_point(
    change_point_index,
    data_size,
    edge_segment_size=constants._EDGE_SEGMENT_SIZE):
  """
  Returns True when the change point lies inside the trailing edge segment
  of the data, i.e. fewer than edge_segment_size data points follow it.

  Args:
    change_point_index: Index of the change point.
    data_size: Size of the data.
    edge_segment_size: Size of the edge segment.
  """
  # The last index considered non-edge is data_size - edge_segment_size;
  # anything beyond it sits too close to the end of the series.
  last_non_edge_index = data_size - edge_segment_size
  return change_point_index > last_non_edge_index
class MetricsFetcher(metaclass=abc.ABCMeta):
  """
  Abstract interface for fetching a test's metric values and timestamps
  from a metrics store.
  """
  @abc.abstractmethod
  def fetch_metric_data(
      self, *, test_config: TestConfigContainer) -> MetricContainer:
    """
    Define SQL query and fetch the timestamp values and metric values
    from BigQuery tables.

    Args:
      test_config: Metadata describing which test/metric to fetch.

    Returns:
      MetricContainer with the fetched values and timestamps.
    """
    raise NotImplementedError
class BigQueryMetricsFetcher(MetricsFetcher):
  """MetricsFetcher implementation backed by BigQuery load-test tables."""
  def fetch_metric_data(
      self, *, test_config: TestConfigContainer) -> MetricContainer:
    """
    Fetches the most recent metric rows for the configured test.

    Args:
      test_config: TestConfigContainer containing metadata required to fetch
        metric data from BigQuery.

    Returns:
      MetricContainer containing metric values and timestamps. Rows are
      queried in DESCENDING timestamp order, so values/timestamps come back
      newest-first; callers needing ascending order should use
      MetricContainer.sort_by_timestamp.

    Raises:
      ImportError: If the BigQuery client dependencies are not installed.
    """
    project = test_config.project
    metrics_dataset = test_config.metrics_dataset
    metrics_table = test_config.metrics_table
    metric_name = test_config.metric_name
    query = f"""
      SELECT *
      FROM {project}.{metrics_dataset}.{metrics_table}
      WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}')
      ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
      LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
    """
    if bigquery is None:
      raise ImportError('Bigquery dependencies are not installed.')
    client = bigquery.Client()
    query_job = client.query(query=query)
    metric_data = query_job.result().to_dataframe()
    # NOTE(review): results are intentionally left in DESC order here;
    # presumably callers sort via MetricContainer.sort_by_timestamp —
    # confirm before re-enabling an in-place sort at this layer.
    return MetricContainer(
        values=metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
        timestamps=metric_data[
            load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())