# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DEPRECATION NOTICE: this module is deprecated as of v1.0.0.
It will be removed in future versions of Superset. Please
migrate to the new scheduler: `superset.tasks.scheduler`.
"""

import logging
import time
import urllib.request
from collections import namedtuple
from datetime import datetime, timedelta
from email.utils import make_msgid, parseaddr
from typing import (
    Any,
    Callable,
    Dict,
    Iterator,
    NamedTuple,
    Optional,
    Tuple,
    TYPE_CHECKING,
    Union,
)
from urllib.error import URLError

import croniter
import simplejson as json
from celery.app.task import Task
from dateutil.tz import tzlocal
from flask import current_app, render_template, url_for
from flask_babel import gettext as __
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
from selenium.webdriver.remote.webdriver import WebDriver
from sqlalchemy import func
from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
from sqlalchemy.orm import Session

from superset import app, security_manager, thumbnail_cache
from superset.extensions import celery_app, machine_auth_provider_factory
from superset.models.alerts import Alert, AlertLog
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
    EmailDeliveryType,
    get_scheduler_model,
    ScheduleType,
    SliceEmailReportFormat,
)
from superset.models.slice import Slice
from superset.tasks.alerts.observer import observe
from superset.tasks.alerts.validator import get_validator_function
from superset.tasks.slack_util import deliver_slack_msg
from superset.utils.celery import session_scope
from superset.utils.core import get_email_address_list, send_email_smtp
from superset.utils.retries import retry_call
from superset.utils.screenshots import ChartScreenshot, WebDriverProxy
from superset.utils.urls import get_url_path

# pylint: disable=too-few-public-methods

if TYPE_CHECKING:
    from flask_appbuilder.security.sqla.models import User
    from werkzeug.datastructures import TypeConversionDict

# Globals
config = app.config
logger = logging.getLogger("tasks.email_reports")
logger.setLevel(logging.INFO)

stats_logger = current_app.config["STATS_LOGGER"]
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]

ReportContent = namedtuple(
    "EmailContent",
    [
        "body",  # email body
        "data",  # attachments
        "images",  # embedded images for the email
        "slack_message",  # html not supported, only markdown
        # attachments for the slack message, embedding not supported
        "slack_attachment",
    ],
)


class ScreenshotData(NamedTuple):
    url: str  # url to chat/dashboard for this screenshot
    image: Optional[bytes]  # bytes for the screenshot


class AlertContent(NamedTuple):
    label: str  # alert name
    sql: str  # sql statement for alert
    observation_value: str  # value from observation that triggered the alert
    validation_error_message: str  # a string of the comparison that triggered an alert
    alert_url: str  # url to alert details
    image_data: Optional[ScreenshotData]  # data for the alert screenshot


def _get_email_to_and_bcc(
    recipients: str, deliver_as_group: bool
) -> Iterator[Tuple[str, str]]:
    bcc = config["EMAIL_REPORT_BCC_ADDRESS"]

    if deliver_as_group:
        to = recipients
        yield (to, bcc)
    else:
        for to in get_email_address_list(recipients):
            yield (to, bcc)


# TODO(bkyryliuk): move email functionality into a separate module.
def _deliver_email(  # pylint: disable=too-many-arguments
    recipients: str,
    deliver_as_group: bool,
    subject: str,
    body: str,
    data: Optional[Dict[str, Any]],
    images: Optional[Dict[str, bytes]],
) -> None:
    for (to, bcc) in _get_email_to_and_bcc(recipients, deliver_as_group):
        send_email_smtp(
            to,
            subject,
            body,
            config,
            data=data,
            images=images,
            bcc=bcc,
            mime_subtype="related",
            dryrun=config["SCHEDULED_EMAIL_DEBUG_MODE"],
        )


def _generate_report_content(
    delivery_type: EmailDeliveryType, screenshot: bytes, name: str, url: str
) -> ReportContent:
    data: Optional[Dict[str, Any]]

    # how to: https://api.slack.com/reference/surfaces/formatting
    slack_message = __(
        """
        *%(name)s*\n
        <%(url)s|Explore in Superset>
        """,
        name=name,
        url=url,
    )

    if delivery_type == EmailDeliveryType.attachment:
        images = None
        data = {"screenshot": screenshot}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=name,
            url=url,
        )
    elif delivery_type == EmailDeliveryType.inline:
        # Get the domain from the 'From' address ..
        # and make a message id without the < > in the ends
        domain = parseaddr(config["SMTP_MAIL_FROM"])[1].split("@")[1]
        msgid = make_msgid(domain)[1:-1]

        images = {msgid: screenshot}
        data = None
        body = __(
            """
            <b><a href="%(url)s">Explore in Superset</a></b><p></p>
            <img src="cid:%(msgid)s">
            """,
            name=name,
            url=url,
            msgid=msgid,
        )

    return ReportContent(body, data, images, slack_message, screenshot)


def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str:
    with app.test_request_context():
        base_url = (
            WEBDRIVER_BASEURL_USER_FRIENDLY if user_friendly else WEBDRIVER_BASEURL
        )
        return urllib.parse.urljoin(str(base_url), url_for(view, **kwargs))


def create_webdriver(session: Session) -> WebDriver:
    return WebDriverProxy(driver_type=config["WEBDRIVER_TYPE"]).auth(
        get_reports_user(session)
    )


def get_reports_user(session: Session) -> "User":
    return (
        session.query(security_manager.user_model)
        .filter(
            func.lower(security_manager.user_model.username)
            == func.lower(config["EMAIL_REPORTS_USER"])
        )
        .one()
    )


def destroy_webdriver(
    driver: Union[chrome.webdriver.WebDriver, firefox.webdriver.WebDriver]
) -> None:
    """
    Destroy a driver
    """

    # This is some very flaky code in selenium. Hence the retries
    # and catch-all exceptions
    try:
        retry_call(driver.close, max_tries=2)
    except Exception:  # pylint: disable=broad-except
        pass
    try:
        driver.quit()
    except Exception:  # pylint: disable=broad-except
        pass


def deliver_dashboard(  # pylint: disable=too-many-locals
    dashboard_id: int,
    recipients: Optional[str],
    slack_channel: Optional[str],
    delivery_type: EmailDeliveryType,
    deliver_as_group: bool,
) -> None:

    """
    Given a schedule, delivery the dashboard as an email report
    """
    with session_scope(nullpool=True) as session:
        dashboard = session.query(Dashboard).filter_by(id=dashboard_id).one()

        dashboard_url = _get_url_path(
            "Superset.dashboard", dashboard_id_or_slug=dashboard.id
        )
        dashboard_url_user_friendly = _get_url_path(
            "Superset.dashboard", user_friendly=True, dashboard_id_or_slug=dashboard.id
        )

        # Create a driver, fetch the page, wait for the page to render
        driver = create_webdriver(session)
        window = config["WEBDRIVER_WINDOW"]["dashboard"]
        driver.set_window_size(*window)
        driver.get(dashboard_url)
        time.sleep(EMAIL_PAGE_RENDER_WAIT)

        # Set up a function to retry once for the element.
        # This is buggy in certain selenium versions with firefox driver
        get_element = getattr(driver, "find_element_by_class_name")
        element = retry_call(
            get_element,
            fargs=["grid-container"],
            max_tries=2,
            interval=EMAIL_PAGE_RENDER_WAIT,
        )

        try:
            screenshot = element.screenshot_as_png
        except WebDriverException:
            # Some webdrivers do not support screenshots for elements.
            # In such cases, take a screenshot of the entire page.
            screenshot = driver.screenshot()  # pylint: disable=no-member
        finally:
            destroy_webdriver(driver)

        # Generate the email body and attachments
        report_content = _generate_report_content(
            delivery_type,
            screenshot,
            dashboard.dashboard_title,
            dashboard_url_user_friendly,
        )

        subject = __(
            "%(prefix)s %(title)s",
            prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
            title=dashboard.dashboard_title,
        )

        if recipients:
            _deliver_email(
                recipients,
                deliver_as_group,
                subject,
                report_content.body,
                report_content.data,
                report_content.images,
            )
        if slack_channel:
            deliver_slack_msg(
                slack_channel,
                subject,
                report_content.slack_message,
                report_content.slack_attachment,
            )


def _get_slice_data(
    slc: Slice, delivery_type: EmailDeliveryType, session: Session
) -> ReportContent:
    slice_url = _get_url_path(
        "Superset.explore_json", csv="true", form_data=json.dumps({"slice_id": slc.id})
    )

    # URL to include in the email
    slice_url_user_friendly = _get_url_path(
        "Superset.slice", slice_id=slc.id, user_friendly=True
    )

    # Login on behalf of the "reports" user in order to get cookies to deal with auth
    auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(
        get_reports_user(session)
    )
    # Build something like "session=cool_sess.val;other-cookie=awesome_other_cookie"
    cookie_str = ";".join([f"{key}={val}" for key, val in auth_cookies.items()])

    opener = urllib.request.build_opener()
    opener.addheaders.append(("Cookie", cookie_str))
    response = opener.open(slice_url)
    if response.getcode() != 200:
        raise URLError(response.getcode())

    # TODO: Move to the csv module
    content = response.read()
    rows = [r.split(b",") for r in content.splitlines()]

    if delivery_type == EmailDeliveryType.inline:
        data = None

        # Parse the csv file and generate HTML
        columns = rows.pop(0)
        with app.app_context():
            body = render_template(
                "superset/reports/slice_data.html",
                columns=columns,
                rows=rows,
                name=slc.slice_name,
                link=slice_url_user_friendly,
            )

    elif delivery_type == EmailDeliveryType.attachment:
        data = {__("%(name)s.csv", name=slc.slice_name): content}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=slc.slice_name,
            url=slice_url_user_friendly,
        )

    # how to: https://api.slack.com/reference/surfaces/formatting
    slack_message = __(
        """
        *%(slice_name)s*\n
        <%(slice_url_user_friendly)s|Explore in Superset>
        """,
        slice_name=slc.slice_name,
        slice_url_user_friendly=slice_url_user_friendly,
    )

    return ReportContent(body, data, None, slack_message, content)


def _get_slice_screenshot(slice_id: int, session: Session) -> ScreenshotData:
    slice_obj = session.query(Slice).get(slice_id)

    chart_url = get_url_path("Superset.slice", slice_id=slice_obj.id, standalone="true")
    screenshot = ChartScreenshot(chart_url, slice_obj.digest)
    image_url = _get_url_path(
        "Superset.slice", user_friendly=True, slice_id=slice_obj.id,
    )

    user = security_manager.get_user_by_username(
        current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
    )
    image_data = screenshot.compute_and_cache(
        user=user, cache=thumbnail_cache, force=True,
    )

    session.commit()
    return ScreenshotData(image_url, image_data)


def _get_slice_visualization(
    slc: Slice, delivery_type: EmailDeliveryType, session: Session
) -> ReportContent:
    # Create a driver, fetch the page, wait for the page to render
    driver = create_webdriver(session)
    window = config["WEBDRIVER_WINDOW"]["slice"]
    driver.set_window_size(*window)

    slice_url = _get_url_path("Superset.slice", slice_id=slc.id)
    slice_url_user_friendly = _get_url_path(
        "Superset.slice", slice_id=slc.id, user_friendly=True
    )

    driver.get(slice_url)
    time.sleep(EMAIL_PAGE_RENDER_WAIT)

    # Set up a function to retry once for the element.
    # This is buggy in certain selenium versions with firefox driver
    element = retry_call(
        driver.find_element_by_class_name,
        fargs=["chart-container"],
        max_tries=2,
        interval=EMAIL_PAGE_RENDER_WAIT,
    )

    try:
        screenshot = element.screenshot_as_png
    except WebDriverException:
        # Some webdrivers do not support screenshots for elements.
        # In such cases, take a screenshot of the entire page.
        screenshot = driver.screenshot()  # pylint: disable=no-member
    finally:
        destroy_webdriver(driver)

    # Generate the email body and attachments
    return _generate_report_content(
        delivery_type, screenshot, slc.slice_name, slice_url_user_friendly
    )


def deliver_slice(  # pylint: disable=too-many-arguments
    slice_id: int,
    recipients: Optional[str],
    slack_channel: Optional[str],
    delivery_type: EmailDeliveryType,
    email_format: SliceEmailReportFormat,
    deliver_as_group: bool,
    session: Session,
) -> None:
    """
    Given a schedule, delivery the slice as an email report
    """
    slc = session.query(Slice).filter_by(id=slice_id).one()

    if email_format == SliceEmailReportFormat.data:
        report_content = _get_slice_data(slc, delivery_type, session)
    elif email_format == SliceEmailReportFormat.visualization:
        report_content = _get_slice_visualization(slc, delivery_type, session)
    else:
        raise RuntimeError("Unknown email report format")

    subject = __(
        "%(prefix)s %(title)s",
        prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
        title=slc.slice_name,
    )

    if recipients:
        _deliver_email(
            recipients,
            deliver_as_group,
            subject,
            report_content.body,
            report_content.data,
            report_content.images,
        )
    if slack_channel:
        deliver_slack_msg(
            slack_channel,
            subject,
            report_content.slack_message,
            report_content.slack_attachment,
        )


@celery_app.task(
    name="email_reports.send",
    bind=True,
    soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
)
def schedule_email_report(
    _task: Task,
    report_type: ScheduleType,
    schedule_id: int,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    model_cls = get_scheduler_model(report_type)
    with session_scope(nullpool=True) as session:
        schedule = session.query(model_cls).get(schedule_id)

        # The user may have disabled the schedule. If so, ignore this
        if not schedule or not schedule.active:
            logger.info("Ignoring deactivated schedule")
            return

        recipients = recipients or schedule.recipients
        slack_channel = slack_channel or schedule.slack_channel
        logger.info(
            "Starting report for slack: %s and recipients: %s.",
            slack_channel,
            recipients,
        )

        if report_type == ScheduleType.dashboard:
            deliver_dashboard(
                schedule.dashboard_id,
                recipients,
                slack_channel,
                schedule.delivery_type,
                schedule.deliver_as_group,
            )
        elif report_type == ScheduleType.slice:
            deliver_slice(
                schedule.slice_id,
                recipients,
                slack_channel,
                schedule.delivery_type,
                schedule.email_format,
                schedule.deliver_as_group,
                session,
            )
        else:
            raise RuntimeError("Unknown report type")


@celery_app.task(
    name="alerts.run_query",
    bind=True,
    # TODO: find cause of https://github.com/apache/superset/issues/10530
    # and remove retry
    autoretry_for=(NoSuchColumnError, ResourceClosedError,),
    retry_kwargs={"max_retries": 1},
    retry_backoff=True,
)
def schedule_alert_query(
    _task: Task,
    report_type: ScheduleType,
    schedule_id: int,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    model_cls = get_scheduler_model(report_type)
    with session_scope(nullpool=True) as session:
        schedule = session.query(model_cls).get(schedule_id)

        # The user may have disabled the schedule. If so, ignore this
        if not schedule or not schedule.active:
            logger.info("Ignoring deactivated alert")
            return

        if report_type == ScheduleType.alert:
            evaluate_alert(
                schedule.id, schedule.label, session, recipients, slack_channel
            )
        else:
            raise RuntimeError("Unknown report type")


class AlertState:
    ERROR = "error"
    TRIGGER = "trigger"
    PASS = "pass"


def deliver_alert(
    alert_id: int,
    session: Session,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Gathers alert information and sends out the alert
    to its respective email and slack recipients
    """

    alert = session.query(Alert).get(alert_id)

    logging.info("Triggering alert: %s", alert)

    # Set all the values for the alert report
    # Alternate values are used in the case of a test alert
    # where an alert might not have a validator
    recipients = recipients or alert.recipients
    slack_channel = slack_channel or alert.slack_channel
    validation_error_message = (
        str(alert.observations[-1].value) + " " + alert.pretty_config
    )

    if alert.slice:
        alert_content = AlertContent(
            alert.label,
            alert.sql,
            str(alert.observations[-1].value),
            validation_error_message,
            _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id),
            _get_slice_screenshot(alert.slice.id, session),
        )
    else:
        # TODO: dashboard delivery!
        alert_content = AlertContent(
            alert.label,
            alert.sql,
            str(alert.observations[-1].value),
            validation_error_message,
            _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id),
            None,
        )

    if recipients:
        deliver_email_alert(alert_content, recipients)
    if slack_channel:
        deliver_slack_alert(alert_content, slack_channel)


