blob: c6e8e96df7e099476cceb303474ce7320e4e624a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DEPRECATION NOTICE: this module is deprecated as of v1.0.0.
It will be removed in future versions of Superset. Please
migrate to the new scheduler: `superset.tasks.scheduler`.
"""
import logging
import time
import urllib.request
from collections import namedtuple
from datetime import datetime, timedelta
from email.utils import make_msgid, parseaddr
from typing import (
Any,
Callable,
Dict,
Iterator,
NamedTuple,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)
from urllib.error import URLError
import croniter
import simplejson as json
from celery.app.task import Task
from dateutil.tz import tzlocal
from flask import current_app, render_template, url_for
from flask_babel import gettext as __
from retry.api import retry_call
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
from selenium.webdriver.remote.webdriver import WebDriver
from sqlalchemy import func
from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
from sqlalchemy.orm import Session
from superset import app, security_manager, thumbnail_cache
from superset.extensions import celery_app, machine_auth_provider_factory
from superset.models.alerts import Alert, AlertLog
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
EmailDeliveryType,
get_scheduler_model,
ScheduleType,
SliceEmailReportFormat,
)
from superset.models.slice import Slice
from superset.tasks.alerts.observer import observe
from superset.tasks.alerts.validator import get_validator_function
from superset.tasks.slack_util import deliver_slack_msg
from superset.utils.celery import session_scope
from superset.utils.core import get_email_address_list, send_email_smtp
from superset.utils.screenshots import ChartScreenshot, WebDriverProxy
from superset.utils.urls import get_url_path
# pylint: disable=too-few-public-methods
if TYPE_CHECKING:
from flask_appbuilder.security.sqla.models import User
from werkzeug.datastructures import TypeConversionDict
# Globals
# These values are read from the Flask app config once, when this module is
# imported; later config changes are not picked up by the module-level names.
config = app.config
logger = logging.getLogger("tasks.email_reports")
logger.setLevel(logging.INFO)
# NOTE(review): current_app requires an active Flask app context at import
# time — presumably provided by whoever imports this module; not used in this
# part of the file.
stats_logger = current_app.config["STATS_LOGGER"]
# Seconds to sleep after loading a page before looking for the element to
# screenshot (passed to time.sleep and used as the retry delay).
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
# Base URL the webdriver / HTTP client uses to reach Superset.
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
# Base URL used for links shown to report recipients.
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
# Everything needed to deliver one report over email and slack. Note that the
# namedtuple's typename is "EmailContent" while the module-level name is
# ReportContent.
#   body             - email body
#   data             - attachments
#   images           - embedded images for the email
#   slack_message    - markdown only, html is not supported by slack
#   slack_attachment - attachment for the slack message (no embedding)
ReportContent = namedtuple(
    "EmailContent", "body data images slack_message slack_attachment"
)
class ScreenshotData(NamedTuple):
    """A screenshot together with the link to what it shows."""

    # url to the chart/dashboard shown in the screenshot
    url: str
    # raw screenshot bytes; may be None
    image: Optional[bytes]


class AlertContent(NamedTuple):
    """Everything needed to render a triggered alert for email or slack."""

    # alert name
    label: str
    # sql statement for the alert
    sql: str
    # value from the observation that triggered the alert
    observation_value: str
    # human-readable description of the comparison that triggered the alert
    validation_error_message: str
    # url to the alert details page
    alert_url: str
    # data for the alert screenshot, or None when there is none
    image_data: Optional[ScreenshotData]
def _get_email_to_and_bcc(
    recipients: str, deliver_as_group: bool
) -> Iterator[Tuple[str, str]]:
    """
    Yield one ``(to, bcc)`` pair per email that should be sent.

    With ``deliver_as_group`` the whole ``recipients`` string is a single
    "to" value. Otherwise it is split with ``get_email_address_list`` and
    every address gets its own pair. Each pair carries the configured
    ``EMAIL_REPORT_BCC_ADDRESS``.
    """
    bcc_address = config["EMAIL_REPORT_BCC_ADDRESS"]
    destinations = (
        [recipients] if deliver_as_group else get_email_address_list(recipients)
    )
    yield from ((destination, bcc_address) for destination in destinations)
# TODO(bkyryliuk): move email functionality into a separate module.
def _deliver_email(  # pylint: disable=too-many-arguments
    recipients: str,
    deliver_as_group: bool,
    subject: str,
    body: str,
    data: Optional[Dict[str, Any]],
    images: Optional[Dict[str, bytes]],
) -> None:
    """
    Send the report email through ``send_email_smtp``.

    One email is sent per ``(to, bcc)`` pair from ``_get_email_to_and_bcc``.
    ``data`` and ``images`` are forwarded unchanged as attachments and
    images. ``SCHEDULED_EMAIL_DEBUG_MODE`` turns the send into a dry run.
    """
    shared_options: Dict[str, Any] = {
        "data": data,
        "images": images,
        "mime_subtype": "related",
    }
    for to_address, bcc_address in _get_email_to_and_bcc(
        recipients, deliver_as_group
    ):
        send_email_smtp(
            to_address,
            subject,
            body,
            config,
            bcc=bcc_address,
            dryrun=config["SCHEDULED_EMAIL_DEBUG_MODE"],
            **shared_options,
        )
def _generate_report_content(
    delivery_type: EmailDeliveryType, screenshot: bytes, name: str, url: str
) -> ReportContent:
    """
    Build the email and slack payloads for a screenshot-based report.

    :param delivery_type: ``attachment`` attaches the screenshot to the email
        under the "screenshot" key; ``inline`` embeds it in the HTML body
        through a ``cid:`` reference
    :param screenshot: screenshot image bytes
    :param name: title shown in the slack message
    :param url: user-facing link to the chart/dashboard
    :returns: a ReportContent whose slack attachment is the raw screenshot
    :raises RuntimeError: if ``delivery_type`` is not supported
    """
    data: Optional[Dict[str, Any]]
    images: Optional[Dict[str, bytes]]
    # how to: https://api.slack.com/reference/surfaces/formatting
    slack_message = __(
        """
        *%(name)s*\n
        <%(url)s|Explore in Superset>
        """,
        name=name,
        url=url,
    )

    if delivery_type == EmailDeliveryType.attachment:
        images = None
        data = {"screenshot": screenshot}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=name,
            url=url,
        )
    elif delivery_type == EmailDeliveryType.inline:
        # Build the Content-ID on the domain of the 'From' address.
        # ``domain`` must be passed by keyword: make_msgid's first positional
        # parameter is ``idstring``. When the address has no "@",
        # make_msgid falls back to the local hostname instead of crashing.
        from_address = parseaddr(config["SMTP_MAIL_FROM"])[1]
        _, at_sign, domain = from_address.rpartition("@")
        # make_msgid returns "<id@domain>"; a cid: reference needs it
        # without the surrounding < >.
        msgid = make_msgid(domain=domain if at_sign else None)[1:-1]
        images = {msgid: screenshot}
        data = None
        body = __(
            """
            <b><a href="%(url)s">Explore in Superset</a></b><p></p>
            <img src="cid:%(msgid)s">
            """,
            name=name,
            url=url,
            msgid=msgid,
        )
    else:
        raise RuntimeError("Unknown delivery type")

    return ReportContent(body, data, images, slack_message, screenshot)
def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str:
    """
    Resolve a Flask view to an absolute URL.

    :param view: endpoint name passed to ``url_for`` (e.g. "Superset.slice")
    :param user_friendly: join with ``WEBDRIVER_BASEURL_USER_FRIENDLY`` (links
        meant for recipients) instead of ``WEBDRIVER_BASEURL``
    :param kwargs: forwarded to ``url_for`` as view arguments / query params
    """
    # Imported explicitly: the module only imports ``urllib.request`` and
    # should not depend on that module importing ``urllib.parse`` for us.
    from urllib.parse import urljoin  # pylint: disable=import-outside-toplevel

    # url_for needs a request context; a test request context provides one
    # outside of a real request (e.g. inside a celery worker).
    with app.test_request_context():
        base_url = (
            WEBDRIVER_BASEURL_USER_FRIENDLY if user_friendly else WEBDRIVER_BASEURL
        )
        return urljoin(str(base_url), url_for(view, **kwargs))
def create_webdriver(session: Session) -> WebDriver:
    """
    Create a webdriver of the configured ``WEBDRIVER_TYPE`` that is
    authenticated as the reports user.
    """
    proxy = WebDriverProxy(driver_type=config["WEBDRIVER_TYPE"])
    reports_user = get_reports_user(session)
    return proxy.auth(reports_user)
def get_reports_user(session: Session) -> "User":
    """
    Return the user whose username matches ``EMAIL_REPORTS_USER``,
    compared case-insensitively.

    ``Query.one()`` is used, so this raises unless exactly one user matches.
    """
    user_model = security_manager.user_model
    query = session.query(user_model)
    same_username = func.lower(user_model.username) == func.lower(
        config["EMAIL_REPORTS_USER"]
    )
    return query.filter(same_username).one()
def destroy_webdriver(
    driver: Union[chrome.webdriver.WebDriver, firefox.webdriver.WebDriver]
) -> None:
    """
    Destroy a driver on a best-effort basis.

    The window is closed (retried once) and then the driver is quit. Failures
    in either step are logged and otherwise ignored, so this never raises.
    Quitting is attempted even if closing failed.
    """
    # This is some very flaky code in selenium. Hence the retries
    # and catch-all exceptions
    try:
        retry_call(driver.close, tries=2)
    except Exception as ex:  # pylint: disable=broad-except
        logger.warning("Failed to close webdriver window: %s", ex)
    try:
        driver.quit()
    except Exception as ex:  # pylint: disable=broad-except
        # A failed quit may leave the browser process running, so surface it.
        logger.warning("Failed to quit webdriver: %s", ex)
def deliver_dashboard(  # pylint: disable=too-many-locals
    dashboard_id: int,
    recipients: Optional[str],
    slack_channel: Optional[str],
    delivery_type: EmailDeliveryType,
    deliver_as_group: bool,
) -> None:
    """
    Given a schedule, deliver the dashboard as an email and/or slack report.

    The dashboard is opened in a webdriver authenticated as the reports user.
    Its ``grid-container`` element is screenshotted, or the whole page if the
    driver cannot screenshot elements.

    :param dashboard_id: id of the dashboard to render
    :param recipients: email recipients; no email is sent when empty
    :param slack_channel: slack channel; no slack message is sent when empty
    :param delivery_type: whether the screenshot is attached or inlined
    :param deliver_as_group: send one email to all recipients instead of one
        email per recipient
    """
    with session_scope(nullpool=True) as session:
        dashboard = session.query(Dashboard).filter_by(id=dashboard_id).one()
        dashboard_url = _get_url_path(
            "Superset.dashboard", dashboard_id_or_slug=dashboard.id
        )
        dashboard_url_user_friendly = _get_url_path(
            "Superset.dashboard", user_friendly=True, dashboard_id_or_slug=dashboard.id
        )

        # Create a driver, fetch the page, wait for the page to render.
        # Everything after creation is inside try/finally so the browser is
        # destroyed even when loading the page or finding the element fails.
        driver = create_webdriver(session)
        try:
            window = config["WEBDRIVER_WINDOW"]["dashboard"]
            driver.set_window_size(*window)
            driver.get(dashboard_url)
            time.sleep(EMAIL_PAGE_RENDER_WAIT)

            # Set up a function to retry once for the element.
            # This is buggy in certain selenium versions with firefox driver
            element = retry_call(
                driver.find_element_by_class_name,
                fargs=["grid-container"],
                tries=2,
                delay=EMAIL_PAGE_RENDER_WAIT,
            )
            try:
                screenshot = element.screenshot_as_png
            except WebDriverException:
                # Some webdrivers do not support screenshots for elements.
                # In such cases, take a screenshot of the entire page.
                screenshot = driver.get_screenshot_as_png()
        finally:
            destroy_webdriver(driver)

        # Generate the email body and attachments
        report_content = _generate_report_content(
            delivery_type,
            screenshot,
            dashboard.dashboard_title,
            dashboard_url_user_friendly,
        )
        subject = __(
            "%(prefix)s %(title)s",
            prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
            title=dashboard.dashboard_title,
        )

        if recipients:
            _deliver_email(
                recipients,
                deliver_as_group,
                subject,
                report_content.body,
                report_content.data,
                report_content.images,
            )
        if slack_channel:
            deliver_slack_msg(
                slack_channel,
                subject,
                report_content.slack_message,
                report_content.slack_attachment,
            )
def _get_slice_data(
    slc: Slice, delivery_type: EmailDeliveryType, session: Session
) -> ReportContent:
    """
    Build a data (CSV) report for a chart.

    The chart's CSV is fetched from ``Superset.explore_json`` using the
    reports user's auth cookies.

    - ``inline`` delivery renders the rows into an HTML table
      (``superset/reports/slice_data.html``).
    - ``attachment`` delivery attaches the raw CSV as ``<chart name>.csv``.

    The raw CSV bytes are always used as the slack attachment.

    :raises URLError: if the CSV endpoint answers with a status other than 200
    :raises RuntimeError: if ``delivery_type`` is not supported
    """
    slice_url = _get_url_path(
        "Superset.explore_json", csv="true", form_data=json.dumps({"slice_id": slc.id})
    )
    # URL to include in the email
    slice_url_user_friendly = _get_url_path(
        "Superset.slice", slice_id=slc.id, user_friendly=True
    )

    # Login on behalf of the "reports" user in order to get cookies to deal with auth
    auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(
        get_reports_user(session)
    )
    # Build something like "session=cool_sess.val;other-cookie=awesome_other_cookie"
    cookie_str = ";".join(f"{key}={val}" for key, val in auth_cookies.items())

    opener = urllib.request.build_opener()
    opener.addheaders.append(("Cookie", cookie_str))
    # The response is a context manager; using it closes the connection once
    # the body is read or the status check fails.
    with opener.open(slice_url) as response:
        if response.getcode() != 200:
            raise URLError(response.getcode())
        content = response.read()

    if delivery_type == EmailDeliveryType.inline:
        data = None
        # TODO: Move to the csv module. Splitting on b"," does not handle
        # quoted fields that contain commas.
        rows = [r.split(b",") for r in content.splitlines()]
        # The first row is the header; an empty response yields no columns
        # instead of an IndexError.
        columns = rows.pop(0) if rows else []
        with app.app_context():  # type: ignore
            body = render_template(
                "superset/reports/slice_data.html",
                columns=columns,
                rows=rows,
                name=slc.slice_name,
                link=slice_url_user_friendly,
            )
    elif delivery_type == EmailDeliveryType.attachment:
        data = {__("%(name)s.csv", name=slc.slice_name): content}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=slc.slice_name,
            url=slice_url_user_friendly,
        )
    else:
        raise RuntimeError("Unknown delivery type")

    # how to: https://api.slack.com/reference/surfaces/formatting
    slack_message = __(
        """
        *%(slice_name)s*\n
        <%(slice_url_user_friendly)s|Explore in Superset>
        """,
        slice_name=slc.slice_name,
        slice_url_user_friendly=slice_url_user_friendly,
    )

    return ReportContent(body, data, None, slack_message, content)
def _get_slice_screenshot(slice_id: int, session: Session) -> ScreenshotData:
    """
    Take a fresh screenshot of a chart for an alert.

    The standalone chart page is rendered as ``THUMBNAIL_SELENIUM_USER``.
    The result is computed with ``force=True`` and stored in the thumbnail
    cache, and the session is committed afterwards. The returned
    ScreenshotData pairs the image with the user-facing chart URL.
    """
    chart = session.query(Slice).get(slice_id)
    standalone_url = get_url_path(
        "Superset.slice", slice_id=chart.id, standalone="true"
    )
    chart_screenshot = ChartScreenshot(standalone_url, chart.digest)
    chart_link = _get_url_path(
        "Superset.slice", user_friendly=True, slice_id=chart.id,
    )
    selenium_user = security_manager.get_user_by_username(
        current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
    )
    image_bytes = chart_screenshot.compute_and_cache(
        user=selenium_user, cache=thumbnail_cache, force=True,
    )
    session.commit()
    return ScreenshotData(chart_link, image_bytes)
def _get_slice_visualization(
    slc: Slice, delivery_type: EmailDeliveryType, session: Session
) -> ReportContent:
    """
    Build a visualization (screenshot) report for a chart.

    The chart page is opened in a webdriver authenticated as the reports user.
    Its ``chart-container`` element is screenshotted, or the whole page if the
    driver cannot screenshot elements. The result is packaged by
    ``_generate_report_content``.
    """
    slice_url = _get_url_path("Superset.slice", slice_id=slc.id)
    slice_url_user_friendly = _get_url_path(
        "Superset.slice", slice_id=slc.id, user_friendly=True
    )

    # Create a driver, fetch the page, wait for the page to render.
    # Everything after creation is inside try/finally so the browser is
    # destroyed even when loading the page or finding the element fails.
    driver = create_webdriver(session)
    try:
        window = config["WEBDRIVER_WINDOW"]["slice"]
        driver.set_window_size(*window)
        driver.get(slice_url)
        time.sleep(EMAIL_PAGE_RENDER_WAIT)

        # Set up a function to retry once for the element.
        # This is buggy in certain selenium versions with firefox driver
        element = retry_call(
            driver.find_element_by_class_name,
            fargs=["chart-container"],
            tries=2,
            delay=EMAIL_PAGE_RENDER_WAIT,
        )
        try:
            screenshot = element.screenshot_as_png
        except WebDriverException:
            # Some webdrivers do not support screenshots for elements.
            # In such cases, take a screenshot of the entire page.
            screenshot = driver.get_screenshot_as_png()
    finally:
        destroy_webdriver(driver)

    # Generate the email body and attachments
    return _generate_report_content(
        delivery_type, screenshot, slc.slice_name, slice_url_user_friendly
    )
def deliver_slice(  # pylint: disable=too-many-arguments
    slice_id: int,
    recipients: Optional[str],
    slack_channel: Optional[str],
    delivery_type: EmailDeliveryType,
    email_format: SliceEmailReportFormat,
    deliver_as_group: bool,
    session: Session,
) -> None:
    """
    Given a schedule, deliver the chart (slice) as an email and/or slack report.

    ``email_format`` chooses between a data (CSV) report and a visualization
    (screenshot) report. An email is sent only when ``recipients`` is set, and
    a slack message only when ``slack_channel`` is set.

    :raises RuntimeError: if ``email_format`` is not supported
    """
    slc = session.query(Slice).filter_by(id=slice_id).one()

    if email_format == SliceEmailReportFormat.data:
        build_report = _get_slice_data
    elif email_format == SliceEmailReportFormat.visualization:
        build_report = _get_slice_visualization
    else:
        raise RuntimeError("Unknown email report format")
    content = build_report(slc, delivery_type, session)

    subject = __(
        "%(prefix)s %(title)s",
        prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
        title=slc.slice_name,
    )

    if recipients:
        _deliver_email(
            recipients,
            deliver_as_group,
            subject,
            content.body,
            content.data,
            content.images,
        )
    if slack_channel:
        deliver_slack_msg(
            slack_channel,
            subject,
            content.slack_message,
            content.slack_attachment,
        )
@celery_app.task(
    name="email_reports.send",
    bind=True,
    soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
)
def schedule_email_report(
    _task: Task,
    report_type: ScheduleType,
    schedule_id: int,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Celery task that delivers a single dashboard or chart report schedule.

    Explicitly passed ``recipients`` / ``slack_channel`` take precedence over
    the ones stored on the schedule. Missing or inactive schedules are skipped.

    :raises RuntimeError: if ``report_type`` is neither dashboard nor slice
    """
    model_cls = get_scheduler_model(report_type)
    with session_scope(nullpool=True) as session:
        schedule = session.query(model_cls).get(schedule_id)

        # The user may have disabled (or removed) the schedule: nothing to do
        if not (schedule and schedule.active):
            logger.info("Ignoring deactivated schedule")
            return

        email_to = recipients or schedule.recipients
        slack_to = slack_channel or schedule.slack_channel
        logger.info(
            "Starting report for slack: %s and recipients: %s.", slack_to, email_to,
        )

        if report_type == ScheduleType.dashboard:
            deliver_dashboard(
                schedule.dashboard_id,
                email_to,
                slack_to,
                schedule.delivery_type,
                schedule.deliver_as_group,
            )
            return

        if report_type == ScheduleType.slice:
            deliver_slice(
                schedule.slice_id,
                email_to,
                slack_to,
                schedule.delivery_type,
                schedule.email_format,
                schedule.deliver_as_group,
                session,
            )
            return

        raise RuntimeError("Unknown report type")
