| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint: disable=invalid-name, unused-argument, import-outside-toplevel |
| import uuid |
| from contextlib import nullcontext, suppress |
| from typing import Optional, Union |
| |
| import pandas as pd |
| import pytest |
| from flask.ctx import AppContext |
| from pytest_mock import MockerFixture |
| |
| from superset.commands.report.exceptions import AlertQueryError |
| from superset.reports.models import ReportCreationMethod, ReportScheduleType |
| from superset.tasks.types import ExecutorType, FixedExecutor |
| from superset.utils.database import get_example_database |
| from tests.integration_tests.test_app import app |
| |
| |
@pytest.mark.parametrize(
    "owner_names,creator_name,config,expected_result",
    [
        (["gamma"], None, [FixedExecutor("admin")], "admin"),
        (["gamma"], None, [ExecutorType.OWNER], "gamma"),
        (
            ["alpha", "gamma"],
            "gamma",
            [ExecutorType.CREATOR_OWNER],
            "gamma",
        ),
        (
            ["alpha", "gamma"],
            "alpha",
            [ExecutorType.CREATOR_OWNER],
            "alpha",
        ),
        (
            ["alpha", "gamma"],
            "admin",
            [ExecutorType.CREATOR_OWNER],
            AlertQueryError(),
        ),
        (["gamma"], None, [ExecutorType.CURRENT_USER], AlertQueryError()),
    ],
)
def test_execute_query_as_report_executor(
    owner_names: list[str],
    creator_name: Optional[str],
    config: list[Union[ExecutorType, FixedExecutor]],
    expected_result: Union[str, Exception],
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    Check which user the alert command impersonates when running its query.

    :param owner_names: usernames assigned as owners of the report schedule
    :param creator_name: username set as ``created_by``, or ``None`` for no creator
    :param config: value installed as ``ALERT_REPORTS_EXECUTORS`` for this case
    :param expected_result: username expected to be handed to ``override_user``,
        or an exception instance whose type ``AlertCommand.run`` must raise
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    original_config = app.config["ALERT_REPORTS_EXECUTORS"]
    app.config["ALERT_REPORTS_EXECUTORS"] = config
    # Restore the shared app config even when the test body fails, so a failing
    # case cannot leak its executor configuration into other tests.
    try:
        owners = [get_user(owner_name) for owner_name in owner_names]
        report_schedule = ReportSchedule(
            created_by=get_user(creator_name) if creator_name else None,
            owners=owners,
            type=ReportScheduleType.ALERT,
            description="description",
            crontab="0 9 * * *",
            creation_method=ReportCreationMethod.ALERTS_REPORTS,
            sql="SELECT 1",
            grace_period=14400,
            working_timeout=3600,
            database=get_example_database(),
            validator_config_json='{"op": "==", "threshold": 1}',
        )
        command = AlertCommand(
            report_schedule=report_schedule,
            execution_id=uuid.uuid4(),
        )
        override_user_mock = mocker.patch(
            "superset.commands.report.alert.override_user"
        )
        cm = (
            pytest.raises(type(expected_result))
            if isinstance(expected_result, Exception)
            else nullcontext()
        )
        with cm:
            command.run()
            # Only reached when run() did not raise, i.e. for the username cases.
            assert override_user_mock.call_args[0][0].username == expected_result
    finally:
        app.config["ALERT_REPORTS_EXECUTORS"] = original_config
| |
| |
def test_execute_query_mutate_query_enabled(
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    With ``MUTATE_ALERT_QUERY`` enabled, the limited SQL is passed through
    ``mutate_sql_based_on_config`` and the mutated SQL is what ``get_df`` receives.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]

    app.config["MUTATE_ALERT_QUERY"] = True
    # Restore the flag even if an assertion fails so other tests are unaffected.
    try:
        mocker.patch("superset.commands.report.alert.override_user")
        mock_df = mocker.MagicMock(spec=pd.DataFrame)
        mock_df.empty = True
        mock_database = get_example_database()
        mock_get_df = mocker.patch.object(
            mock_database, "get_df", return_value=mock_df
        )
        mock_limited_sql = mocker.patch.object(mock_database, "apply_limit_to_sql")
        mock_mutate_call = mocker.patch.object(
            mock_database, "mutate_sql_based_on_config"
        )

        report_schedule = ReportSchedule(
            created_by=get_user("admin"),
            owners=[get_user("admin")],
            type=ReportScheduleType.ALERT,
            description="description",
            crontab="0 9 * * *",
            creation_method=ReportCreationMethod.ALERTS_REPORTS,
            sql="SELECT 1",
            grace_period=14400,
            working_timeout=3600,
            database=mock_database,
            validator_config_json='{"op": "==", "threshold": 1}',
        )
        AlertCommand(
            report_schedule=report_schedule,
            execution_id=uuid.uuid4(),
        ).run()

        # The mutation hook sees the limited SQL; the query runs the mutated SQL.
        mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value)
        mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value)
    finally:
        app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff
