Rename the two different ChangePoint classes for clarity

The recent work in #96, which replaced the original external dependency on the so-called "signal processing" repository with our own implementation, introduced the new classes ChangePoint and CandidateChangePoint in change_point_divisive/base.py, but also left the original ChangePoint class in place in analysis.py. These come together in series.py, where the newer class is imported under the alias _ChangePoint and also acts as the parent of the older class, thus aligning their signatures as much as possible. It turns out that having two similarly named classes is a source of confusion and bugs. For example, in #141 vishnuchalla fixes a bug that is caused by this confusion and that had essentially blocked the --orig-edivisive code path completely. This patch is an effort to make the existence of two separate classes very explicit by renaming them based on their "lineage": the newer class in change_point_divisive/base.py becomes ChangePointOtava, and the older class that derives from it in series.py becomes ChangePointHunter. A test case is added to exercise the --orig-edivisive code path. As predicted by #141, the test fails, so it is currently commented out. The failure is caused by a missing cp.metric attribute in one variant of the ChangePoint class. Note that this patch is intended more for discussion than for merging.
diff --git a/otava/analysis.py b/otava/analysis.py index 5ff8975..e57453b 100644 --- a/otava/analysis.py +++ b/otava/analysis.py
@@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import copy from dataclasses import dataclass from typing import List, Optional, Sequence, SupportsFloat, Tuple @@ -24,7 +25,7 @@ from otava.change_point_divisive.base import ( BaseStats, CandidateChangePoint, - ChangePoint, + ChangePointOtava, GenericStats, SignificanceTester, ) @@ -97,11 +98,11 @@ # Generic Change Point List -GenCPList = List[ChangePoint[GenericStats]] +GenCPList = List[ChangePointOtava[GenericStats]] # Permutation Change Point List -PermCPList = List[ChangePoint[PermutationStats]] +PermCPList = List[ChangePointOtava[PermutationStats]] # T-test Change Point List -TtestCPList = List[ChangePoint[TTestStats]] +TtestCPList = List[ChangePointOtava[TTestStats]] class TTestSignificanceTester(SignificanceTester): @@ -130,7 +131,7 @@ def change_point( self, candidate: CandidateChangePoint, series: Sequence[SupportsFloat], intervals: List[slice] - ) -> ChangePoint[TTestStats]: + ) -> ChangePointOtava[TTestStats]: """ Computes properties of the change point if the Candidate Change Point based on the provided intervals. @@ -167,7 +168,7 @@ left = series[left_interval] right = series[right_interval] stats = self.compare(left, right) - return ChangePoint.from_candidate(candidate, stats) + return ChangePointOtava.from_candidate(candidate, stats) def fill_missing(data: Sequence[SupportsFloat]): @@ -323,4 +324,4 @@ """ first_pass_pvalue = max_pvalue * 10 if max_pvalue < 0.05 else (max_pvalue * 2 if max_pvalue < 0.5 else max_pvalue) weak_change_points = split(series, window_len, first_pass_pvalue, new_points=new_data, old_cp=old_weak_cp) - return merge(weak_change_points, series, max_pvalue, min_magnitude), weak_change_points + return merge(copy.copy(weak_change_points), series, max_pvalue, min_magnitude), weak_change_points
diff --git a/otava/bigquery.py b/otava/bigquery.py index 34f329d..33816b9 100644 --- a/otava/bigquery.py +++ b/otava/bigquery.py
@@ -22,7 +22,7 @@ from google.cloud import bigquery from google.oauth2 import service_account -from otava.analysis import ChangePoint +from otava.analysis import ChangePointOtava from otava.test_config import BigQueryTestConfig @@ -87,7 +87,7 @@ test: BigQueryTestConfig, metric_name: str, attributes: Dict, - change_point: ChangePoint, + change_point: ChangePointOtava, ): kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}} update_stmt = test.update_stmt.format(
diff --git a/otava/change_point_divisive/base.py b/otava/change_point_divisive/base.py index edde8e8..e50d2d4 100644 --- a/otava/change_point_divisive/base.py +++ b/otava/change_point_divisive/base.py
@@ -39,7 +39,7 @@ @dataclass -class ChangePoint(CandidateChangePoint, Generic[GenericStats]): +class ChangePointOtava(CandidateChangePoint, Generic[GenericStats]): '''Change point class, defined by index and signigicance test statistic.''' stats: GenericStats @@ -48,7 +48,7 @@ return isinstance(other, self.__class__) and self.index == other.index @classmethod - def from_candidate(cls, candidate: CandidateChangePoint, stats: GenericStats) -> 'ChangePoint[GenericStats]': + def from_candidate(cls, candidate: CandidateChangePoint, stats: GenericStats) -> 'ChangePointOtava[GenericStats]': return cls( index=candidate.index, qhat=candidate.qhat, @@ -67,7 +67,7 @@ def __init__(self, max_pvalue: float): self.max_pvalue = max_pvalue - def get_intervals(self, change_points: List[ChangePoint[GenericStats]]) -> List[slice]: + def get_intervals(self, change_points: List[ChangePointOtava[GenericStats]]) -> List[slice]: '''Returns list of slices of the series. Change points must be sorted by index.''' assert all( change_points[i].index <= change_points[i + 1].index @@ -82,12 +82,12 @@ ] return [interval for interval in intervals if interval.start != interval.stop] - def is_significant(self, point: ChangePoint[GenericStats]) -> bool: - '''Compares ChangePoint to level of significance max_pvalue''' + def is_significant(self, point: ChangePointOtava[GenericStats]) -> bool: + '''Compares ChangePointOtava to level of significance max_pvalue''' return point.stats.pvalue <= self.max_pvalue - def change_point(self, candidate: CandidateChangePoint, series: NDArray, intervals: List[slice]) -> ChangePoint[GenericStats]: - '''Computes stats for a change point candidate and wraps it into ChangePoint class''' + def change_point(self, candidate: CandidateChangePoint, series: NDArray, intervals: List[slice]) -> ChangePointOtava[GenericStats]: + '''Computes stats for a change point candidate and wraps it into ChangePointOtava class''' ...
diff --git a/otava/change_point_divisive/detector.py b/otava/change_point_divisive/detector.py index 8945372..e2026e9 100644 --- a/otava/change_point_divisive/detector.py +++ b/otava/change_point_divisive/detector.py
@@ -21,7 +21,7 @@ from otava.change_point_divisive.base import ( Calculator, - ChangePoint, + ChangePointOtava, GenericStats, SignificanceTester, ) @@ -32,7 +32,7 @@ self.tester = significance_tester self.calculator = calculator - def get_change_points(self, series: Sequence[SupportsFloat], start: Optional[int] = None, end: Optional[int] = None) -> List[ChangePoint[GenericStats]]: + def get_change_points(self, series: Sequence[SupportsFloat], start: Optional[int] = None, end: Optional[int] = None) -> List[ChangePointOtava[GenericStats]]: '''Finds change points in `series[start : end]`.''' if not isinstance(series, np.ndarray): series = np.array(series[start : end], dtype=np.float64)
diff --git a/otava/change_point_divisive/significance_test.py b/otava/change_point_divisive/significance_test.py index 7e348c7..aa5a8f2 100644 --- a/otava/change_point_divisive/significance_test.py +++ b/otava/change_point_divisive/significance_test.py
@@ -25,7 +25,7 @@ BaseStats, Calculator, CandidateChangePoint, - ChangePoint, + ChangePointOtava, SignificanceTester, ) @@ -50,7 +50,7 @@ self.seed = seed self.rng = np.random.default_rng(seed) - def change_point(self, candidate: CandidateChangePoint, series: NDArray, intervals: List[slice]) -> ChangePoint[PermutationStats]: + def change_point(self, candidate: CandidateChangePoint, series: NDArray, intervals: List[slice]) -> ChangePointOtava[PermutationStats]: '''Perform permutation test within candidate cluster''' # 1. Find permutated Qhats @@ -74,4 +74,4 @@ extreme_qhat_perm=extreme_qhat_perm, n_perm=self.permutations ) - return ChangePoint.from_candidate(candidate, stats) + return ChangePointOtava.from_candidate(candidate, stats)
diff --git a/otava/postgres.py b/otava/postgres.py index 40ef53d..5a30aa1 100644 --- a/otava/postgres.py +++ b/otava/postgres.py
@@ -21,7 +21,7 @@ import pg8000 -from otava.analysis import ChangePoint +from otava.analysis import ChangePointOtava from otava.test_config import PostgresTestConfig @@ -88,7 +88,7 @@ test: PostgresTestConfig, metric_name: str, attributes: Dict, - change_point: ChangePoint, + change_point: ChangePointOtava, ): cursor = self.__get_conn().cursor() kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}}
diff --git a/otava/series.py b/otava/series.py index bb3bf47..1318b8d 100644 --- a/otava/series.py +++ b/otava/series.py
@@ -27,7 +27,7 @@ compute_change_points_orig, fill_missing, ) -from otava.change_point_divisive.base import ChangePoint as _ChangePoint +from otava.change_point_divisive.base import ChangePointOtava @dataclass @@ -72,7 +72,7 @@ @dataclass -class ChangePoint(_ChangePoint[TTestStats]): +class ChangePointHunter(ChangePointOtava[TTestStats]): """A change-point for a single metric""" metric: str time: int @@ -140,7 +140,7 @@ prev_time: int attributes: Dict[str, str] prev_attributes: Dict[str, str] - changes: List[ChangePoint] + changes: List[ChangePointHunter] def to_json(self, rounded=False): return { @@ -215,11 +215,11 @@ __series: Series options: AnalysisOptions - change_points: Dict[str, List[ChangePoint]] + change_points: Dict[str, List[ChangePointHunter]] change_points_by_time: List[ChangePointGroup] change_points_timestamp: Any - def __init__(self, series: Series, options: AnalysisOptions, change_points: Dict[str, ChangePoint] = None): + def __init__(self, series: Series, options: AnalysisOptions, change_points: Dict[str, ChangePointHunter] = None): self.__series = series self.options = options self.change_points_timestamp = datetime.now(tz=timezone.utc) @@ -235,7 +235,7 @@ @staticmethod def __compute_change_points( series: Series, options: AnalysisOptions - ) -> Dict[str, List[ChangePoint]]: + ) -> Dict[str, List[ChangePointHunter]]: result = {} weak_change_points = {} for metric in series.data.keys(): @@ -258,13 +258,13 @@ ) for c in weak_cps: weak_change_points[metric].append( - ChangePoint( + ChangePointHunter( index=c.index, qhat=0.0, time=series.time[c.index], metric=metric, stats=c.stats ) ) for c in change_points: result[metric].append( - ChangePoint( + ChangePointHunter( index=c.index, qhat=0.0, time=series.time[c.index], metric=metric, stats=c.stats ) ) @@ -274,9 +274,9 @@ @staticmethod def __group_change_points_by_time( - series: Series, change_points: Dict[str, List[ChangePoint]] + series: Series, change_points: Dict[str, List[ChangePointHunter]] ) -> 
List[ChangePointGroup]: - changes: List[ChangePoint] = [] + changes: List[ChangePointHunter] = [] for metric in change_points.keys(): changes += change_points[metric] @@ -381,14 +381,14 @@ result[metric] = [] for c in change_points: result[metric].append( - ChangePoint( + ChangePointHunter( index=c.index, qhat=0.0, time=self.__series.time[c.index], metric=metric, stats=c.stats ) ) weak_change_points[metric] = [] for c in weak_cps: weak_change_points[metric].append( - ChangePoint( + ChangePointHunter( index=c.index, qhat=0.0, time=self.__series.time[c.index], metric=metric, stats=c.stats ) ) @@ -493,7 +493,7 @@ pvalue=cp["pvalue"], ) new_list.append( - ChangePoint( + ChangePointHunter( index=cp["index"], time=cp["time"], metric=cp["metric"], stats=stat ) ) @@ -511,7 +511,7 @@ pvalue=cp["pvalue"], ) new_list.append( - ChangePoint( + ChangePointHunter( index=cp["index"], time=cp["time"], metric=cp["metric"], stats=stat ) )
diff --git a/tests/change_point_divisive_test.py b/tests/change_point_divisive_test.py index 45d4431..950d982 100644 --- a/tests/change_point_divisive_test.py +++ b/tests/change_point_divisive_test.py
@@ -19,7 +19,7 @@ import pytest from otava.analysis import TTestSignificanceTester, TTestStats -from otava.change_point_divisive.base import ChangePoint +from otava.change_point_divisive.base import ChangePointOtava from otava.change_point_divisive.calculator import PairDistanceCalculator from otava.change_point_divisive.detector import ChangePointDetector from otava.change_point_divisive.significance_test import PermutationsSignificanceTester @@ -124,9 +124,9 @@ # Sorted change points should work sorted_cps = [ - ChangePoint(index=5, qhat=1.0, stats=stats), - ChangePoint(index=10, qhat=1.0, stats=stats), - ChangePoint(index=15, qhat=1.0, stats=stats), + ChangePointOtava(index=5, qhat=1.0, stats=stats), + ChangePointOtava(index=10, qhat=1.0, stats=stats), + ChangePointOtava(index=15, qhat=1.0, stats=stats), ] intervals = tester.get_intervals(sorted_cps) assert len(intervals) == 4 @@ -137,9 +137,9 @@ # Unsorted change points should raise AssertionError unsorted_cps = [ - ChangePoint(index=10, qhat=1.0, stats=stats), - ChangePoint(index=5, qhat=1.0, stats=stats), - ChangePoint(index=15, qhat=1.0, stats=stats), + ChangePointOtava(index=10, qhat=1.0, stats=stats), + ChangePointOtava(index=5, qhat=1.0, stats=stats), + ChangePointOtava(index=15, qhat=1.0, stats=stats), ] with pytest.raises(AssertionError, match="Change points must be sorted by index"): tester.get_intervals(unsorted_cps)
diff --git a/tests/cli_options_test.py b/tests/cli_options_test.py new file mode 100644 index 0000000..d578f14 --- /dev/null +++ b/tests/cli_options_test.py
@@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import csv +import tempfile +import textwrap +import unittest +from datetime import datetime, timedelta +from pathlib import Path +from unittest.mock import patch + +import otava.series +from otava.main import script_main + + +class CliOptionsTest(unittest.TestCase): + + # Test --deterministic-edivisive in various ways + def test_no_cli_option(self): + with patch('otava.series.compute_change_points') as mock_split: + script_main(args=[]) + mock_split.assert_not_called() + + def test_default_cli_option(self): + with patch('otava.series.compute_change_points') as mock_split: + mock_split.return_value = ([], []) + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + csv_path, timestamps, config_path, test_name = _create_files_in_temp_dir(td_path) + # _uv_run(td_path, test_name) + config_path_str = "" + str(config_path) + script_main(args=["analyze", "--config", config_path_str, test_name]) + + assert otava.series.compute_change_points.call_count == 2 + + # Failing due to lack of cp.metric see pull#141 + # def test_orig_cli_option(self): + # with patch('otava.series.compute_change_points') as mock_orig: + # mock_orig.return_value = ([], None) + # with 
tempfile.TemporaryDirectory() as td: + # td_path = Path(td) + # csv_path, timestamps, config_path, test_name = _create_files_in_temp_dir(td_path) + # # _uv_run(td_path, test_name) + # config_path_str = "" + str(config_path) + # script_main(args=["analyze", "--config", config_path_str, "--orig-edivisive", "true", test_name]) + # + # assert otava.series.compute_change_points_orig.call_count == 2 + + +def _create_files_in_temp_dir(td_path: Path): + data_dir = td_path / "data" + data_dir.mkdir(parents=True, exist_ok=True) + + csv_path, timestamps = _create_csv_data_file_for_test(td_path) + config_path, test_name = _create_csv_config_file_for_test(td_path) + + return csv_path, timestamps, config_path, test_name + + +def _create_csv_data_file_for_test(td_path: Path): + # create data directory and write CSV + data_dir = td_path / "data" + csv_path = data_dir / "local_sample.csv" + + # Generate some CSV content + now = datetime.now() + n = 10 + timestamps = [now - timedelta(days=i) for i in range(n)] + metrics1 = [154023, 138455, 143112, 149190, 132098, 151344, 155145, 148889, 149466, 148209] + metrics2 = [10.43, 10.23, 10.29, 10.91, 10.34, 10.69, 9.23, 9.11, 9.13, 9.03] + data_points = [] + for i in range(n): + data_points.append( + ( + timestamps[i].strftime("%Y.%m.%d %H:%M:%S %z"), # time + "aaa" + str(i), # commit + metrics1[i], + metrics2[i], + ) + ) + + with open(csv_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["time", "commit", "metric1", "metric2"]) + writer.writerows(data_points) + + return csv_path, timestamps + + +def _create_csv_config_file_for_test(td_path: Path): + data_dir = td_path / "data" + csv_path = str(data_dir / "local_sample.csv") + + config_content = textwrap.dedent( + """\ + tests: + sample_for_test: + type: csv + file: """ + csv_path + """ + time_column: time + attributes: [commit] + metrics: [metric1, metric2] + csv_options: + delimiter: "," + quotechar: "'" + """ + ) + config_path = td_path / "otava.yaml" + 
config_path.write_text(config_content, encoding="utf-8") + return config_path, "sample_for_test"
diff --git a/tests/postgres_e2e_test.py b/tests/postgres_e2e_test.py index 5da9768..58bb03e 100644 --- a/tests/postgres_e2e_test.py +++ b/tests/postgres_e2e_test.py
@@ -80,7 +80,9 @@ 2025-04-27 10:03:02 +0000 aggregate-0af4ccbc 0af4ccbc 1 56950 2052 13532 """ ) - assert _remove_trailing_whitespaces(proc.stdout) == expected_output.rstrip("\n") + out = _remove_trailing_whitespaces(proc.stdout) + print(out) + assert out == expected_output.rstrip("\n") # Verify the DB was updated with the detected change. # Query the updated change metric at the detected change point.
diff --git a/tests/report_test.py b/tests/report_test.py index 3692a79..a220eb5 100644 --- a/tests/report_test.py +++ b/tests/report_test.py
@@ -40,6 +40,13 @@ @pytest.fixture(scope="module") def change_points(series): + # o = AnalysisOptions() + # print("default AnalysisOptions are...") + # print(o) + # o.max_pvalue = 0.001 + # print("setting them to") + # print(o) + # return series.analyze(options=o).change_points_by_time return series.analyze().change_points_by_time @@ -51,6 +58,7 @@ def test_report(series, change_points): report = Report(series, change_points) output = report.produce_report("test", ReportType.LOG) + assert "series1" in output assert "series2" in output assert "1.02" in output
diff --git a/tests/slack_notification_test.py b/tests/slack_notification_test.py index 7bc4705..0f0b81e 100644 --- a/tests/slack_notification_test.py +++ b/tests/slack_notification_test.py
@@ -99,6 +99,7 @@ since=since_time, ) dispatches = mock_client.dispatches + assert list(dispatches.keys()) == NOTIFICATION_CHANNELS, "Wrong channels were notified" for channel in NOTIFICATION_CHANNELS: assert len(dispatches[channel]) == 1, "Unexpected number of Slack messages created"
diff --git a/tests/tigerbeetle_test.py b/tests/tigerbeetle_test.py index 94a9019..77ea524 100644 --- a/tests/tigerbeetle_test.py +++ b/tests/tigerbeetle_test.py
@@ -19,6 +19,10 @@ from otava.analysis import compute_change_points +def tigerbeetle_demo_data(): + return _get_series() + + def _get_series(): """ This is the Tigerbeetle dataset used for demo purposes at Nyrkiö.