# blob: cf226ec21d953722e0fe09f174348839bae798e4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime, timedelta
from typing import Any, List, Optional
from flask_appbuilder.security.sqla.models import User
from sqlalchemy.orm import Session
from superset import app, thumbnail_cache
from superset.commands.base import BaseCommand
from superset.commands.exceptions import CommandException
from superset.models.reports import (
ReportExecutionLog,
ReportSchedule,
ReportScheduleType,
ReportState,
)
from superset.reports.commands.alert import AlertCommand
from superset.reports.commands.exceptions import (
ReportScheduleAlertEndGracePeriodError,
ReportScheduleAlertGracePeriodError,
ReportScheduleExecuteUnexpectedError,
ReportScheduleNotFoundError,
ReportScheduleNotificationError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleSelleniumUserNotFoundError,
ReportScheduleStateNotFoundError,
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
from superset.reports.dao import (
REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
ReportScheduleDAO,
)
from superset.reports.notifications import create_notification
from superset.reports.notifications.base import NotificationContent, ScreenshotData
from superset.reports.notifications.exceptions import NotificationError
from superset.utils.celery import session_scope
from superset.utils.screenshots import (
BaseScreenshot,
ChartScreenshot,
DashboardScreenshot,
)
from superset.utils.urls import get_url_path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BaseReportState:
    """
    Behaviour shared by every report schedule state: persisting the schedule
    state, writing execution logs, taking screenshots, sending notifications
    and evaluating the grace period / working timeout conditions.

    Subclasses list the states they handle in ``current_states`` and
    implement ``next``.
    """

    # Values of ReportSchedule.last_state that this state class handles
    current_states: List[ReportState] = []
    # True when this class also handles schedules whose last_state is None
    initial: bool = False

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ) -> None:
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm
        # Captured once; used as start_dttm on every log written by this instance
        self._start_dttm = datetime.utcnow()

    def set_state_and_log(
        self, state: ReportState, error_message: Optional[str] = None,
    ) -> None:
        """
        Updates the current ReportSchedule state and evaluation timestamp, then
        writes an execution log entry for that state

        :param state: The state to set on the report schedule and on the log
        :param error_message: Optional message stored on the log entry
        """
        now_dttm = datetime.utcnow()
        self.set_state(state, now_dttm)
        self.create_log(
            state, error_message=error_message,
        )

    def set_state(self, state: ReportState, dttm: datetime) -> None:
        """
        Set the current report schedule state and last evaluation time, in this
        case we want to commit immediately

        :param state: The new state of the report schedule
        :param dttm: The timestamp stored as the last evaluation time
        """
        self._report_schedule.last_state = state
        self._report_schedule.last_eval_dttm = dttm
        self._session.merge(self._report_schedule)
        self._session.commit()

    def create_log(
        self, state: ReportState, error_message: Optional[str] = None,
    ) -> None:
        """
        Creates a Report execution log, uses the current computed last_value for
        Alerts. The log is added to the session and committed immediately

        :param state: The state recorded on the log entry
        :param error_message: Optional message recorded on the log entry
        """
        log = ReportExecutionLog(
            scheduled_dttm=self._scheduled_dttm,
            start_dttm=self._start_dttm,
            end_dttm=datetime.utcnow(),
            value=self._report_schedule.last_value,
            value_row_json=self._report_schedule.last_value_row_json,
            state=state,
            error_message=error_message,
            report_schedule=self._report_schedule,
        )
        self._session.add(log)
        self._session.commit()

    def _get_url(self, user_friendly: bool = False, **kwargs: Any) -> str:
        """
        Get the url for this report schedule: chart or dashboard

        :param user_friendly: Forwarded as is to get_url_path
        :param kwargs: Extra keyword arguments forwarded to get_url_path
        """
        if self._report_schedule.chart:
            return get_url_path(
                "Superset.slice",
                user_friendly=user_friendly,
                slice_id=self._report_schedule.chart_id,
                **kwargs,
            )
        return get_url_path(
            "Superset.dashboard",
            user_friendly=user_friendly,
            dashboard_id_or_slug=self._report_schedule.dashboard_id,
            **kwargs,
        )

    def _get_screenshot_user(self) -> User:
        """
        Get the user configured on THUMBNAIL_SELENIUM_USER

        :raises: ReportScheduleSelleniumUserNotFoundError
        """
        user = (
            self._session.query(User)
            .filter(User.username == app.config["THUMBNAIL_SELENIUM_USER"])
            .one_or_none()
        )
        if not user:
            raise ReportScheduleSelleniumUserNotFoundError()
        return user

    def _get_screenshot(self) -> ScreenshotData:
        """
        Get a chart or dashboard screenshot

        :raises: ReportScheduleScreenshotFailedError
        """
        screenshot: BaseScreenshot
        if self._report_schedule.chart:
            url = self._get_url(standalone="true")
            screenshot = ChartScreenshot(
                url,
                self._report_schedule.chart.digest,
                window_size=app.config["WEBDRIVER_WINDOW"]["slice"],
                thumb_size=app.config["WEBDRIVER_WINDOW"]["slice"],
            )
        else:
            url = self._get_url()
            screenshot = DashboardScreenshot(
                url,
                self._report_schedule.dashboard.digest,
                window_size=app.config["WEBDRIVER_WINDOW"]["dashboard"],
                thumb_size=app.config["WEBDRIVER_WINDOW"]["dashboard"],
            )
        # The url attached to the notification is the user friendly one,
        # not the one used to take the screenshot
        image_url = self._get_url(user_friendly=True)
        user = self._get_screenshot_user()
        image_data = screenshot.compute_and_cache(
            user=user, cache=thumbnail_cache, force=True,
        )
        if not image_data:
            raise ReportScheduleScreenshotFailedError()
        return ScreenshotData(url=image_url, image=image_data)

    def _get_notification_content(self) -> NotificationContent:
        """
        Gets a notification content, this is composed by a title and a screenshot

        :raises: ReportScheduleScreenshotFailedError
        """
        screenshot_data = self._get_screenshot()
        if self._report_schedule.chart:
            name = (
                f"{self._report_schedule.name}: "
                f"{self._report_schedule.chart.slice_name}"
            )
        else:
            name = (
                f"{self._report_schedule.name}: "
                f"{self._report_schedule.dashboard.dashboard_title}"
            )
        return NotificationContent(name=name, screenshot=screenshot_data)

    def _send(self, notification_content: NotificationContent) -> None:
        """
        Sends a notification to all recipients

        :raises: ReportScheduleNotificationError
        """
        notification_errors: List[str] = []
        for recipient in self._report_schedule.recipients:
            notification = create_notification(recipient, notification_content)
            try:
                notification.send()
            except NotificationError as ex:
                # collect notification errors but keep processing them
                notification_errors.append(str(ex))
        if notification_errors:
            raise ReportScheduleNotificationError(";".join(notification_errors))

    def send(self) -> None:
        """
        Creates the notification content and sends them to all recipients

        :raises: ReportScheduleNotificationError
        """
        notification_content = self._get_notification_content()
        self._send(notification_content)

    def send_error(self, name: str, message: str) -> None:
        """
        Creates and sends a notification for an error, to all recipients

        :param name: The notification name
        :param message: The notification text
        :raises: ReportScheduleNotificationError
        """
        notification_content = NotificationContent(name=name, text=message)
        self._send(notification_content)

    def _ended_within_grace_period(self, log: Optional[ReportExecutionLog]) -> bool:
        """
        Checks if the given execution log ended less than ``grace_period``
        seconds ago. Always returns a strict bool: False when there is no log
        or when the schedule has no (or a zero) grace period
        """
        grace_period = self._report_schedule.grace_period
        if log is None or not grace_period:
            return False
        return datetime.utcnow() - timedelta(seconds=grace_period) < log.end_dttm

    def is_in_grace_period(self) -> bool:
        """
        Checks if an alert is on its grace period, measured from the end of the
        last success log
        """
        last_success = ReportScheduleDAO.find_last_success_log(
            self._report_schedule, session=self._session
        )
        return self._ended_within_grace_period(last_success)

    def is_in_error_grace_period(self) -> bool:
        """
        Checks if an alert/report on error is on its notification grace period,
        measured from the end of the last error notification log
        """
        last_error_notification = ReportScheduleDAO.find_last_error_notification(
            self._report_schedule, session=self._session
        )
        return self._ended_within_grace_period(last_error_notification)

    def is_on_working_timeout(self) -> bool:
        """
        Checks if an alert is on a working timeout: the last evaluation is older
        than ``working_timeout`` seconds
        """
        return (
            self._report_schedule.working_timeout is not None
            and self._report_schedule.last_eval_dttm is not None
            and datetime.utcnow()
            - timedelta(seconds=self._report_schedule.working_timeout)
            > self._report_schedule.last_eval_dttm
        )

    def next(self) -> None:
        """
        Executes the transition out of this state, implemented by subclasses
        """
        raise NotImplementedError()
