| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
import logging
from datetime import datetime, timezone

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

from superset import app, is_feature_enabled
from superset.commands.exceptions import CommandException
from superset.commands.report.exceptions import ReportScheduleUnexpectedError
from superset.commands.report.execute import AsyncExecuteReportScheduleCommand
from superset.commands.report.log_prune import AsyncPruneReportScheduleLogCommand
from superset.daos.report import ReportScheduleDAO
from superset.extensions import celery_app
from superset.stats_logger import BaseStatsLogger
from superset.tasks.cron_util import cron_schedule_window
from superset.utils.core import LoggerLevel
from superset.utils.log import get_logger_from_status
| |
# Module-level logger shared by the report Celery tasks defined in this module.
logger = logging.getLogger(__name__)
| |
| |
@celery_app.task(name="reports.scheduler")
def scheduler() -> None:
    """
    Celery beat main scheduler for reports.

    For every active report schedule, enqueue one ``reports.execute`` task per
    cron tick yielded by ``cron_schedule_window`` around the trigger time,
    using that tick as the task ETA. When the schedule defines a
    ``working_timeout`` and ``ALERT_REPORTS_WORKING_TIME_OUT_KILL`` is set,
    hard and soft Celery time limits (timeout plus the configured lags) are
    attached to the enqueued task as well.

    Returns early, after incrementing the stats counter, when the
    ``ALERT_REPORTS`` feature flag is disabled.
    """
    stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
    stats_logger.incr("reports.scheduler")

    if not is_feature_enabled("ALERT_REPORTS"):
        return
    active_schedules = ReportScheduleDAO.find_active()

    # Both branches produce a timezone-aware UTC datetime. A naive value would
    # be interpreted as server-local time by ``datetime.astimezone()``, which
    # is wrong whenever the host is not running in UTC.
    if scheduler.request.expires:
        # Subtracting the configured expiry recovers the trigger time; this
        # presumably relies on beat setting expires = trigger + EXPIRES.
        triggered_at = (
            datetime.fromisoformat(scheduler.request.expires)
            - app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
        )
        if triggered_at.tzinfo is None:
            triggered_at = triggered_at.replace(tzinfo=timezone.utc)
    else:
        triggered_at = datetime.now(tz=timezone.utc)

    for active_schedule in active_schedules:
        for schedule in cron_schedule_window(
            triggered_at, active_schedule.crontab, active_schedule.timezone
        ):
            logger.info("Scheduling alert %s eta: %s", active_schedule.name, schedule)
            async_options = {"eta": schedule}
            # Only enforce Celery time limits when the schedule has a working
            # timeout and killing over-running tasks is enabled.
            if (
                active_schedule.working_timeout is not None
                and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"]
            ):
                async_options["time_limit"] = (
                    active_schedule.working_timeout
                    + app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"]
                )
                async_options["soft_time_limit"] = (
                    active_schedule.working_timeout
                    + app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"]
                )
            execute.apply_async((active_schedule.id,), **async_options)
| |
| |
@celery_app.task(name="reports.execute", bind=True)
def execute(self: Celery.task, report_schedule_id: int) -> None:
    """
    Run a single alert/report schedule.

    Enqueued by ``reports.scheduler`` with the scheduled cron tick as the task
    ETA; that ETA is forwarded to ``AsyncExecuteReportScheduleCommand``.
    Errors are logged rather than re-raised. The task state is set to
    ``FAILURE`` for ``ReportScheduleUnexpectedError`` and for any other
    ``CommandException`` whose status maps to ``LoggerLevel.EXCEPTION``.

    :param report_schedule_id: id of the report schedule to execute
    """
    stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
    stats_logger.incr("reports.execute")

    # Initialised up front so the except clauses can always reference it.
    task_id = None
    try:
        # ``bind=True`` makes ``self`` this task, so its request carries the
        # current task id and ETA.
        task_id = self.request.id
        scheduled_dttm = self.request.eta
        logger.info(
            "Executing alert/report, task id: %s, scheduled_dttm: %s",
            task_id,
            scheduled_dttm,
        )
        AsyncExecuteReportScheduleCommand(
            task_id,
            report_schedule_id,
            scheduled_dttm,
        ).run()
    except ReportScheduleUnexpectedError:
        logger.exception(
            "An unexpected error occurred while executing the report: %s", task_id
        )
        self.update_state(state="FAILURE")
    except CommandException as ex:
        # The exception status decides both the log method and its severity.
        logger_func, level = get_logger_from_status(ex.status)
        logger_func(
            f"A downstream {level} occurred "
            f"while generating a report: {task_id}. {ex.message}",
            exc_info=True,
        )
        if level == LoggerLevel.EXCEPTION:
            self.update_state(state="FAILURE")
| |
| |
@celery_app.task(name="reports.prune_log")
def prune_log() -> None:
    """
    Prune report schedule logs via ``AsyncPruneReportScheduleLogCommand``.

    A Celery soft time-limit expiry is logged as a warning; a command failure
    is logged together with its traceback. Neither is re-raised.
    """
    app.config["STATS_LOGGER"].incr("reports.prune_log")

    try:
        AsyncPruneReportScheduleLogCommand().run()
    except SoftTimeLimitExceeded as timeout_exc:
        logger.warning(
            "A timeout occurred while pruning report schedule logs: %s",
            timeout_exc,
        )
    except CommandException:
        logger.exception("An exception occurred while pruning report schedule logs")