def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None:
    """Delivers an email alert to the given email recipients"""
    subject = f"[Superset] Triggered alert: {alert_content.label}"
    deliver_as_group = False
    data = None
    images = {}
    # TODO(JasonD28): add support for emails with no screenshot
    image_url = None
    if alert_content.image_data:
        image_url = alert_content.image_data.url
        if alert_content.image_data.image:
            images = {"screenshot": alert_content.image_data.image}

    body = render_template(
        "email/alert.txt",
        alert_url=alert_content.alert_url,
        label=alert_content.label,
        sql=alert_content.sql,
        observation_value=alert_content.observation_value,
        validation_error_message=alert_content.validation_error_message,
        image_url=image_url,
    )

    _deliver_email(recipients, deliver_as_group, subject, body, data, images)


def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None:
    """Delivers a slack alert to the given slack channel"""

    subject = __("[Alert] %(label)s", label=alert_content.label)

    image = None
    if alert_content.image_data:
        slack_message = render_template(
            "slack/alert.txt",
            label=alert_content.label,
            sql=alert_content.sql,
            observation_value=alert_content.observation_value,
            validation_error_message=alert_content.validation_error_message,
            url=alert_content.image_data.url,
            alert_url=alert_content.alert_url,
        )
        image = alert_content.image_data.image
    else:
        slack_message = render_template(
            "slack/alert_no_screenshot.txt",
            label=alert_content.label,
            sql=alert_content.sql,
            observation_value=alert_content.observation_value,
            validation_error_message=alert_content.validation_error_message,
            alert_url=alert_content.alert_url,
        )

    deliver_slack_msg(
        slack_channel, subject, slack_message, image,
    )