class ReportNotTriggeredErrorState(BaseReportState):
    """
    Handle Not triggered and Error state
    next final states:
    - Not Triggered
    - Success
    - Error
    """

    current_states = [ReportState.NOOP, ReportState.ERROR]
    initial = True

    def next(self) -> None:
        """
        Marks the schedule as working, evaluates the alert (when the schedule
        is an alert) and sends the notification. On a command failure the
        error is logged, an error notification is attempted unless the
        schedule is on its error grace period, and the original exception
        is raised again.
        """
        schedule = self._report_schedule
        self.set_state_and_log(ReportState.WORKING)
        try:
            # Alerts only notify when they are triggered
            is_alert = schedule.type == ReportScheduleType.ALERT
            if is_alert and not AlertCommand(schedule).run():
                self.set_state_and_log(ReportState.NOOP)
                return
            self.send()
            self.set_state_and_log(ReportState.SUCCESS)
        except CommandException as first_ex:
            failure_text = str(first_ex)
            self.set_state_and_log(ReportState.ERROR, error_message=failure_text)
            # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
            if not self.is_in_error_grace_period():
                try:
                    error_title = (
                        f"Error occurred for {schedule.type}: {schedule.name}"
                    )
                    self.send_error(error_title, failure_text)
                    # The marker message flags this log as an error notification
                    self.set_state_and_log(
                        ReportState.ERROR,
                        error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
                    )
                except CommandException as second_ex:
                    self.set_state_and_log(
                        ReportState.ERROR, error_message=str(second_ex)
                    )
            raise first_ex