| |
| |
def test_execute_query_mutate_query_disabled(
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    With ``MUTATE_ALERT_QUERY`` disabled, ``mutate_sql_based_on_config`` is never
    called and ``get_df`` receives the output of ``apply_limit_to_sql`` directly.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]

    app.config["MUTATE_ALERT_QUERY"] = False
    # Restore the flag even if an assertion fails so other tests are unaffected.
    try:
        mocker.patch("superset.commands.report.alert.override_user")
        mock_database = mocker.MagicMock()

        report_schedule = ReportSchedule(
            created_by=get_user("admin"),
            owners=[get_user("admin")],
            type=ReportScheduleType.ALERT,
            description="description",
            crontab="0 9 * * *",
            creation_method=ReportCreationMethod.ALERTS_REPORTS,
            sql="SELECT 1",
            grace_period=14400,
            working_timeout=3600,
            database=mock_database,
            validator_config_json='{"op": "==", "threshold": 1}',
        )
        AlertCommand(
            report_schedule=report_schedule,
            execution_id=uuid.uuid4(),
        ).run()

        mock_database.mutate_sql_based_on_config.assert_not_called()
        mock_database.get_df.assert_called_once_with(
            sql=mock_database.apply_limit_to_sql.return_value
        )
    finally:
        app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff
| |
| |
def test_execute_query_succeeded_no_retry(
    mocker: MockerFixture, app_context: None
) -> None:
    """A query that succeeds on the first attempt is executed exactly once."""
    from superset.commands.report.alert import AlertCommand

    def _successful_query() -> pd.DataFrame:
        # Build a fresh single-row frame on every invocation.
        return pd.DataFrame([{"sample_col": 0}])

    patched_query = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query",
        side_effect=_successful_query,
    )

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )
    alert_command.validate()

    assert patched_query.call_count == 1
| |
| |
def test_execute_query_succeeded_with_retries(
    mocker: MockerFixture, app_context: None
) -> None:
    """
    A query that raises ``AlertQueryError`` until the final permitted attempt is
    executed once per attempt and ``validate`` completes without raising.
    """
    from superset.commands.report.alert import AlertCommand, AlertQueryError

    patched_query = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query"
    )

    # Should match the value defined in superset_test_config.py
    expected_max_retries = 3
    attempts_seen = 0

    def _fail_until_last_attempt() -> pd.DataFrame:
        nonlocal attempts_seen
        attempts_seen += 1

        # Succeed only once the attempt counter reaches the retry budget.
        if attempts_seen >= expected_max_retries:
            return pd.DataFrame([{"sample_col": 0}])
        raise AlertQueryError()

    patched_query.side_effect = _fail_until_last_attempt
    # NOTE(review): presumably the retry helper reads the callable's __name__,
    # which a mock does not provide by default — confirm.
    patched_query.__name__ = "mocked_execute_query"

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )
    alert_command.validate()

    assert patched_query.call_count == expected_max_retries
| |
| |
def test_execute_query_failed_no_retry(
    mocker: MockerFixture, app_context: None
) -> None:
    """A query that raises ``AlertQueryTimeout`` is attempted only once."""
    from superset.commands.report.alert import AlertCommand, AlertQueryTimeout

    # An exception class used as side_effect is raised on every call.
    patched_query = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query",
        side_effect=AlertQueryTimeout,
    )
    # NOTE(review): presumably the retry helper reads the callable's __name__,
    # which a mock does not provide by default — confirm.
    patched_query.__name__ = "mocked_execute_query"

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )

    with suppress(AlertQueryTimeout):
        alert_command.validate()
    assert patched_query.call_count == 1
| |
| |
def test_execute_query_failed_max_retries(
    mocker: MockerFixture, app_context: None
) -> None:
    """A query that always raises ``AlertQueryError`` is attempted three times."""
    from superset.commands.report.alert import AlertCommand, AlertQueryError

    # An exception class used as side_effect is raised on every call.
    patched_query = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query",
        side_effect=AlertQueryError,
    )
    # NOTE(review): presumably the retry helper reads the callable's __name__,
    # which a mock does not provide by default — confirm.
    patched_query.__name__ = "mocked_execute_query"

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )

    with suppress(AlertQueryError):
        alert_command.validate()
    # Should match the value defined in superset_test_config.py
    assert patched_query.call_count == 3
| |
| |
def test_get_alert_metadata_from_object(
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    ``_get_alert_metadata_from_object`` returns the report schedule id together
    with the execution id the command was constructed with.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    original_config = app.config["ALERT_REPORTS_EXECUTORS"]
    app.config["ALERT_REPORTS_EXECUTORS"] = [ExecutorType.OWNER]
    # The app config is shared between tests, so put the previous executor list
    # back whether or not the assertion below passes.
    try:
        mock_database = mocker.MagicMock()
        mock_exec_id = uuid.uuid4()
        report_schedule = ReportSchedule(
            created_by=get_user("admin"),
            owners=[get_user("admin")],
            type=ReportScheduleType.ALERT,
            description="description",
            crontab="0 9 * * *",
            creation_method=ReportCreationMethod.ALERTS_REPORTS,
            sql="SELECT 1",
            grace_period=14400,
            working_timeout=3600,
            database=mock_database,
            validator_config_json='{"op": "==", "threshold": 1}',
        )

        command = AlertCommand(
            report_schedule=report_schedule,
            execution_id=mock_exec_id,
        )
        assert command._get_alert_metadata_from_object() == {
            "report_schedule_id": report_schedule.id,
            "execution_id": mock_exec_id,
        }
    finally:
        app.config["ALERT_REPORTS_EXECUTORS"] = original_config