# blob: 0e1b21569362ba2d3e8d36a6d8ffab1569936bb9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-name, unused-argument, import-outside-toplevel
import uuid
from contextlib import nullcontext, suppress
from typing import Optional, Union
import pandas as pd
import pytest
from flask.ctx import AppContext
from pytest_mock import MockerFixture
from superset.commands.report.exceptions import AlertQueryError
from superset.reports.models import ReportCreationMethod, ReportScheduleType
from superset.tasks.types import ExecutorType, FixedExecutor
from superset.utils.database import get_example_database
from tests.integration_tests.test_app import app
@pytest.mark.parametrize(
    "owner_names,creator_name,config,expected_result",
    [
        (["gamma"], None, [FixedExecutor("admin")], "admin"),
        (["gamma"], None, [ExecutorType.OWNER], "gamma"),
        (
            ["alpha", "gamma"],
            "gamma",
            [ExecutorType.CREATOR_OWNER],
            "gamma",
        ),
        (
            ["alpha", "gamma"],
            "alpha",
            [ExecutorType.CREATOR_OWNER],
            "alpha",
        ),
        (
            ["alpha", "gamma"],
            "admin",
            [ExecutorType.CREATOR_OWNER],
            AlertQueryError(),
        ),
        (["gamma"], None, [ExecutorType.CURRENT_USER], AlertQueryError()),
    ],
)
def test_execute_query_as_report_executor(
    owner_names: list[str],
    creator_name: Optional[str],
    config: list[Union[ExecutorType, FixedExecutor]],
    expected_result: Union[str, Exception],
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    Check which user ``AlertCommand.run()`` hands to ``override_user``.

    Each case supplies the report owners, an optional creator, the value to
    use for ``ALERT_REPORTS_EXECUTORS``, and either the username expected to be
    passed to ``override_user`` or an exception instance whose type ``run()``
    must raise.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    # patch.dict is undone when the ``mocker`` fixture is torn down, so the
    # override cannot leak into other tests even when this test fails.
    mocker.patch.dict(app.config, {"ALERT_REPORTS_EXECUTORS": config})

    owners = [get_user(owner_name) for owner_name in owner_names]
    report_schedule = ReportSchedule(
        created_by=get_user(creator_name) if creator_name else None,
        owners=owners,
        type=ReportScheduleType.ALERT,
        description="description",
        crontab="0 9 * * *",
        creation_method=ReportCreationMethod.ALERTS_REPORTS,
        sql="SELECT 1",
        grace_period=14400,
        working_timeout=3600,
        database=get_example_database(),
        validator_config_json='{"op": "==", "threshold": 1}',
    )
    command = AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4())
    override_user_mock = mocker.patch("superset.commands.report.alert.override_user")

    if isinstance(expected_result, Exception):
        # Error cases: only the raised exception type is checked.
        with pytest.raises(type(expected_result)):
            command.run()
    else:
        command.run()
        # The first positional argument of override_user is the executor.
        assert override_user_mock.call_args[0][0].username == expected_result
def test_execute_query_mutate_query_enabled(
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    With ``MUTATE_ALERT_QUERY`` enabled, the limited SQL is passed through
    ``mutate_sql_based_on_config`` and the mutated SQL is what ``get_df``
    receives.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    # patch.dict is undone when the ``mocker`` fixture is torn down, so the
    # flag is restored even if an assertion below fails.
    mocker.patch.dict(app.config, {"MUTATE_ALERT_QUERY": True})

    mocker.patch("superset.commands.report.alert.override_user")
    mock_df = mocker.MagicMock(spec=pd.DataFrame)
    mock_df.empty = True
    mock_database = get_example_database()
    mock_get_df = mocker.patch.object(mock_database, "get_df", return_value=mock_df)
    mock_limited_sql = mocker.patch.object(mock_database, "apply_limit_to_sql")
    mock_mutate_call = mocker.patch.object(mock_database, "mutate_sql_based_on_config")
    report_schedule = ReportSchedule(
        created_by=get_user("admin"),
        owners=[get_user("admin")],
        type=ReportScheduleType.ALERT,
        description="description",
        crontab="0 9 * * *",
        creation_method=ReportCreationMethod.ALERTS_REPORTS,
        sql="SELECT 1",
        grace_period=14400,
        working_timeout=3600,
        database=mock_database,
        validator_config_json='{"op": "==", "threshold": 1}',
    )

    AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()

    mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value)
    mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value)