@celery_app.task(
    name="alerts.run_query",
    bind=True,
    # TODO: find cause of https://github.com/apache/superset/issues/10530
    # and remove retry
    autoretry_for=(NoSuchColumnError, ResourceClosedError,),
    retry_kwargs={"max_retries": 1},
    retry_backoff=True,
)
def schedule_alert_query(
    _task: Task,
    report_type: ScheduleType,
    schedule_id: int,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Celery task that evaluates a single alert schedule.

    Missing or inactive alerts are skipped. ``recipients`` / ``slack_channel``
    are forwarded to ``evaluate_alert``.

    :raises RuntimeError: if ``report_type`` is not the alert type
    """
    model_cls = get_scheduler_model(report_type)
    with session_scope(nullpool=True) as session:
        schedule = session.query(model_cls).get(schedule_id)

        # The user may have disabled (or removed) the alert: nothing to do
        if not (schedule and schedule.active):
            logger.info("Ignoring deactivated alert")
            return

        if report_type != ScheduleType.alert:
            raise RuntimeError("Unknown report type")

        evaluate_alert(
            schedule.id, schedule.label, session, recipients, slack_channel
        )
class AlertState:
    """String values stored as an alert's ``last_state`` and log ``state``."""

    # the observers failed to run or reported an error
    ERROR: str = "error"
    # the alert was delivered
    TRIGGER: str = "trigger"
    # the validator ran and did not fire
    PASS: str = "pass"
def deliver_alert(
    alert_id: int,
    session: Session,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Gathers alert information and sends out the alert
    to its respective email and slack recipients.

    Explicit ``recipients`` / ``slack_channel`` (used for test alerts, which
    may not have a validator) take precedence over the alert's own settings.
    A chart screenshot is included only when the alert is linked to a chart.
    """
    alert = session.query(Alert).get(alert_id)
    logger.info("Triggering alert: %s", alert)

    # Alternate values are used in the case of a test alert
    recipients = recipients or alert.recipients
    slack_channel = slack_channel or alert.slack_channel

    observation_value = str(alert.observations[-1].value)
    validation_error_message = observation_value + " " + alert.pretty_config
    alert_url = _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id)
    # TODO: dashboard delivery! Only chart screenshots are supported for now.
    image_data = (
        _get_slice_screenshot(alert.slice.id, session) if alert.slice else None
    )

    alert_content = AlertContent(
        alert.label,
        alert.sql,
        observation_value,
        validation_error_message,
        alert_url,
        image_data,
    )

    if recipients:
        deliver_email_alert(alert_content, recipients)
    if slack_channel:
        deliver_slack_alert(alert_content, slack_channel)
def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None:
    """
    Email a triggered alert to ``recipients``, one email per address.

    The body is rendered from ``email/alert.txt``. When the alert has a
    screenshot image, it is passed to the email as ``images`` under the
    "screenshot" key. No attachments are sent.
    """
    subject = f"[Superset] Triggered alert: {alert_content.label}"
    screenshot = alert_content.image_data
    # TODO(JasonD28): add support for emails with no screenshot
    image_url = screenshot.url if screenshot else None
    images = (
        {"screenshot": screenshot.image} if screenshot and screenshot.image else {}
    )
    body = render_template(
        "email/alert.txt",
        alert_url=alert_content.alert_url,
        label=alert_content.label,
        sql=alert_content.sql,
        observation_value=alert_content.observation_value,
        validation_error_message=alert_content.validation_error_message,
        image_url=image_url,
    )
    _deliver_email(
        recipients,
        False,  # deliver_as_group: one email per recipient
        subject,
        body,
        None,  # no attachments
        images,
    )
def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None:
    """
    Post a triggered alert to ``slack_channel``.

    - With screenshot data: renders ``slack/alert.txt`` (which also receives
      the screenshot's url) and attaches the image.
    - Without it: renders ``slack/alert_no_screenshot.txt`` with no
      attachment.
    """
    subject = __("[Alert] %(label)s", label=alert_content.label)
    template_context: Dict[str, Any] = {
        "label": alert_content.label,
        "sql": alert_content.sql,
        "observation_value": alert_content.observation_value,
        "validation_error_message": alert_content.validation_error_message,
        "alert_url": alert_content.alert_url,
    }
    screenshot = alert_content.image_data
    if screenshot:
        template_name = "slack/alert.txt"
        template_context["url"] = screenshot.url
        attachment = screenshot.image
    else:
        template_name = "slack/alert_no_screenshot.txt"
        attachment = None
    slack_message = render_template(template_name, **template_context)
    deliver_slack_msg(slack_channel, subject, slack_message, attachment)
def evaluate_alert(
    alert_id: int,
    label: str,
    session: Session,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Processes an alert to see if it should be triggered.

    The alert's observers are run first. If they succeed, the alert is
    delivered unconditionally on test runs (``recipients`` or
    ``slack_channel`` given). Otherwise it is delivered only when its
    validator fires.

    The resulting state (error/trigger/pass) is stored on the alert and
    appended to its logs. ``last_eval_dttm`` is only advanced when the
    evaluation did not error.
    """
    logger.info("Processing alert ID: %i", alert_id)
    state = None
    dttm_start = datetime.utcnow()

    try:
        logger.info("Querying observers for alert <%s:%s>", alert_id, label)
        error_msg = observe(alert_id, session)
        if error_msg:
            state = AlertState.ERROR
            logger.error(error_msg)
    except Exception:  # pylint: disable=broad-except
        state = AlertState.ERROR
        # logger.exception records the message and the traceback together
        logger.exception(
            "Failed at query observers for alert: %s (%s)", label, alert_id
        )

    dttm_end = datetime.utcnow()

    if state != AlertState.ERROR:
        # Test runs (explicit recipients/channel) skip validation since the
        # alert may not be triggered; regular runs deliver only if triggered.
        if recipients or slack_channel or validate_observations(
            alert_id, label, session
        ):
            deliver_alert(alert_id, session, recipients, slack_channel)
            state = AlertState.TRIGGER
        else:
            state = AlertState.PASS

    session.commit()
    alert = session.query(Alert).get(alert_id)
    if state != AlertState.ERROR:
        alert.last_eval_dttm = dttm_end
    alert.last_state = state
    alert.logs.append(
        AlertLog(
            scheduled_dttm=dttm_start,
            dttm_start=dttm_start,
            dttm_end=dttm_end,
            state=state,
        )
    )
    session.commit()
def validate_observations(alert_id: int, label: str, session: Session) -> bool:
    """
    Run the alert's configured validator to decide whether it should fire.

    :returns: True only when a validator exists for ``alert.validator_type``
        and it returns a truthy value for the alert and its
        ``validator_config``; False otherwise
    """
    logger.info("Validating observations for alert <%s:%s>", alert_id, label)
    alert = session.query(Alert).get(alert_id)
    validator = get_validator_function(alert.validator_type)
    if not validator:
        return False
    return bool(validator(alert, alert.validator_config))
def next_schedules(
    crontab: str, start_at: datetime, stop_at: datetime, resolution: int = 0
) -> Iterator[datetime]:
    """
    Yield the occurrences of ``crontab`` within ``[start_at, stop_at)``.

    Occurrences less than ``resolution`` seconds after the previously
    yielded one are skipped, to avoid very frequent tasks. The first
    occurrence in the window is never skipped, whatever the resolution.
    """
    # Start the cron iterator one second early so that an occurrence falling
    # exactly on start_at is produced.
    crons = croniter.croniter(crontab, start_at - timedelta(seconds=1))
    min_gap = timedelta(seconds=resolution)
    # A None sentinel replaces the old "start_at - 1 day" baseline, which
    # wrongly dropped the first occurrence whenever resolution > 1 day.
    previous: Optional[datetime] = None
    for eta in crons.all_next(datetime):
        # Do not cross the time boundary
        if eta >= stop_at:
            break
        if eta < start_at:
            continue
        # Do not allow very frequent tasks
        if previous is not None and eta - previous < min_gap:
            continue
        yield eta
        previous = eta
def schedule_window(
    report_type: str,
    start_at: datetime,
    stop_at: datetime,
    resolution: int,
    session: Session,
) -> None:
    """
    Find all active schedules and schedule celery tasks for
    each of them with a specific ETA (determined by parsing
    the cron schedule for the schedule).

    For schedules that record ``last_eval_dttm`` inside the window, only
    occurrences after that time are scheduled.
    """
    model_cls = get_scheduler_model(report_type)
    if not model_cls:
        return None

    # The celery task is the same for every schedule of this report type.
    action = get_scheduler_action(report_type)
    schedules = session.query(model_cls).filter(model_cls.active.is_(True))
    for schedule in schedules:
        logger.info("Processing schedule %s", schedule)
        args = (report_type, schedule.id)
        schedule_start_at = start_at
        last_eval_dttm = getattr(schedule, "last_eval_dttm", None)
        if last_eval_dttm and last_eval_dttm > start_at:
            # Do not re-run occurrences that were already evaluated.
            schedule_start_at = last_eval_dttm + timedelta(seconds=1)

        # Schedule the job for the specified time window
        for eta in next_schedules(
            schedule.crontab, schedule_start_at, stop_at, resolution=resolution
        ):
            logger.info("Scheduled eta %s", eta)
            action.apply_async(args, eta=eta)  # type: ignore
    return None
def get_scheduler_action(report_type: str) -> Optional[Callable[..., Any]]:
    """
    Return the celery task that handles ``report_type``, or None if the type
    is unknown. Dashboards and charts share the email report task.
    """
    if report_type in (ScheduleType.dashboard, ScheduleType.slice):
        return schedule_email_report
    if report_type == ScheduleType.alert:
        return schedule_alert_query
    return None
@celery_app.task(name="email_reports.schedule_hourly")
def schedule_hourly() -> None:
    """
    Celery beat job meant to be invoked hourly.

    Schedules every active dashboard and chart report whose cron falls within
    the current local hour. Does nothing unless
    ``ENABLE_SCHEDULED_EMAIL_REPORTS`` is set.
    """
    if not config["ENABLE_SCHEDULED_EMAIL_REPORTS"]:
        logger.info("Scheduled email reports not enabled in config")
        return

    # EMAIL_REPORTS_CRON_RESOLUTION is expressed in minutes
    resolution = config["EMAIL_REPORTS_CRON_RESOLUTION"] * 60
    top_of_hour = datetime.now(tzlocal()).replace(minute=0, second=0, microsecond=0)
    end_of_hour = top_of_hour + timedelta(hours=1)
    with session_scope(nullpool=True) as session:
        for report_type in (ScheduleType.dashboard, ScheduleType.slice):
            schedule_window(report_type, top_of_hour, end_of_hour, resolution, session)
@celery_app.task(name="alerts.schedule_check")
def schedule_alerts() -> None:
    """
    Celery beat job meant to be invoked every minute to check alerts.

    The window reaches five minutes into the past, so alert runs missed
    recently are still scheduled, and ends one second from now. No
    resolution throttling is applied.
    """
    now = datetime.utcnow()
    window_start = now - timedelta(minutes=5)
    window_end = now + timedelta(seconds=1)
    with session_scope(nullpool=True) as session:
        schedule_window(ScheduleType.alert, window_start, window_end, 0, session)