class ReportWorkingState(BaseReportState):
    """
    Handle Working state
    next states:
    - Error
    - Working
    """

    current_states = [ReportState.WORKING]

    def next(self) -> None:
        """
        Always raises: a working timeout error (after logging ERROR) when the
        working timeout has elapsed, otherwise a previous-working error (after
        logging WORKING).
        """
        if self.is_on_working_timeout():
            target_state = ReportState.ERROR
            failure: Exception = ReportScheduleWorkingTimeoutError()
        else:
            target_state = ReportState.WORKING
            failure = ReportSchedulePreviousWorkingError()
        self.set_state_and_log(target_state, error_message=str(failure))
        raise failure
class ReportSuccessState(BaseReportState):
    """
    Handle Success, Grace state
    next states:
    - Grace
    - Not triggered
    - Success
    """

    current_states = [ReportState.SUCCESS, ReportState.GRACE]

    def next(self) -> None:
        """
        Marks the schedule as working. Alerts move to GRACE while on their
        grace period and to NOOP once it has ended, without sending anything.
        Reports send the notification and end on SUCCESS, or on ERROR when a
        command exception occurs (the exception is logged, not raised).
        """
        self.set_state_and_log(ReportState.WORKING)

        if self._report_schedule.type == ReportScheduleType.ALERT:
            if self.is_in_grace_period():
                target_state = ReportState.GRACE
                reason = str(ReportScheduleAlertGracePeriodError())
            else:
                target_state = ReportState.NOOP
                reason = str(ReportScheduleAlertEndGracePeriodError())
            self.set_state_and_log(target_state, error_message=reason)
            return

        try:
            self.send()
            self.set_state_and_log(ReportState.SUCCESS)
        except CommandException as ex:
            self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
class ReportScheduleStateMachine:  # pylint: disable=too-few-public-methods
    """
    Simple state machine for Alerts/Reports states
    """

    states_cls = [ReportWorkingState, ReportNotTriggeredErrorState, ReportSuccessState]

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ):
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm

    def run(self) -> None:
        """
        Runs ``next`` on the first state class that handles the schedule's
        last state; a schedule with no last state is handled by the class
        flagged as ``initial``.

        :raises: ReportScheduleStateNotFoundError when no state class matches
        """
        last_state = self._report_schedule.last_state
        for candidate in self.states_cls:
            is_initial_match = last_state is None and candidate.initial
            if is_initial_match or last_state in candidate.current_states:
                handler = candidate(
                    self._session, self._report_schedule, self._scheduled_dttm
                )
                handler.next()
                return
        raise ReportScheduleStateNotFoundError()
class AsyncExecuteReportScheduleCommand(BaseCommand):
    """
    Execute all types of report schedules.
    - On reports takes chart or dashboard screenshots and sends configured notifications
    - On Alerts uses related Command AlertCommand and sends configured notifications
    """

    def __init__(self, model_id: int, scheduled_dttm: datetime):
        self._model_id = model_id
        # Populated by validate()
        self._model: Optional[ReportSchedule] = None
        self._scheduled_dttm = scheduled_dttm

    def run(self) -> None:
        """
        Loads the report schedule and runs the state machine on it, inside a
        session obtained from ``session_scope``.

        :raises: CommandException subclasses are propagated untouched; any
            other exception is wrapped in ReportScheduleUnexpectedError
        """
        with session_scope(nullpool=True) as session:
            try:
                self.validate(session=session)
                if not self._model:
                    raise ReportScheduleExecuteUnexpectedError()
                ReportScheduleStateMachine(
                    session, self._model, self._scheduled_dttm
                ).run()
            except CommandException:
                # Known command errors keep their type and original traceback
                raise
            except Exception as ex:
                # Chain explicitly so the original error stays attached as the cause
                raise ReportScheduleUnexpectedError(str(ex)) from ex

    def validate(  # pylint: disable=arguments-differ
        self, session: Optional[Session] = None
    ) -> None:
        """
        Validate/populate model exists

        :param session: Session forwarded to ReportScheduleDAO.find_by_id
        :raises: ReportScheduleNotFoundError
        """
        self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session)
        if not self._model:
            raise ReportScheduleNotFoundError()