def evaluate_alert(
    alert_id: int,
    label: str,
    session: Session,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """Processes an alert to see if it should be triggered"""

    logger.info("Processing alert ID: %i", alert_id)

    state = None
    dttm_start = datetime.utcnow()

    try:
        logger.info("Querying observers for alert <%s:%s>", alert_id, label)
        error_msg = observe(alert_id, session)
        if error_msg:
            state = AlertState.ERROR
            logging.error(error_msg)
    except Exception as exc:  # pylint: disable=broad-except
        state = AlertState.ERROR
        logging.exception(exc)
        logging.error("Failed at query observers for alert: %s (%s)", label, alert_id)

    dttm_end = datetime.utcnow()

    if state != AlertState.ERROR:
        # Don't validate alert on test runs since it may not be triggered
        if recipients or slack_channel:
            deliver_alert(alert_id, session, recipients, slack_channel)
            state = AlertState.TRIGGER
        # Validate during regular workflow and deliver only if triggered
        elif validate_observations(alert_id, label, session):
            deliver_alert(alert_id, session, recipients, slack_channel)
            state = AlertState.TRIGGER
        else:
            state = AlertState.PASS

    session.commit()
    alert = session.query(Alert).get(alert_id)
    if state != AlertState.ERROR:
        alert.last_eval_dttm = dttm_end
    alert.last_state = state
    alert.logs.append(
        AlertLog(
            scheduled_dttm=dttm_start,
            dttm_start=dttm_start,
            dttm_end=dttm_end,
            state=state,
        )
    )
    session.commit()


def validate_observations(alert_id: int, label: str, session: Session) -> bool:
    """
    Runs an alert's validators to check if it should be triggered or not
    If so, return the name of the validator that returned true
    """

    logger.info("Validating observations for alert <%s:%s>", alert_id, label)
    alert = session.query(Alert).get(alert_id)
    validate = get_validator_function(alert.validator_type)
    return bool(validate and validate(alert, alert.validator_config))


def next_schedules(
    crontab: str, start_at: datetime, stop_at: datetime, resolution: int = 0
) -> Iterator[datetime]:
    crons = croniter.croniter(crontab, start_at - timedelta(seconds=1))
    previous = start_at - timedelta(days=1)

    for eta in crons.all_next(datetime):
        # Do not cross the time boundary
        if eta >= stop_at:
            break

        if eta < start_at:
            continue

        # Do not allow very frequent tasks
        if eta - previous < timedelta(seconds=resolution):
            continue

        yield eta
        previous = eta


def schedule_window(
    report_type: str,
    start_at: datetime,
    stop_at: datetime,
    resolution: int,
    session: Session,
) -> None:
    """
    Find all active schedules and schedule celery tasks for
    each of them with a specific ETA (determined by parsing
    the cron schedule for the schedule)
    """
    model_cls = get_scheduler_model(report_type)

    if not model_cls:
        return None

    schedules = session.query(model_cls).filter(model_cls.active.is_(True))

    for schedule in schedules:
        logging.info("Processing schedule %s", schedule)
        args = (report_type, schedule.id)
        schedule_start_at = start_at

        if (
            hasattr(schedule, "last_eval_dttm")
            and schedule.last_eval_dttm
            and schedule.last_eval_dttm > start_at
        ):
            schedule_start_at = schedule.last_eval_dttm + timedelta(seconds=1)

        # Schedule the job for the specified time window
        for eta in next_schedules(
            schedule.crontab, schedule_start_at, stop_at, resolution=resolution
        ):
            logging.info("Scheduled eta %s", eta)
            get_scheduler_action(report_type).apply_async(args, eta=eta)  # type: ignore

    return None


def get_scheduler_action(report_type: str) -> Optional[Callable[..., Any]]:
    if report_type == ScheduleType.dashboard:
        return schedule_email_report
    if report_type == ScheduleType.slice:
        return schedule_email_report
    if report_type == ScheduleType.alert:
        return schedule_alert_query
    return None


@celery_app.task(name="email_reports.schedule_hourly")
def schedule_hourly() -> None:
    """ Celery beat job meant to be invoked hourly """
    if not config["ENABLE_SCHEDULED_EMAIL_REPORTS"]:
        logger.info("Scheduled email reports not enabled in config")
        return

    resolution = config["EMAIL_REPORTS_CRON_RESOLUTION"] * 60

    # Get the top of the hour
    start_at = datetime.now(tzlocal()).replace(microsecond=0, second=0, minute=0)
    stop_at = start_at + timedelta(seconds=3600)

    with session_scope(nullpool=True) as session:
        schedule_window(ScheduleType.dashboard, start_at, stop_at, resolution, session)
        schedule_window(ScheduleType.slice, start_at, stop_at, resolution, session)


@celery_app.task(name="alerts.schedule_check")
def schedule_alerts() -> None:
    """ Celery beat job meant to be invoked every minute to check alerts """
    resolution = 0
    now = datetime.utcnow()
    start_at = now - timedelta(
        seconds=300
    )  # process any missed tasks in the past few minutes
    stop_at = now + timedelta(seconds=1)
    with session_scope(nullpool=True) as session:
        schedule_window(ScheduleType.alert, start_at, stop_at, resolution, session)