def test_execute_query_mutate_query_disabled(
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    With ``MUTATE_ALERT_QUERY`` disabled, ``mutate_sql_based_on_config`` is not
    called and ``get_df`` receives the output of ``apply_limit_to_sql``.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    # patch.dict is undone when the ``mocker`` fixture is torn down, so the
    # flag is restored even if an assertion below fails.
    mocker.patch.dict(app.config, {"MUTATE_ALERT_QUERY": False})

    mocker.patch("superset.commands.report.alert.override_user")
    mock_database = mocker.MagicMock()
    report_schedule = ReportSchedule(
        created_by=get_user("admin"),
        owners=[get_user("admin")],
        type=ReportScheduleType.ALERT,
        description="description",
        crontab="0 9 * * *",
        creation_method=ReportCreationMethod.ALERTS_REPORTS,
        sql="SELECT 1",
        grace_period=14400,
        working_timeout=3600,
        database=mock_database,
        validator_config_json='{"op": "==", "threshold": 1}',
    )

    AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()

    mock_database.mutate_sql_based_on_config.assert_not_called()
    mock_database.get_df.assert_called_once_with(
        sql=mock_database.apply_limit_to_sql.return_value
    )
def test_execute_query_succeeded_no_retry(
    mocker: MockerFixture, app_context: None
) -> None:
    """``validate()`` runs the query exactly once when it succeeds first time."""
    from superset.commands.report.alert import AlertCommand

    def _single_row_frame() -> pd.DataFrame:
        # Stand-in for a successful query result.
        return pd.DataFrame([{"sample_col": 0}])

    patched_execute = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query",
        side_effect=_single_row_frame,
    )

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )
    alert_command.validate()

    assert patched_execute.call_count == 1
def test_execute_query_succeeded_with_retries(
    mocker: MockerFixture, app_context: None
) -> None:
    """
    ``validate()`` keeps calling the query while it raises ``AlertQueryError``
    and stops once a call succeeds.
    """
    from superset.commands.report.alert import AlertCommand, AlertQueryError

    # Should match the value defined in superset_test_config.py
    expected_max_retries = 3
    attempts = 0

    def _fail_until_last_attempt() -> pd.DataFrame:
        # Raise on every call before the final allowed attempt, then succeed.
        nonlocal attempts
        attempts += 1
        if attempts >= expected_max_retries:
            return pd.DataFrame([{"sample_col": 0}])
        raise AlertQueryError()

    patched_execute = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query"
    )
    patched_execute.side_effect = _fail_until_last_attempt
    # NOTE(review): presumably the retry helper reads the callable's __name__
    # (e.g. for logging) - confirm against the retry implementation.
    patched_execute.__name__ = "mocked_execute_query"

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )
    alert_command.validate()

    assert patched_execute.call_count == expected_max_retries
def test_execute_query_failed_no_retry(
    mocker: MockerFixture, app_context: None
) -> None:
    """The query is executed only once when it raises ``AlertQueryTimeout``."""
    from superset.commands.report.alert import AlertCommand, AlertQueryTimeout

    # An exception class used as side_effect is raised on every call.
    patched_execute = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query",
        side_effect=AlertQueryTimeout,
    )
    patched_execute.__name__ = "mocked_execute_query"

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )
    with suppress(AlertQueryTimeout):
        alert_command.validate()

    assert patched_execute.call_count == 1
def test_execute_query_failed_max_retries(
    mocker: MockerFixture, app_context: None
) -> None:
    """
    A query that always raises ``AlertQueryError`` is attempted up to the
    configured maximum number of tries.
    """
    from superset.commands.report.alert import AlertCommand, AlertQueryError

    # Should match the value defined in superset_test_config.py
    expected_max_retries = 3

    # An exception class used as side_effect is raised on every call.
    patched_execute = mocker.patch(
        "superset.commands.report.alert.AlertCommand._execute_query",
        side_effect=AlertQueryError,
    )
    patched_execute.__name__ = "mocked_execute_query"

    alert_command = AlertCommand(
        report_schedule=mocker.Mock(),
        execution_id=uuid.uuid4(),
    )
    with suppress(AlertQueryError):
        alert_command.validate()

    assert patched_execute.call_count == expected_max_retries
def test_get_alert_metadata_from_object(
    mocker: MockerFixture,
    app_context: AppContext,
    get_user,
) -> None:
    """
    ``_get_alert_metadata_from_object`` returns the report schedule id together
    with the execution id the command was created with.
    """
    from superset.commands.report.alert import AlertCommand
    from superset.reports.models import ReportSchedule

    # Use patch.dict instead of assigning to app.config directly: the override
    # is reverted when the ``mocker`` fixture is torn down, so it does not leak
    # into tests that run afterwards.
    mocker.patch.dict(app.config, {"ALERT_REPORTS_EXECUTORS": [ExecutorType.OWNER]})

    mock_database = mocker.MagicMock()
    mock_exec_id = uuid.uuid4()
    report_schedule = ReportSchedule(
        created_by=get_user("admin"),
        owners=[get_user("admin")],
        type=ReportScheduleType.ALERT,
        description="description",
        crontab="0 9 * * *",
        creation_method=ReportCreationMethod.ALERTS_REPORTS,
        sql="SELECT 1",
        grace_period=14400,
        working_timeout=3600,
        database=mock_database,
        validator_config_json='{"op": "==", "threshold": 1}',
    )

    command = AlertCommand(report_schedule=report_schedule, execution_id=mock_exec_id)

    assert command._get_alert_metadata_from_object() == {
        "report_schedule_id": report_schedule.id,
        "execution_id": mock_exec_id,
    }