| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| import collections |
| import copy |
| import json |
| import logging |
| import math |
| import re |
| import socket |
| import sys |
| import traceback |
| import warnings |
| from collections import defaultdict |
| from datetime import timedelta |
| from functools import wraps |
| from json import JSONDecodeError |
| from operator import itemgetter |
| from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union |
| from urllib.parse import parse_qsl, unquote, urlencode, urlparse |
| |
| import lazy_object_proxy |
| import markupsafe |
| import nvd3 |
| import sqlalchemy as sqla |
| from flask import ( |
| Response, |
| abort, |
| before_render_template, |
| current_app, |
| flash, |
| g, |
| jsonify, |
| make_response, |
| redirect, |
| render_template, |
| request, |
| send_from_directory, |
| session as flask_session, |
| url_for, |
| ) |
| from flask_appbuilder import BaseView, ModelView, expose |
| from flask_appbuilder.actions import action |
| from flask_appbuilder.fieldwidgets import Select2Widget |
| from flask_appbuilder.models.sqla.filters import BaseFilter |
| from flask_appbuilder.security.decorators import has_access |
| from flask_appbuilder.security.views import ( |
| PermissionModelView, |
| PermissionViewModelView, |
| ResetMyPasswordView, |
| ResetPasswordView, |
| RoleModelView, |
| UserDBModelView, |
| UserInfoEditView, |
| UserLDAPModelView, |
| UserOAuthModelView, |
| UserOIDModelView, |
| UserRemoteUserModelView, |
| UserStatsChartView, |
| ViewMenuModelView, |
| ) |
| from flask_appbuilder.widgets import FormWidget |
| from flask_babel import lazy_gettext |
| from jinja2.utils import htmlsafe_json_dumps, pformat # type: ignore |
| from markupsafe import Markup, escape |
| from pendulum.datetime import DateTime |
| from pendulum.parsing.exceptions import ParserError |
| from pygments import highlight, lexers |
| from pygments.formatters import HtmlFormatter |
| from sqlalchemy import Date, and_, desc, func, inspect, union_all |
| from sqlalchemy.exc import IntegrityError |
| from sqlalchemy.orm import joinedload |
| from wtforms import SelectField, validators |
| from wtforms.validators import InputRequired |
| |
| import airflow |
| from airflow import models, plugins_manager, settings |
| from airflow.api.common.experimental.mark_tasks import ( |
| set_dag_run_state_to_failed, |
| set_dag_run_state_to_success, |
| ) |
| from airflow.compat.functools import cached_property |
| from airflow.configuration import AIRFLOW_CONFIG, conf |
| from airflow.exceptions import AirflowException |
| from airflow.executors.executor_loader import ExecutorLoader |
| from airflow.jobs.base_job import BaseJob |
| from airflow.jobs.scheduler_job import SchedulerJob |
| from airflow.jobs.triggerer_job import TriggererJob |
| from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors |
| from airflow.models.baseoperator import BaseOperator |
| from airflow.models.dagcode import DagCode |
| from airflow.models.dagrun import DagRun, DagRunType |
| from airflow.models.serialized_dag import SerializedDagModel |
| from airflow.models.taskinstance import TaskInstance |
| from airflow.providers_manager import ProvidersManager |
| from airflow.security import permissions |
| from airflow.ti_deps.dep_context import DepContext |
| from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS |
| from airflow.utils import json as utils_json, timezone, yaml |
| from airflow.utils.dates import infer_time_unit, scale_time_units |
| from airflow.utils.docs import get_doc_url_for_provider, get_docs_url |
| from airflow.utils.helpers import alchemy_to_dict |
| from airflow.utils.log import secrets_masker |
| from airflow.utils.log.log_reader import TaskLogReader |
| from airflow.utils.session import create_session, provide_session |
| from airflow.utils.state import State |
| from airflow.utils.strings import to_boolean |
| from airflow.utils.timezone import td_format, utcnow |
| from airflow.version import version |
| from airflow.www import auth, utils as wwwutils |
| from airflow.www.decorators import action_logging, gzipped |
| from airflow.www.forms import ( |
| ConnectionForm, |
| DagRunEditForm, |
| DateTimeForm, |
| DateTimeWithNumRunsForm, |
| DateTimeWithNumRunsWithDagRunsForm, |
| TaskInstanceEditForm, |
| ) |
| from airflow.www.widgets import AirflowModelListWidget |
| |
| PAGE_SIZE = conf.getint('webserver', 'page_size') |
| FILTER_TAGS_COOKIE = 'tags_filter' |
| FILTER_STATUS_COOKIE = 'dag_status_filter' |
| LINECHART_X_AXIS_TICKFORMAT = ( |
| "function (d, i) { let xLabel;" |
| "if (i === undefined) {xLabel = d3.time.format('%H:%M, %d %b %Y')(new Date(parseInt(d)));" |
| "} else {xLabel = d3.time.format('%H:%M, %d %b')(new Date(parseInt(d)));} return xLabel;}" |
| ) |
| |
| |
| def truncate_task_duration(task_duration): |
| """ |
| Cast the task_duration to an int was for optimization for large/huge dags if task_duration > 10s |
| otherwise we keep it as a float with 3dp |
| """ |
| return int(task_duration) if task_duration > 10.0 else round(task_duration, 3) |
| |
| |
| def get_safe_url(url): |
| """Given a user-supplied URL, ensure it points to our web server""" |
| valid_schemes = ['http', 'https', ''] |
| valid_netlocs = [request.host, ''] |
| |
| if not url: |
| return url_for('Airflow.index') |
| |
| parsed = urlparse(url) |
| |
    # If the url contains a semicolon, redirect to the homepage to avoid
    # potential XSS. (Similar to https://github.com/python/cpython/pull/24297/files (bpo-42967))
| if ';' in unquote(url): |
| return url_for('Airflow.index') |
| |
| query = parse_qsl(parsed.query, keep_blank_values=True) |
| |
| url = parsed._replace(query=urlencode(query)).geturl() |
| |
| if parsed.scheme in valid_schemes and parsed.netloc in valid_netlocs: |
| return url |
| |
| return url_for('Airflow.index') |
| |
| |
| def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): |
| """Get Execution Data, Base Date & Number of runs from a Request""" |
| date_time = www_request.args.get('execution_date') |
| if date_time: |
| date_time = timezone.parse(date_time) |
| else: |
| date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow() |
| |
| base_date = www_request.args.get('base_date') |
| if base_date: |
| base_date = timezone.parse(base_date) |
| else: |
        # The DateTimeField widget truncates milliseconds and would lose
        # the first dag run. Round to the next second.
| base_date = (date_time + timedelta(seconds=1)).replace(microsecond=0) |
| |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| num_runs = www_request.args.get('num_runs', default=default_dag_run, type=int) |
| |
| drs = ( |
| session.query(DagRun) |
| .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date) |
| .order_by(desc(DagRun.execution_date)) |
| .limit(num_runs) |
| .all() |
| ) |
| dr_choices = [] |
| dr_state = None |
| for dr in drs: |
| dr_choices.append((dr.execution_date.isoformat(), dr.run_id)) |
| if date_time == dr.execution_date: |
| dr_state = dr.state |
| |
    # Happens if base_date was changed and the selected dag run is not in the result
| if not dr_state and drs: |
| dr = drs[0] |
| date_time = dr.execution_date |
| dr_state = dr.state |
| |
| return { |
| 'dttm': date_time, |
| 'base_date': base_date, |
| 'num_runs': num_runs, |
| 'execution_date': date_time.isoformat(), |
| 'dr_choices': dr_choices, |
| 'dr_state': dr_state, |
| } |
| |
| |
| def task_group_to_tree(task_item_or_group, dag, dag_runs, tis): |
| """ |
| Create a nested dict representation of this TaskGroup and its children used to construct |
| the Graph. |
| """ |
| if isinstance(task_item_or_group, BaseOperator): |
| return { |
| 'id': task_item_or_group.task_id, |
| 'instances': [wwwutils.encode_ti(ti) for ti in tis if ti.task_id == task_item_or_group.task_id], |
| 'label': task_item_or_group.label, |
| 'extra_links': task_item_or_group.extra_links, |
| } |
| |
| # Task Group |
| task_group = task_item_or_group |
| |
| children = [task_group_to_tree(child, dag, dag_runs, tis) for child in task_group.children.values()] |
| |
| def get_summary(dag_run, children): |
| priority = [ |
| 'failed', |
| 'upstream_failed', |
| 'up_for_retry', |
| 'up_for_reschedule', |
| 'queued', |
| 'scheduled', |
| 'deferred', |
| 'sensing', |
| 'running', |
| 'shutdown', |
| 'restarting', |
| 'removed', |
| 'no_status', |
| 'success', |
| 'skipped', |
| ] |
| |
| child_instances = [child['instances'] for child in children if 'instances' in child] |
| child_instances = [item for sublist in child_instances for item in sublist] |
| |
| children_start_dates = [ |
| item['start_date'] for item in child_instances if item['run_id'] == dag_run.run_id |
| ] |
| children_end_dates = [ |
| item['end_date'] for item in child_instances if item['run_id'] == dag_run.run_id |
| ] |
| children_states = [item['state'] for item in child_instances if item['run_id'] == dag_run.run_id] |
| |
| group_state = None |
| for state in priority: |
| if state in children_states: |
| group_state = state |
| break |
| group_start_date = wwwutils.datetime_to_string( |
| min((timezone.parse(date) for date in children_start_dates if date), default=None) |
| ) |
| group_end_date = wwwutils.datetime_to_string( |
| max((timezone.parse(date) for date in children_end_dates if date), default=None) |
| ) |
| |
| return { |
| 'task_id': task_group.group_id, |
| 'run_id': dag_run.run_id, |
| 'state': group_state, |
| 'start_date': group_start_date, |
| 'end_date': group_end_date, |
| } |
| |
| group_summaries = [get_summary(dr, children) for dr in dag_runs] |
| |
| return { |
| 'id': task_group.group_id, |
| 'label': task_group.label, |
| 'children': children, |
| 'tooltip': task_group.tooltip, |
| 'instances': group_summaries, |
| } |
| |
| |
| def task_group_to_dict(task_item_or_group): |
| """ |
| Create a nested dict representation of this TaskGroup and its children used to construct |
| the Graph. |
| """ |
| if isinstance(task_item_or_group, BaseOperator): |
| return { |
| 'id': task_item_or_group.task_id, |
| 'value': { |
| 'label': task_item_or_group.label, |
| 'labelStyle': f"fill:{task_item_or_group.ui_fgcolor};", |
| 'style': f"fill:{task_item_or_group.ui_color};", |
| 'rx': 5, |
| 'ry': 5, |
| }, |
| } |
| task_group = task_item_or_group |
| children = [ |
| task_group_to_dict(child) for child in sorted(task_group.children.values(), key=lambda t: t.label) |
| ] |
| |
| if task_group.upstream_group_ids or task_group.upstream_task_ids: |
| children.append( |
| { |
| 'id': task_group.upstream_join_id, |
| 'value': { |
| 'label': '', |
| 'labelStyle': f"fill:{task_group.ui_fgcolor};", |
| 'style': f"fill:{task_group.ui_color};", |
| 'shape': 'circle', |
| }, |
| } |
| ) |
| |
| if task_group.downstream_group_ids or task_group.downstream_task_ids: |
        # This is the join node used to reduce the number of edges between two TaskGroups.
| children.append( |
| { |
| 'id': task_group.downstream_join_id, |
| 'value': { |
| 'label': '', |
| 'labelStyle': f"fill:{task_group.ui_fgcolor};", |
| 'style': f"fill:{task_group.ui_color};", |
| 'shape': 'circle', |
| }, |
| } |
| ) |
| |
| return { |
| "id": task_group.group_id, |
| 'value': { |
| 'label': task_group.label, |
| 'labelStyle': f"fill:{task_group.ui_fgcolor};", |
| 'style': f"fill:{task_group.ui_color}", |
| 'rx': 5, |
| 'ry': 5, |
| 'clusterLabelPos': 'top', |
| 'tooltip': task_group.tooltip, |
| }, |
| 'children': children, |
| } |
| |
| |
| def get_key_paths(input_dict): |
| """Return a list of dot-separated dictionary paths""" |
| for key, value in input_dict.items(): |
| if isinstance(value, dict): |
| for sub_key in get_key_paths(value): |
| yield '.'.join((key, sub_key)) |
| else: |
| yield key |
| |
| |
| def get_value_from_path(key_path, content): |
| """Return the value from a dictionary based on dot-separated path of keys""" |
| elem = content |
| for x in key_path.strip(".").split("."): |
| try: |
| x = int(x) |
| elem = elem[x] |
| except ValueError: |
| elem = elem.get(x) |
| |
| return elem |
| |
| |
| def dag_edges(dag): |
| """ |
| Create the list of edges needed to construct the Graph view. |
| |
| A special case is made if a TaskGroup is immediately upstream/downstream of another |
| TaskGroup or task. Two dummy nodes named upstream_join_id and downstream_join_id are |
| created for the TaskGroup. Instead of drawing an edge onto every task in the TaskGroup, |
| all edges are directed onto the dummy nodes. This is to cut down the number of edges on |
| the graph. |
| |
| For example: A DAG with TaskGroups group1 and group2: |
| group1: task1, task2, task3 |
| group2: task4, task5, task6 |
| |
| group2 is downstream of group1: |
| group1 >> group2 |
| |
| Edges to add (This avoids having to create edges between every task in group1 and group2): |
| task1 >> downstream_join_id |
| task2 >> downstream_join_id |
| task3 >> downstream_join_id |
| downstream_join_id >> upstream_join_id |
| upstream_join_id >> task4 |
| upstream_join_id >> task5 |
| upstream_join_id >> task6 |
| """ |
    # Edges to add between TaskGroups
| edges_to_add = set() |
| # Edges to remove between individual tasks that are replaced by edges_to_add. |
| edges_to_skip = set() |
| |
| task_group_map = dag.task_group.get_task_group_dict() |
| |
| def collect_edges(task_group): |
| """Update edges_to_add and edges_to_skip according to TaskGroups.""" |
| if isinstance(task_group, BaseOperator): |
| return |
| |
| for target_id in task_group.downstream_group_ids: |
| # For every TaskGroup immediately downstream, add edges between downstream_join_id |
| # and upstream_join_id. Skip edges between individual tasks of the TaskGroups. |
| target_group = task_group_map[target_id] |
| edges_to_add.add((task_group.downstream_join_id, target_group.upstream_join_id)) |
| |
| for child in task_group.get_leaves(): |
| edges_to_add.add((child.task_id, task_group.downstream_join_id)) |
| for target in target_group.get_roots(): |
| edges_to_skip.add((child.task_id, target.task_id)) |
| edges_to_skip.add((child.task_id, target_group.upstream_join_id)) |
| |
| for child in target_group.get_roots(): |
| edges_to_add.add((target_group.upstream_join_id, child.task_id)) |
| edges_to_skip.add((task_group.downstream_join_id, child.task_id)) |
| |
| # For every individual task immediately downstream, add edges between downstream_join_id and |
| # the downstream task. Skip edges between individual tasks of the TaskGroup and the |
| # downstream task. |
| for target_id in task_group.downstream_task_ids: |
| edges_to_add.add((task_group.downstream_join_id, target_id)) |
| |
| for child in task_group.get_leaves(): |
| edges_to_add.add((child.task_id, task_group.downstream_join_id)) |
| edges_to_skip.add((child.task_id, target_id)) |
| |
| # For every individual task immediately upstream, add edges between the upstream task |
| # and upstream_join_id. Skip edges between the upstream task and individual tasks |
| # of the TaskGroup. |
| for source_id in task_group.upstream_task_ids: |
| edges_to_add.add((source_id, task_group.upstream_join_id)) |
| for child in task_group.get_roots(): |
| edges_to_add.add((task_group.upstream_join_id, child.task_id)) |
| edges_to_skip.add((source_id, child.task_id)) |
| |
| for child in task_group.children.values(): |
| collect_edges(child) |
| |
| collect_edges(dag.task_group) |
| |
| # Collect all the edges between individual tasks |
| edges = set() |
| |
| def get_downstream(task): |
| for child in task.downstream_list: |
| edge = (task.task_id, child.task_id) |
| if edge not in edges: |
| edges.add(edge) |
| get_downstream(child) |
| |
| for root in dag.roots: |
| get_downstream(root) |
| |
| result = [] |
| # Build result dicts with the two ends of the edge, plus any extra metadata |
| # if we have it. |
| for source_id, target_id in sorted(edges.union(edges_to_add) - edges_to_skip): |
| record = {"source_id": source_id, "target_id": target_id} |
| label = dag.get_edge_info(source_id, target_id).get("label") |
| if label: |
| record["label"] = label |
| result.append(record) |
| return result |
| |
| |
| ###################################################################################### |
| # Error handlers |
| ###################################################################################### |
| |
| |
| def not_found(error): |
| """Show Not Found on screen for any error in the Webserver""" |
| return ( |
| render_template( |
| 'airflow/not_found.html', |
| hostname=socket.getfqdn() |
| if conf.getboolean('webserver', 'EXPOSE_HOSTNAME', fallback=True) |
| else 'redact', |
| ), |
| 404, |
| ) |
| |
| |
| def show_traceback(error): |
| """Show Traceback for a given error""" |
| return ( |
| render_template( |
| 'airflow/traceback.html', |
| python_version=sys.version.split(" ")[0], |
| airflow_version=version, |
| hostname=socket.getfqdn() |
| if conf.getboolean('webserver', 'EXPOSE_HOSTNAME', fallback=True) |
| else 'redact', |
| info=traceback.format_exc() |
| if conf.getboolean('webserver', 'EXPOSE_STACKTRACE', fallback=True) |
| else 'Error! Please contact server admin.', |
| ), |
| 500, |
| ) |
| |
| |
| ###################################################################################### |
| # BaseViews |
| ###################################################################################### |
| |
| |
| class AirflowBaseView(BaseView): |
| """Base View to set Airflow related properties""" |
| |
| from airflow import macros |
| |
| route_base = '' |
| |
| extra_args = { |
| # Make our macros available to our UI templates too. |
| 'macros': macros, |
| 'get_docs_url': get_docs_url, |
| } |
| |
| if not conf.getboolean('core', 'unit_test_mode'): |
| extra_args['sqlite_warning'] = settings.engine.dialect.name == 'sqlite' |
| extra_args['sequential_executor_warning'] = conf.get('core', 'executor') == 'SequentialExecutor' |
| |
| line_chart_attr = { |
| 'legend.maxKeyLength': 200, |
| } |
| |
| def render_template(self, *args, **kwargs): |
| # Add triggerer_job only if we need it |
| if TriggererJob.is_needed(): |
| kwargs["triggerer_job"] = lazy_object_proxy.Proxy(TriggererJob.most_recent_job) |
| return super().render_template( |
| *args, |
| # Cache this at most once per request, not for the lifetime of the view instance |
| scheduler_job=lazy_object_proxy.Proxy(SchedulerJob.most_recent_job), |
| **kwargs, |
| ) |
| |
| |
| def add_user_permissions_to_dag(sender, template, context, **extra): |
| """ |
| Adds `.can_edit`, `.can_trigger`, and `.can_delete` properties |
| to DAG based on current user's permissions. |
| Located in `views.py` rather than the DAG model to keep |
| permissions logic out of the Airflow core. |
| """ |
| if 'dag' in context: |
| dag = context['dag'] |
| can_create_dag_run = current_app.appbuilder.sm.has_access( |
| permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN |
| ) |
| |
| dag.can_edit = current_app.appbuilder.sm.can_edit_dag(dag.dag_id) |
| dag.can_trigger = dag.can_edit and can_create_dag_run |
| dag.can_delete = current_app.appbuilder.sm.has_access( |
| permissions.ACTION_CAN_DELETE, |
| permissions.RESOURCE_DAG, |
| ) |
| context['dag'] = dag |
| |
| |
| before_render_template.connect(add_user_permissions_to_dag) |
| |
| |
| class Airflow(AirflowBaseView): |
| """Main Airflow application.""" |
| |
| @expose('/health') |
| def health(self): |
| """ |
        An endpoint to check the health status of the Airflow instance,
        including the metadatabase and scheduler.
| """ |
| payload = {'metadatabase': {'status': 'unhealthy'}} |
| |
| latest_scheduler_heartbeat = None |
| scheduler_status = 'unhealthy' |
| payload['metadatabase'] = {'status': 'healthy'} |
| try: |
| scheduler_job = SchedulerJob.most_recent_job() |
| |
| if scheduler_job: |
| latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() |
| if scheduler_job.is_alive(): |
| scheduler_status = 'healthy' |
| except Exception: |
| payload['metadatabase']['status'] = 'unhealthy' |
| |
| payload['scheduler'] = { |
| 'status': scheduler_status, |
| 'latest_scheduler_heartbeat': latest_scheduler_heartbeat, |
| } |
| |
| return wwwutils.json_response(payload) |
| |
| @expose('/home') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), |
| ] |
| ) |
| def index(self): |
| """Home view.""" |
| hide_paused_dags_by_default = conf.getboolean('webserver', 'hide_paused_dags_by_default') |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| |
| num_runs = request.args.get('num_runs', default=default_dag_run, type=int) |
| current_page = request.args.get('page', default=0, type=int) |
| arg_search_query = request.args.get('search') |
| arg_tags_filter = request.args.getlist('tags') |
| arg_status_filter = request.args.get('status') |
| |
| if request.args.get('reset_tags') is not None: |
| flask_session[FILTER_TAGS_COOKIE] = None |
| # Remove the reset_tags=reset from the URL |
| return redirect(url_for('Airflow.index')) |
| |
| cookie_val = flask_session.get(FILTER_TAGS_COOKIE) |
| if arg_tags_filter: |
| flask_session[FILTER_TAGS_COOKIE] = ','.join(arg_tags_filter) |
| elif cookie_val: |
| # If tags exist in cookie, but not URL, add them to the URL |
| return redirect(url_for('Airflow.index', tags=cookie_val.split(','))) |
| |
| if arg_status_filter is None: |
| cookie_val = flask_session.get(FILTER_STATUS_COOKIE) |
| if cookie_val: |
| arg_status_filter = cookie_val |
| else: |
| arg_status_filter = 'active' if hide_paused_dags_by_default else 'all' |
| flask_session[FILTER_STATUS_COOKIE] = arg_status_filter |
| else: |
| status = arg_status_filter.strip().lower() |
| flask_session[FILTER_STATUS_COOKIE] = status |
| arg_status_filter = status |
| |
| dags_per_page = PAGE_SIZE |
| |
| start = current_page * dags_per_page |
| end = start + dags_per_page |
| |
        # Get all the dag ids the user can access
| filter_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| |
| with create_session() as session: |
| # read orm_dags from the db |
| dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active) |
| |
| if arg_search_query: |
| dags_query = dags_query.filter( |
| DagModel.dag_id.ilike('%' + arg_search_query + '%') |
| | DagModel.owners.ilike('%' + arg_search_query + '%') |
| ) |
| |
| if arg_tags_filter: |
| dags_query = dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter))) |
| |
| dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids)) |
| |
| all_dags = dags_query |
| active_dags = dags_query.filter(~DagModel.is_paused) |
| paused_dags = dags_query.filter(DagModel.is_paused) |
| |
| is_paused_count = dict( |
| all_dags.with_entities(DagModel.is_paused, func.count(DagModel.dag_id)) |
| .group_by(DagModel.is_paused) |
| .all() |
| ) |
| status_count_active = is_paused_count.get(False, 0) |
| status_count_paused = is_paused_count.get(True, 0) |
| all_dags_count = status_count_active + status_count_paused |
| if arg_status_filter == 'active': |
| current_dags = active_dags |
| num_of_all_dags = status_count_active |
| elif arg_status_filter == 'paused': |
| current_dags = paused_dags |
| num_of_all_dags = status_count_paused |
| else: |
| current_dags = all_dags |
| num_of_all_dags = all_dags_count |
| |
| dags = ( |
| current_dags.order_by(DagModel.dag_id) |
| .options(joinedload(DagModel.tags)) |
| .offset(start) |
| .limit(dags_per_page) |
| .all() |
| ) |
| |
| user_permissions = current_app.appbuilder.sm.get_current_user_permissions() |
| all_dags_editable = (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG) in user_permissions |
| can_create_dag_run = ( |
| permissions.ACTION_CAN_CREATE, |
| permissions.RESOURCE_DAG_RUN, |
| ) in user_permissions |
| |
| can_delete_dag = ( |
| permissions.ACTION_CAN_DELETE, |
| permissions.RESOURCE_DAG, |
| ) in user_permissions |
| |
| for dag in dags: |
| if all_dags_editable: |
| dag.can_edit = True |
| else: |
| dag_resource_name = permissions.RESOURCE_DAG_PREFIX + dag.dag_id |
| dag.can_edit = (permissions.ACTION_CAN_EDIT, dag_resource_name) in user_permissions |
| dag.can_trigger = dag.can_edit and can_create_dag_run |
| dag.can_delete = can_delete_dag |
| |
| dagtags = session.query(DagTag.name).distinct(DagTag.name).all() |
| tags = [ |
| {"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)} |
| for name, in dagtags |
| ] |
| |
| import_errors = session.query(errors.ImportError).order_by(errors.ImportError.id) |
| |
| if (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) not in user_permissions: |
| # if the user doesn't have access to all DAGs, only display errors from visible DAGs |
| import_errors = import_errors.join( |
| DagModel, DagModel.fileloc == errors.ImportError.filename |
| ).filter(DagModel.dag_id.in_(filter_dag_ids)) |
| |
| for import_error in import_errors: |
| flash( |
| "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error), |
| "dag_import_error", |
| ) |
| |
| from airflow.plugins_manager import import_errors as plugin_import_errors |
| |
| for filename, stacktrace in plugin_import_errors.items(): |
| flash( |
| f"Broken plugin: [{filename}] {stacktrace}", |
| "error", |
| ) |
| |
| num_of_pages = int(math.ceil(num_of_all_dags / float(dags_per_page))) |
| |
| state_color_mapping = State.state_color.copy() |
| state_color_mapping["null"] = state_color_mapping.pop(None) |
| |
| page_title = conf.get(section="webserver", key="instance_name", fallback="DAGs") |
| |
| dashboard_alerts = [ |
| fm for fm in settings.DASHBOARD_UIALERTS if fm.should_show(current_app.appbuilder.sm) |
| ] |
| |
| def _iter_parsed_moved_data_table_names(): |
| for table_name in inspect(session.get_bind()).get_table_names(): |
| segments = table_name.split("__", 2) |
| if len(segments) < 3: |
| continue |
| if segments[0] != settings.AIRFLOW_MOVED_TABLE_PREFIX: |
| continue |
| # Second segment is a version marker that we don't need to show. |
| yield segments[2], table_name |
| |
| if ( |
| permissions.ACTION_CAN_ACCESS_MENU, |
| permissions.RESOURCE_ADMIN_MENU, |
| ) in user_permissions and conf.getboolean("webserver", "warn_deployment_exposure"): |
| robots_file_access_count = ( |
| session.query(Log) |
| .filter(Log.event == "robots") |
| .filter(Log.dttm > (utcnow() - timedelta(days=7))) |
| .count() |
| ) |
| if robots_file_access_count > 0: |
| flash( |
| Markup( |
| 'Recent requests have been made to /robots.txt. ' |
| 'This indicates that this deployment may be accessible to the public internet. ' |
| 'This warning can be disabled by setting webserver.warn_deployment_exposure=False in ' |
| 'airflow.cfg. Read more about web deployment security <a href=' |
| f'"{get_docs_url("security/webserver.html")}">' |
| 'here</a>' |
| ), |
| "warning", |
| ) |
| |
| return self.render_template( |
| 'airflow/dags.html', |
| dags=dags, |
| dashboard_alerts=dashboard_alerts, |
| migration_moved_data_alerts=sorted(set(_iter_parsed_moved_data_table_names())), |
| current_page=current_page, |
| search_query=arg_search_query if arg_search_query else '', |
| page_title=page_title, |
| page_size=dags_per_page, |
| num_of_pages=num_of_pages, |
| num_dag_from=min(start + 1, num_of_all_dags), |
| num_dag_to=min(end, num_of_all_dags), |
| num_of_all_dags=num_of_all_dags, |
| paging=wwwutils.generate_pages( |
| current_page, |
| num_of_pages, |
| search=escape(arg_search_query) if arg_search_query else None, |
| status=arg_status_filter if arg_status_filter else None, |
| tags=arg_tags_filter if arg_tags_filter else None, |
| ), |
| num_runs=num_runs, |
| tags=tags, |
| state_color=state_color_mapping, |
| status_filter=arg_status_filter, |
| status_count_all=all_dags_count, |
| status_count_active=status_count_active, |
| status_count_paused=status_count_paused, |
| tags_filter=arg_tags_filter, |
| ) |
| |
| @expose('/dag_stats', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @provide_session |
| def dag_stats(self, session=None): |
| """Dag statistics.""" |
| dr = models.DagRun |
| |
| allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| |
| dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state)).group_by( |
| dr.dag_id, dr.state |
| ) |
| |
| # Filter by post parameters |
| selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id} |
| |
| if selected_dag_ids: |
| filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids) |
| else: |
| filter_dag_ids = allowed_dag_ids |
| |
| if not filter_dag_ids: |
| return wwwutils.json_response({}) |
| |
| payload = {} |
| dag_state_stats = dag_state_stats.filter(dr.dag_id.in_(filter_dag_ids)) |
| data = {} |
| |
| for dag_id, state, count in dag_state_stats: |
| if dag_id not in data: |
| data[dag_id] = {} |
| data[dag_id][state] = count |
| |
| for dag_id in filter_dag_ids: |
| payload[dag_id] = [] |
| for state in State.dag_states: |
| count = data.get(dag_id, {}).get(state, 0) |
| payload[dag_id].append({'state': state, 'count': count}) |
| |
| return wwwutils.json_response(payload) |
| |
| @expose('/task_stats', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @provide_session |
| def task_stats(self, session=None): |
| """Task Statistics""" |
| allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| |
| if not allowed_dag_ids: |
| return wwwutils.json_response({}) |
| |
| # Filter by post parameters |
| selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id} |
| |
| if selected_dag_ids: |
| filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids) |
| else: |
| filter_dag_ids = allowed_dag_ids |
| |
| running_dag_run_query_result = ( |
| session.query(DagRun.dag_id, DagRun.run_id) |
| .join(DagModel, DagModel.dag_id == DagRun.dag_id) |
| .filter(DagRun.state == State.RUNNING, DagModel.is_active) |
| ) |
| |
| running_dag_run_query_result = running_dag_run_query_result.filter(DagRun.dag_id.in_(filter_dag_ids)) |
| |
| running_dag_run_query_result = running_dag_run_query_result.subquery('running_dag_run') |
| |
| # Select all task_instances from active dag_runs. |
| running_task_instance_query_result = session.query( |
| TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state') |
| ).join( |
| running_dag_run_query_result, |
| and_( |
| running_dag_run_query_result.c.dag_id == TaskInstance.dag_id, |
| running_dag_run_query_result.c.run_id == TaskInstance.run_id, |
| ), |
| ) |
| |
| if conf.getboolean('webserver', 'SHOW_RECENT_STATS_FOR_COMPLETED_RUNS', fallback=True): |
| |
| last_dag_run = ( |
| session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) |
| .join(DagModel, DagModel.dag_id == DagRun.dag_id) |
| .filter(DagRun.state != State.RUNNING, DagModel.is_active) |
| .group_by(DagRun.dag_id) |
| ) |
| |
| last_dag_run = last_dag_run.filter(DagRun.dag_id.in_(filter_dag_ids)) |
| last_dag_run = last_dag_run.subquery('last_dag_run') |
| |
| # Select all task_instances from active dag_runs. |
| # If no dag_run is active, return task instances from most recent dag_run. |
| last_task_instance_query_result = ( |
| session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')) |
| .join(TaskInstance.dag_run) |
| .join( |
| last_dag_run, |
| and_( |
| last_dag_run.c.dag_id == TaskInstance.dag_id, |
| last_dag_run.c.execution_date == DagRun.execution_date, |
| ), |
| ) |
| ) |
| |
| final_task_instance_query_result = union_all( |
| last_task_instance_query_result, running_task_instance_query_result |
| ).alias('final_ti') |
| else: |
| final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti') |
| |
| qry = session.query( |
| final_task_instance_query_result.c.dag_id, |
| final_task_instance_query_result.c.state, |
| sqla.func.count(), |
| ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state) |
| |
| data = {} |
| for dag_id, state, count in qry: |
| if dag_id not in data: |
| data[dag_id] = {} |
| data[dag_id][state] = count |
| |
| payload = {} |
| for dag_id in filter_dag_ids: |
| payload[dag_id] = [] |
| for state in State.task_states: |
| count = data.get(dag_id, {}).get(state, 0) |
| payload[dag_id].append({'state': state, 'count': count}) |
| return wwwutils.json_response(payload) |
| |
| @expose('/last_dagruns', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @provide_session |
| def last_dagruns(self, session=None): |
| """Last DAG runs""" |
| allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| |
| # Filter by post parameters |
| selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id} |
| |
| if selected_dag_ids: |
| filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids) |
| else: |
| filter_dag_ids = allowed_dag_ids |
| |
| if not filter_dag_ids: |
| return wwwutils.json_response({}) |
| |
| last_runs_subquery = ( |
| session.query( |
| DagRun.dag_id, |
| sqla.func.max(DagRun.execution_date).label("max_execution_date"), |
| ) |
| .group_by(DagRun.dag_id) |
| .filter(DagRun.dag_id.in_(filter_dag_ids)) # Only include accessible/selected DAGs. |
| .subquery("last_runs") |
| ) |
| |
| query = session.query( |
| DagRun.dag_id, |
| DagRun.start_date, |
| DagRun.end_date, |
| DagRun.state, |
| DagRun.execution_date, |
| DagRun.data_interval_start, |
| DagRun.data_interval_end, |
| ).join( |
| last_runs_subquery, |
| and_( |
| last_runs_subquery.c.dag_id == DagRun.dag_id, |
| last_runs_subquery.c.max_execution_date == DagRun.execution_date, |
| ), |
| ) |
| |
| resp = { |
| r.dag_id.replace('.', '__dot__'): { |
| "dag_id": r.dag_id, |
| "state": r.state, |
| "execution_date": wwwutils.datetime_to_string(r.execution_date), |
| "start_date": wwwutils.datetime_to_string(r.start_date), |
| "end_date": wwwutils.datetime_to_string(r.end_date), |
| "data_interval_start": wwwutils.datetime_to_string(r.data_interval_start), |
| "data_interval_end": wwwutils.datetime_to_string(r.data_interval_end), |
| } |
| for r in query |
| } |
| return wwwutils.json_response(resp) |
| |
| @expose('/code') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE), |
| ] |
| ) |
| @provide_session |
| def code(self, session=None): |
| """Dag Code.""" |
| all_errors = "" |
| dag_orm = None |
| dag_id = None |
| |
| try: |
| dag_id = request.args.get('dag_id') |
| dag_orm = DagModel.get_dagmodel(dag_id, session=session) |
| code = DagCode.get_code_by_fileloc(dag_orm.fileloc) |
| html_code = Markup(highlight(code, lexers.PythonLexer(), HtmlFormatter(linenos=True))) |
| |
| except Exception as e: |
| all_errors += ( |
| "Exception encountered during " |
| + f"dag_id retrieval/dag retrieval fallback/code highlighting:\n\n{e}\n" |
| ) |
| html_code = Markup('<p>Failed to load DAG file Code.</p><p>Details: {}</p>').format( |
| escape(all_errors) |
| ) |
| |
| wwwutils.check_import_errors(dag_orm.fileloc, session) |
| |
| return self.render_template( |
| 'airflow/dag_code.html', |
| html_code=html_code, |
| dag=dag_orm, |
| dag_model=dag_orm, |
| title=dag_id, |
| root=request.args.get('root'), |
| wrapped=conf.getboolean('webserver', 'default_wrap'), |
| ) |
| |
| @expose('/dag_details') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @provide_session |
| def dag_details(self, session=None): |
| """Get Dag details.""" |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| |
| title = "DAG Details" |
| root = request.args.get('root', '') |
| |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| states = ( |
| session.query(TaskInstance.state, sqla.func.count(TaskInstance.dag_id)) |
| .filter(TaskInstance.dag_id == dag_id) |
| .group_by(TaskInstance.state) |
| .all() |
| ) |
| |
| active_runs = models.DagRun.find(dag_id=dag_id, state=State.RUNNING, external_trigger=False) |
| |
| tags = session.query(models.DagTag).filter(models.DagTag.dag_id == dag_id).all() |
| |
| return self.render_template( |
| 'airflow/dag_details.html', |
| dag=dag, |
| title=title, |
| root=root, |
| states=states, |
| State=State, |
| active_runs=active_runs, |
| tags=tags, |
| dag_model=dag_model, |
| ) |
| |
| @expose('/rendered-templates') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def rendered_templates(self, session): |
| """Get rendered Dag.""" |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = timezone.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| root = request.args.get('root', '') |
| |
| logging.info("Retrieving rendered templates.") |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_run = dag.get_dagrun(execution_date=dttm, session=session) |
| task = copy.copy(dag.get_task(task_id)) |
| |
| if dag_run is None: |
| # No DAG run matching given logical date. This usually means this |
| # DAG has never been run. Task instance rendering does not really |
| # make sense in this situation, but "works" prior to AIP-39. This |
| # "fakes" a temporary DagRun-TaskInstance association (not saved to |
| # database) for presentation only. |
| ti = TaskInstance(task) |
| ti.dag_run = DagRun(dag_id=dag_id, execution_date=dttm) |
| else: |
| ti = dag_run.get_task_instance(task_id=task.task_id, session=session) |
| ti.refresh_from_task(task) |
| |
| try: |
| ti.get_rendered_template_fields(session=session) |
| except AirflowException as e: |
| msg = "Error rendering template: " + escape(e) |
| if e.__cause__: |
| msg += Markup("<br><br>OriginalError: ") + escape(e.__cause__) |
| flash(msg, "error") |
| except Exception as e: |
| flash("Error rendering template: " + str(e), "error") |
| |
| title = "Rendered Template" |
| html_dict = {} |
| renderers = wwwutils.get_attr_renderer() |
| |
| for template_field in task.template_fields: |
| content = getattr(task, template_field) |
| renderer = task.template_fields_renderers.get(template_field, template_field) |
| if renderer in renderers: |
| if isinstance(content, (dict, list)): |
| json_content = json.dumps(content, sort_keys=True, indent=4) |
| html_dict[template_field] = renderers[renderer](json_content) |
| else: |
| html_dict[template_field] = renderers[renderer](content) |
| else: |
                html_dict[template_field] = Markup("<pre><code>{}</code></pre>").format(pformat(content))
| |
| if isinstance(content, dict): |
| if template_field == 'op_kwargs': |
| for key, value in content.items(): |
| renderer = task.template_fields_renderers.get(key, key) |
| if renderer in renderers: |
| html_dict['.'.join([template_field, key])] = renderers[renderer](value) |
| else: |
                            html_dict['.'.join([template_field, key])] = Markup(
                                "<pre><code>{}</code></pre>"
                            ).format(pformat(value))
| else: |
| for dict_keys in get_key_paths(content): |
| template_path = '.'.join((template_field, dict_keys)) |
| renderer = task.template_fields_renderers.get(template_path, template_path) |
| if renderer in renderers: |
| content_value = get_value_from_path(dict_keys, content) |
| html_dict[template_path] = renderers[renderer](content_value) |
| |
| return self.render_template( |
| 'airflow/ti_code.html', |
| html_dict=html_dict, |
| dag=dag, |
| task_id=task_id, |
| execution_date=execution_date, |
| form=form, |
| root=root, |
| title=title, |
| ) |
| |
| @expose('/rendered-k8s') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def rendered_k8s(self): |
| """Get rendered k8s yaml.""" |
| if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR: |
| abort(404) |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = timezone.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| root = request.args.get('root', '') |
| logging.info("Retrieving rendered templates.") |
| dag = current_app.dag_bag.get_dag(dag_id) |
| task = dag.get_task(task_id) |
| dag_run = dag.get_dagrun(execution_date=dttm) |
| ti = dag_run.get_task_instance(task_id=task.task_id) |
| |
| pod_spec = None |
| try: |
| pod_spec = ti.get_rendered_k8s_spec() |
| except AirflowException as e: |
| msg = "Error rendering Kubernetes POD Spec: " + escape(e) |
| if e.__cause__: |
| msg += Markup("<br><br>OriginalError: ") + escape(e.__cause__) |
| flash(msg, "error") |
| except Exception as e: |
| flash("Error rendering Kubernetes Pod Spec: " + str(e), "error") |
| title = "Rendered K8s Pod Spec" |
| html_dict = {} |
| renderers = wwwutils.get_attr_renderer() |
| if pod_spec: |
| content = yaml.dump(pod_spec) |
| content = renderers["yaml"](content) |
| else: |
            content = Markup("<pre><code>Error rendering Kubernetes POD Spec</code></pre>")
| html_dict['k8s'] = content |
| |
| return self.render_template( |
| 'airflow/ti_code.html', |
| html_dict=html_dict, |
| dag=dag, |
| task_id=task_id, |
| execution_date=execution_date, |
| form=form, |
| root=root, |
| title=title, |
| ) |
| |
| @expose('/get_logs_with_metadata') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def get_logs_with_metadata(self, session=None): |
| """Retrieve logs including metadata.""" |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| try_number = request.args.get('try_number', type=int) |
| metadata = request.args.get('metadata') |
| metadata = json.loads(metadata) |
| response_format = request.args.get('format', 'json') |
| |
| # metadata may be null |
| if not metadata: |
| metadata = {} |
| |
| # Convert string datetime into actual datetime |
| try: |
| execution_date = timezone.parse(execution_date) |
| except ValueError: |
| error_message = ( |
| f'Given execution date, {execution_date}, could not be identified as a date. ' |
| 'Example date format: 2015-11-16T14:34:15+00:00' |
| ) |
| response = jsonify({'error': error_message}) |
| response.status_code = 400 |
| |
| return response |
| |
| task_log_reader = TaskLogReader() |
| if not task_log_reader.supports_read: |
| return jsonify( |
| message="Task log handler does not support read logs.", |
| error=True, |
| metadata={"end_of_log": True}, |
| ) |
| |
| ti = ( |
| session.query(models.TaskInstance) |
| .filter( |
| models.TaskInstance.dag_id == dag_id, |
| models.TaskInstance.task_id == task_id, |
| models.TaskInstance.execution_date == execution_date, |
| ) |
| .first() |
| ) |
| |
| if ti is None: |
| return jsonify( |
| message="*** Task instance did not exist in the DB\n", |
| error=True, |
| metadata={"end_of_log": True}, |
| ) |
| |
| try: |
| dag = current_app.dag_bag.get_dag(dag_id) |
| if dag: |
| ti.task = dag.get_task(ti.task_id) |
| |
| if response_format == 'json': |
| logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata) |
| message = logs[0] if try_number is not None else logs |
| return jsonify(message=message, metadata=metadata) |
| |
| metadata['download_logs'] = True |
| attachment_filename = task_log_reader.render_log_filename(ti, try_number, session=session) |
| log_stream = task_log_reader.read_log_stream(ti, try_number, metadata) |
| return Response( |
| response=log_stream, |
| mimetype="text/plain", |
| headers={"Content-Disposition": f"attachment; filename={attachment_filename}"}, |
| ) |
| except AttributeError as e: |
| error_message = [f"Task log handler does not support read logs.\n{str(e)}\n"] |
| metadata['end_of_log'] = True |
| return jsonify(message=error_message, error=True, metadata=metadata) |
| |
| @expose('/log') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def log(self, session=None): |
| """Retrieve log.""" |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = timezone.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| |
| ti = ( |
| session.query(models.TaskInstance) |
| .filter( |
| models.TaskInstance.dag_id == dag_id, |
| models.TaskInstance.task_id == task_id, |
| models.TaskInstance.execution_date == dttm, |
| ) |
| .first() |
| ) |
| |
| num_logs = 0 |
| if ti is not None: |
| num_logs = ti.next_try_number - 1 |
| if ti.state in (State.UP_FOR_RESCHEDULE, State.DEFERRED): |
| # Tasks in reschedule state decremented the try number |
| num_logs += 1 |
| logs = [''] * num_logs |
| root = request.args.get('root', '') |
| return self.render_template( |
| 'airflow/ti_log.html', |
| logs=logs, |
| dag=dag_model, |
| title="Log by attempts", |
| dag_id=dag_id, |
| task_id=task_id, |
| execution_date=execution_date, |
| form=form, |
| root=root, |
| wrapped=conf.getboolean('webserver', 'default_wrap'), |
| ) |
| |
| @expose('/redirect_to_external_log') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def redirect_to_external_log(self, session=None): |
| """Redirects to external log.""" |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = timezone.parse(execution_date) |
| try_number = request.args.get('try_number', 1) |
| |
| ti = ( |
| session.query(models.TaskInstance) |
| .filter( |
| models.TaskInstance.dag_id == dag_id, |
| models.TaskInstance.task_id == task_id, |
| models.TaskInstance.execution_date == dttm, |
| ) |
| .first() |
| ) |
| |
| if not ti: |
| flash(f"Task [{dag_id}.{task_id}] does not exist", "error") |
| return redirect(url_for('Airflow.index')) |
| |
| task_log_reader = TaskLogReader() |
| if not task_log_reader.supports_external_link: |
| flash("Task log handler does not support external links", "error") |
| return redirect(url_for('Airflow.index')) |
| |
| handler = task_log_reader.log_handler |
| url = handler.get_external_log_url(ti, try_number) |
| return redirect(url) |
| |
| @expose('/task') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def task(self, session): |
| """Retrieve task.""" |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = timezone.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| root = request.args.get('root', '') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| if not dag or task_id not in dag.task_ids: |
| flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error") |
| return redirect(url_for('Airflow.index')) |
| task = copy.copy(dag.get_task(task_id)) |
| task.resolve_template_files() |
| |
| ti: Optional[TaskInstance] = ( |
| session.query(TaskInstance) |
| .options( |
| # HACK: Eager-load relationships. This is needed because |
                # multiple properties mis-use provide_session(), which destroys
                # the session object ti is bound to.
| joinedload(TaskInstance.queued_by_job, innerjoin=False), |
| joinedload(TaskInstance.trigger, innerjoin=False), |
| ) |
| .join(TaskInstance.dag_run) |
| .filter( |
| DagRun.execution_date == dttm, |
| TaskInstance.dag_id == dag_id, |
| TaskInstance.task_id == task_id, |
| ) |
| .one_or_none() |
| ) |
| if ti is None: |
| ti_attrs: Optional[List[Tuple[str, Any]]] = None |
| else: |
| ti.refresh_from_task(task) |
| # Some fields on TI are deprecated, but we don't want those warnings here. |
| with warnings.catch_warnings(): |
| warnings.simplefilter("ignore", DeprecationWarning) |
| all_ti_attrs = ((name, getattr(ti, name)) for name in dir(ti) if not name.startswith("_")) |
| ti_attrs = sorted((name, attr) for name, attr in all_ti_attrs if not callable(attr)) |
| |
| attr_renderers = wwwutils.get_attr_renderer() |
| task_attrs = [ |
| (attr_name, attr) |
| for attr_name, attr in ( |
| (attr_name, getattr(task, attr_name)) |
| for attr_name in dir(task) |
| if not attr_name.startswith("_") and attr_name not in attr_renderers |
| ) |
| if not callable(attr) |
| ] |
| |
        # Color-code the special attributes that are code
| special_attrs_rendered = { |
| attr_name: renderer(getattr(task, attr_name)) |
| for attr_name, renderer in attr_renderers.items() |
| if hasattr(task, attr_name) |
| } |
| |
| no_failed_deps_result = [ |
| ( |
| "Unknown", |
| "All dependencies are met but the task instance is not running. In most " |
| "cases this just means that the task will probably be scheduled soon " |
| "unless:<br>\n- The scheduler is down or under heavy load<br>\n{}\n" |
| "<br>\nIf this task instance does not start soon please contact your " |
| "Airflow administrator for assistance.".format( |
| "- This task instance already ran and had it's state changed manually " |
| "(e.g. cleared in the UI)<br>" |
| if ti and ti.state == State.NONE |
| else "" |
| ), |
| ) |
| ] |
| |
| # Use the scheduler's context to figure out which dependencies are not met |
| if ti is None: |
| failed_dep_reasons: List[Tuple[str, str]] = [] |
| else: |
| dep_context = DepContext(SCHEDULER_QUEUED_DEPS) |
| failed_dep_reasons = [ |
| (dep.dep_name, dep.reason) for dep in ti.get_failed_dep_statuses(dep_context=dep_context) |
| ] |
| |
| title = "Task Instance Details" |
| return self.render_template( |
| 'airflow/task.html', |
| task_attrs=task_attrs, |
| ti_attrs=ti_attrs, |
| failed_dep_reasons=failed_dep_reasons or no_failed_deps_result, |
| task_id=task_id, |
| execution_date=execution_date, |
| special_attrs_rendered=special_attrs_rendered, |
| form=form, |
| root=root, |
| dag=dag, |
| title=title, |
| ) |
| |
| @expose('/xcom') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def xcom(self, session=None): |
| """Retrieve XCOM.""" |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| # Carrying execution_date through, even though it's irrelevant for |
| # this context |
| execution_date = request.args.get('execution_date') |
| dttm = timezone.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| root = request.args.get('root', '') |
| ti_db = models.TaskInstance |
| dag = DagModel.get_dagmodel(dag_id) |
| ti = session.query(ti_db).filter(and_(ti_db.dag_id == dag_id, ti_db.task_id == task_id)).first() |
| |
| if not ti: |
| flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error") |
| return redirect(url_for('Airflow.index')) |
| |
| xcomlist = ( |
| session.query(XCom) |
| .filter(XCom.dag_id == dag_id, XCom.task_id == task_id, XCom.execution_date == dttm) |
| .all() |
| ) |
| |
| attributes = [] |
| for xcom in xcomlist: |
| if not xcom.key.startswith('_'): |
| attributes.append((xcom.key, xcom.value)) |
| |
| title = "XCom" |
| return self.render_template( |
| 'airflow/xcom.html', |
| attributes=attributes, |
| task_id=task_id, |
| execution_date=execution_date, |
| form=form, |
| root=root, |
| dag=dag, |
| title=title, |
| ) |
| |
| @expose('/run', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def run(self): |
| """Runs Task Instance.""" |
| dag_id = request.form.get('dag_id') |
| task_id = request.form.get('task_id') |
| origin = get_safe_url(request.form.get('origin')) |
| dag = current_app.dag_bag.get_dag(dag_id) |
| task = dag.get_task(task_id) |
| |
| execution_date = request.form.get('execution_date') |
| execution_date = timezone.parse(execution_date) |
| ignore_all_deps = request.form.get('ignore_all_deps') == "true" |
| ignore_task_deps = request.form.get('ignore_task_deps') == "true" |
| ignore_ti_state = request.form.get('ignore_ti_state') == "true" |
| |
| executor = ExecutorLoader.get_default_executor() |
| |
| if not getattr(executor, "supports_ad_hoc_ti_run", False): |
| flash("Only works with the Celery, CeleryKubernetes or Kubernetes executors, sorry", "error") |
| return redirect(origin) |
| |
| dag_run = dag.get_dagrun(execution_date=execution_date) |
| ti = dag_run.get_task_instance(task_id=task.task_id) |
| if not ti: |
| flash( |
| "Could not queue task instance for execution, task instance is missing", |
| "error", |
| ) |
| return redirect(origin) |
| |
| ti.refresh_from_task(task) |
| |
| # Make sure the task instance can be run |
| dep_context = DepContext( |
| deps=RUNNING_DEPS, |
| ignore_all_deps=ignore_all_deps, |
| ignore_task_deps=ignore_task_deps, |
| ignore_ti_state=ignore_ti_state, |
| ) |
| failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context)) |
| if failed_deps: |
| failed_deps_str = ", ".join(f"{dep.dep_name}: {dep.reason}" for dep in failed_deps) |
| flash( |
| f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}", |
| "error", |
| ) |
| return redirect(origin) |
| |
| executor.job_id = "manual" |
| executor.start() |
| executor.queue_task_instance( |
| ti, |
| ignore_all_deps=ignore_all_deps, |
| ignore_task_deps=ignore_task_deps, |
| ignore_ti_state=ignore_ti_state, |
| ) |
| executor.heartbeat() |
| flash(f"Sent {ti} to the message queue, it should start any moment now.") |
| return redirect(origin) |
| |
| @expose('/delete', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG), |
| ] |
| ) |
| @action_logging |
| def delete(self): |
| """Deletes DAG.""" |
| from airflow.api.common import delete_dag |
| from airflow.exceptions import DagNotFound |
| |
| dag_id = request.values.get('dag_id') |
| origin = get_safe_url(request.values.get('origin')) |
| |
| try: |
| delete_dag.delete_dag(dag_id) |
| except DagNotFound: |
| flash(f"DAG with id {dag_id} not found. Cannot delete", 'error') |
| return redirect(request.referrer) |
| except AirflowException: |
| flash( |
| f"Cannot delete DAG with id {dag_id} because some task instances of the DAG " |
| "are still running. Please mark the task instances as " |
| "failed/succeeded before deleting the DAG", |
| "error", |
| ) |
| return redirect(request.referrer) |
| |
| flash(f"Deleting DAG with id {dag_id}. May take a couple minutes to fully disappear.") |
| |
| # Upon success return to origin. |
| return redirect(origin) |
| |
| @expose('/trigger', methods=['POST', 'GET']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def trigger(self, session=None): |
| """Triggers DAG Run.""" |
| dag_id = request.values.get('dag_id') |
| origin = get_safe_url(request.values.get('origin')) |
| unpause = request.values.get('unpause') |
| request_conf = request.values.get('conf') |
| request_execution_date = request.values.get('execution_date', default=timezone.utcnow().isoformat()) |
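| # The core option `dag_run_conf_overrides_params` controls whether values in |
| # dag_run.conf take precedence over identically named DAG-level params. |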
| is_dag_run_conf_overrides_params = conf.getboolean('core', 'dag_run_conf_overrides_params') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_orm = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first() |
| if not dag_orm: |
| flash(f"Cannot find dag {dag_id}") |
| return redirect(origin) |
| |
| if dag_orm.has_import_errors: |
| flash(f"Cannot create dagruns because the dag {dag_id} has import errors", "error") |
| return redirect(origin) |
| |
| if request.method == 'GET': |
| # Populate the conf textarea with the conf request parameter, or dag.params |
| default_conf = '' |
| |
| doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None)) |
| form = DateTimeForm(data={'execution_date': request_execution_date}) |
| |
| if request_conf: |
| default_conf = request_conf |
| else: |
| try: |
| default_conf = json.dumps( |
| {str(k): v.resolve(suppress_exception=True) for k, v in dag.params.items()}, indent=4 |
| ) |
| except TypeError: |
| flash("Could not pre-populate conf field due to non-JSON-serializable data-types") |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf=default_conf, |
| doc_md=doc_md, |
| form=form, |
| is_dag_run_conf_overrides_params=is_dag_run_conf_overrides_params, |
| ) |
| |
| try: |
| execution_date = timezone.parse(request_execution_date) |
| except ParserError: |
| flash("Invalid execution date", "error") |
| form = DateTimeForm(data={'execution_date': timezone.utcnow().isoformat()}) |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf=request_conf, |
| form=form, |
| is_dag_run_conf_overrides_params=is_dag_run_conf_overrides_params, |
| ) |
| |
| dr = DagRun.find(dag_id=dag_id, execution_date=execution_date, run_type=DagRunType.MANUAL) |
| if dr: |
| # DagRun.find returns a list, so report the first duplicate found. |
| flash(f"This run_id {dr[0].run_id} already exists") |
| return redirect(origin) |
| |
| run_conf = {} |
| if request_conf: |
| try: |
| run_conf = json.loads(request_conf) |
| if not isinstance(run_conf, dict): |
| flash("Invalid JSON configuration, must be a dict", "error") |
| form = DateTimeForm(data={'execution_date': execution_date}) |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf=request_conf, |
| form=form, |
| is_dag_run_conf_overrides_params=is_dag_run_conf_overrides_params, |
| ) |
| except json.decoder.JSONDecodeError: |
| flash("Invalid JSON configuration, not parseable", "error") |
| form = DateTimeForm(data={'execution_date': execution_date}) |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf=request_conf, |
| form=form, |
| is_dag_run_conf_overrides_params=is_dag_run_conf_overrides_params, |
| ) |
| |
| if unpause and dag.is_paused: |
| models.DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=False) |
| |
| try: |
| dag.create_dagrun( |
| run_type=DagRunType.MANUAL, |
| execution_date=execution_date, |
| data_interval=dag.timetable.infer_manual_data_interval(run_after=execution_date), |
| state=State.QUEUED, |
| conf=run_conf, |
| external_trigger=True, |
| dag_hash=current_app.dag_bag.dags_hash.get(dag_id), |
| ) |
| except ValueError as ve: |
| flash(f"{ve}", "error") |
| form = DateTimeForm(data={'execution_date': execution_date}) |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf=request_conf, |
| form=form, |
| is_dag_run_conf_overrides_params=is_dag_run_conf_overrides_params, |
| ) |
| |
| flash(f"Triggered {dag_id}, it should start any moment now.") |
| return redirect(origin) |
| |
| def _clear_dag_tis( |
| self, dag, start_date, end_date, origin, recursive=False, confirmed=False, only_failed=False |
| ): |
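| # When confirmed, actually clear; otherwise perform a dry run and render a |
| # confirmation page listing the task instances that would be affected. |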
| if confirmed: |
| count = dag.clear( |
| start_date=start_date, |
| end_date=end_date, |
| include_subdags=recursive, |
| include_parentdag=recursive, |
| only_failed=only_failed, |
| ) |
| |
| flash(f"{count} task instances have been cleared") |
| return redirect(origin) |
| |
| try: |
| tis = dag.clear( |
| start_date=start_date, |
| end_date=end_date, |
| include_subdags=recursive, |
| include_parentdag=recursive, |
| only_failed=only_failed, |
| dry_run=True, |
| ) |
| except AirflowException as ex: |
| flash(str(ex), 'error') |
| return redirect(origin) |
| |
| if not tis: |
| flash("No task instances to clear", 'error') |
| response = redirect(origin) |
| else: |
| details = "\n".join(str(t) for t in tis) |
| |
| response = self.render_template( |
| 'airflow/confirm.html', |
| endpoint=None, |
| message="Here's the list of task instances you are about to clear:", |
| details=details, |
| ) |
| |
| return response |
| |
| @expose('/clear', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def clear(self): |
| """Clears the Dag.""" |
| dag_id = request.form.get('dag_id') |
| task_id = request.form.get('task_id') |
| origin = get_safe_url(request.form.get('origin')) |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| execution_date = request.form.get('execution_date') |
| execution_date = timezone.parse(execution_date) |
| confirmed = request.form.get('confirmed') == "true" |
| upstream = request.form.get('upstream') == "true" |
| downstream = request.form.get('downstream') == "true" |
| future = request.form.get('future') == "true" |
| past = request.form.get('past') == "true" |
| recursive = request.form.get('recursive') == "true" |
| only_failed = request.form.get('only_failed') == "true" |
| |
| dag = dag.partial_subset( |
| task_ids_or_regex=fr"^{task_id}$", |
| include_downstream=downstream, |
| include_upstream=upstream, |
| ) |
| |
| end_date = execution_date if not future else None |
| start_date = execution_date if not past else None |
| |
| return self._clear_dag_tis( |
| dag, |
| start_date, |
| end_date, |
| origin, |
| recursive=recursive, |
| confirmed=confirmed, |
| only_failed=only_failed, |
| ) |
| |
| @expose('/dagrun_clear', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def dagrun_clear(self): |
| """Clears the DagRun""" |
| dag_id = request.form.get('dag_id') |
| origin = get_safe_url(request.form.get('origin')) |
| execution_date = request.form.get('execution_date') |
| confirmed = request.form.get('confirmed') == "true" |
| |
| dag = current_app.dag_bag.get_dag(dag_id) |
| execution_date = timezone.parse(execution_date) |
| start_date = execution_date |
| end_date = execution_date |
| |
| return self._clear_dag_tis(dag, start_date, end_date, origin, recursive=True, confirmed=confirmed) |
| |
| @expose('/blocked', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @provide_session |
| def blocked(self, session=None): |
| """Mark Dag Blocked.""" |
| allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| |
| # Filter by post parameters |
| selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id} |
| |
| if selected_dag_ids: |
| filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids) |
| else: |
| filter_dag_ids = allowed_dag_ids |
| |
| if not filter_dag_ids: |
| return wwwutils.json_response([]) |
| |
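| # Count running DAG runs per DAG so the UI can flag DAGs that have |
| # reached their max_active_runs limit. |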
| dags = ( |
| session.query(DagRun.dag_id, sqla.func.count(DagRun.id)) |
| .filter(DagRun.state == State.RUNNING) |
| .filter(DagRun.dag_id.in_(filter_dag_ids)) |
| .group_by(DagRun.dag_id) |
| ) |
| |
| payload = [] |
| for dag_id, active_dag_runs in dags: |
| max_active_runs = 0 |
| dag = current_app.dag_bag.get_dag(dag_id) |
| if dag: |
| # TODO: Make max_active_runs a column so we can query for it directly |
| max_active_runs = dag.max_active_runs |
| payload.append( |
| { |
| 'dag_id': dag_id, |
| 'active_dag_run': active_dag_runs, |
| 'max_active_runs': max_active_runs, |
| } |
| ) |
| return wwwutils.json_response(payload) |
| |
| def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin): |
| if not execution_date: |
| flash('Invalid execution date', 'error') |
| return redirect(origin) |
| |
| execution_date = timezone.parse(execution_date) |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| if not dag: |
| flash(f'Cannot find DAG: {dag_id}', 'error') |
| return redirect(origin) |
| |
| new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed) |
| |
| if confirmed: |
| flash(f'Marked failed on {len(new_dag_state)} task instances') |
| return redirect(origin) |
| |
| else: |
| details = '\n'.join(str(t) for t in new_dag_state) |
| |
| response = self.render_template( |
| 'airflow/confirm.html', |
| message="Here's the list of task instances you are about to mark as failed", |
| details=details, |
| ) |
| |
| return response |
| |
| def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origin): |
| if not execution_date: |
| flash('Invalid execution date', 'error') |
| return redirect(origin) |
| |
| execution_date = timezone.parse(execution_date) |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| if not dag: |
| flash(f'Cannot find DAG: {dag_id}', 'error') |
| return redirect(origin) |
| |
| new_dag_state = set_dag_run_state_to_success(dag, execution_date, commit=confirmed) |
| |
| if confirmed: |
| flash(f'Marked success on {len(new_dag_state)} task instances') |
| return redirect(origin) |
| |
| else: |
| details = '\n'.join(str(t) for t in new_dag_state) |
| |
| response = self.render_template( |
| 'airflow/confirm.html', |
| message="Here's the list of task instances you are about to mark as success", |
| details=details, |
| ) |
| |
| return response |
| |
| @expose('/dagrun_failed', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @action_logging |
| def dagrun_failed(self): |
| """Mark DagRun failed.""" |
| dag_id = request.form.get('dag_id') |
| execution_date = request.form.get('execution_date') |
| confirmed = request.form.get('confirmed') == 'true' |
| origin = get_safe_url(request.form.get('origin')) |
| return self._mark_dagrun_state_as_failed(dag_id, execution_date, confirmed, origin) |
| |
| @expose('/dagrun_success', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @action_logging |
| def dagrun_success(self): |
| """Mark DagRun success""" |
| dag_id = request.form.get('dag_id') |
| execution_date = request.form.get('execution_date') |
| confirmed = request.form.get('confirmed') == 'true' |
| origin = get_safe_url(request.form.get('origin')) |
| return self._mark_dagrun_state_as_success(dag_id, execution_date, confirmed, origin) |
| |
| @expose("/dagrun_details") |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def dagrun_details(self, session=None): |
| """Retrieve DAG Run details.""" |
| dag_id = request.args.get("dag_id") |
| run_id = request.args.get("run_id") |
| |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_run: Optional[DagRun] = ( |
| session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id).one_or_none() |
| ) |
| |
| if dag_run is None: |
| flash(f"No DAG run found for DAG id {dag_id} and run id {run_id}", "error") |
| return redirect(request.referrer or url_for('Airflow.index')) |
| else: |
| try: |
| duration = dag_run.end_date - dag_run.start_date |
| except TypeError: |
| # Raised if end_date is None e.g. when DAG is still running |
| duration = None |
| |
| dagrun_attributes = [ |
| ("Logical date", wwwutils.datetime_html(dag_run.execution_date)), |
| ("Queued at", wwwutils.datetime_html(dag_run.queued_at)), |
| ("Start date", wwwutils.datetime_html(dag_run.start_date)), |
| ("End date", wwwutils.datetime_html(dag_run.end_date)), |
| ("Duration", str(duration)), |
| ("Current state", wwwutils.state_token(dag_run.state)), |
| ("Run type", dag_run.run_type), |
| ("Externally triggered", dag_run.external_trigger), |
| ("Config", wwwutils.json_render(dag_run.conf, lexers.JsonLexer)), |
| ] |
| |
| return self.render_template( |
| "airflow/dagrun_details.html", |
| dag=dag, |
| dag_id=dag_id, |
| run_id=run_id, |
| execution_date=dag_run.execution_date.isoformat(), |
| dagrun_attributes=dagrun_attributes, |
| ) |
| |
| def _mark_task_instance_state( |
| self, |
| dag_id, |
| task_id, |
| origin, |
| execution_date, |
| upstream, |
| downstream, |
| future, |
| past, |
| state, |
| ): |
| dag = current_app.dag_bag.get_dag(dag_id) |
| latest_execution_date = dag.get_latest_execution_date() |
| |
| if not latest_execution_date: |
| flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error") |
| return redirect(origin) |
| |
| execution_date = timezone.parse(execution_date) |
| |
| altered = dag.set_task_instance_state( |
| task_id, execution_date, state, upstream=upstream, downstream=downstream, future=future, past=past |
| ) |
| |
| flash(f"Marked {state} on {len(altered)} task instances") |
| return redirect(origin) |
| |
| @expose('/confirm', methods=['GET']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def confirm(self): |
| """Show confirmation page for marking tasks as success or failed.""" |
| args = request.args |
| dag_id = args.get('dag_id') |
| task_id = args.get('task_id') |
| execution_date = args.get('execution_date') |
| state = args.get('state') |
| |
| upstream = to_boolean(args.get('upstream')) |
| downstream = to_boolean(args.get('downstream')) |
| future = to_boolean(args.get('future')) |
| past = to_boolean(args.get('past')) |
| |
| dag = current_app.dag_bag.get_dag(dag_id) |
| if not dag: |
| flash(f'DAG {dag_id} not found', "error") |
| return redirect(request.referrer or url_for('Airflow.index')) |
| |
| try: |
| task = dag.get_task(task_id) |
| except airflow.exceptions.TaskNotFound: |
| flash(f"Task {task_id} not found", "error") |
| return redirect(request.referrer or url_for('Airflow.index')) |
| |
| task.dag = dag |
| |
| if state not in ( |
| 'success', |
| 'failed', |
| ): |
| flash(f"Invalid state {state}, must be either 'success' or 'failed'", "error") |
| return redirect(request.referrer or url_for('Airflow.index')) |
| |
| latest_execution_date = dag.get_latest_execution_date() |
| if not latest_execution_date: |
| flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error") |
| return redirect(request.referrer or url_for('Airflow.index')) |
| |
| execution_date = timezone.parse(execution_date) |
| |
| from airflow.api.common.experimental.mark_tasks import set_state |
| |
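| # commit=False computes the would-be-altered task instances for display only; |
| # the actual state change happens via the /failed or /success POST endpoints. |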
| to_be_altered = set_state( |
| tasks=[task], |
| execution_date=execution_date, |
| upstream=upstream, |
| downstream=downstream, |
| future=future, |
| past=past, |
| state=state, |
| commit=False, |
| ) |
| |
| details = "\n".join(str(t) for t in to_be_altered) |
| |
| response = self.render_template( |
| "airflow/confirm.html", |
| endpoint=url_for(f'Airflow.{state}'), |
| message=f"Here's the list of task instances you are about to mark as {state}:", |
| details=details, |
| ) |
| |
| return response |
| |
| @expose('/failed', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def failed(self): |
| """Mark task as failed.""" |
| args = request.form |
| dag_id = args.get('dag_id') |
| task_id = args.get('task_id') |
| origin = get_safe_url(args.get('origin')) |
| execution_date = args.get('execution_date') |
| |
| upstream = to_boolean(args.get('upstream')) |
| downstream = to_boolean(args.get('downstream')) |
| future = to_boolean(args.get('future')) |
| past = to_boolean(args.get('past')) |
| |
| return self._mark_task_instance_state( |
| dag_id, |
| task_id, |
| origin, |
| execution_date, |
| upstream, |
| downstream, |
| future, |
| past, |
| State.FAILED, |
| ) |
| |
| @expose('/success', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def success(self): |
| """Mark task as success.""" |
| args = request.form |
| dag_id = args.get('dag_id') |
| task_id = args.get('task_id') |
| origin = get_safe_url(args.get('origin')) |
| execution_date = args.get('execution_date') |
| |
| upstream = to_boolean(args.get('upstream')) |
| downstream = to_boolean(args.get('downstream')) |
| future = to_boolean(args.get('future')) |
| past = to_boolean(args.get('past')) |
| |
| return self._mark_task_instance_state( |
| dag_id, |
| task_id, |
| origin, |
| execution_date, |
| upstream, |
| downstream, |
| future, |
| past, |
| State.SUCCESS, |
| ) |
| |
| @expose('/tree') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), |
| ] |
| ) |
| @gzipped |
| @action_logging |
| @provide_session |
| def tree(self, session=None): |
| """Get Dag as tree.""" |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| if not dag: |
| flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error") |
| return redirect(url_for('Airflow.index')) |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True) |
| |
| num_runs = request.args.get('num_runs', type=int) |
| if num_runs is None: |
| num_runs = conf.getint('webserver', 'default_dag_run_display_number') |
| |
| try: |
| base_date = timezone.parse(request.args["base_date"]) |
| except (KeyError, ValueError): |
| base_date = dag.get_latest_execution_date() or timezone.utcnow() |
| |
| dag_runs = ( |
| session.query(DagRun) |
| .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date) |
| .order_by(DagRun.execution_date.desc()) |
| .limit(num_runs) |
| .all() |
| ) |
| dag_runs.reverse() |
| encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs] |
| dag_run_dates = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs} |
| |
| max_date = max(dag_run_dates, default=None) |
| |
| form = DateTimeWithNumRunsForm( |
| data={ |
| 'base_date': max_date or timezone.utcnow(), |
| 'num_runs': num_runs, |
| } |
| ) |
| |
| doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None)) |
| |
| task_log_reader = TaskLogReader() |
| if task_log_reader.supports_external_link: |
| external_log_name = task_log_reader.log_handler.log_name |
| else: |
| external_log_name = None |
| |
| min_date = min(dag_run_dates, default=None) |
| |
| tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session) |
| |
| data = { |
| 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis), |
| 'dag_runs': encoded_runs, |
| } |
| |
| # avoid spaces to reduce payload size |
| data = htmlsafe_json_dumps(data, separators=(',', ':')) |
| |
| return self.render_template( |
| 'airflow/tree.html', |
| operators=sorted({op.task_type: op for op in dag.tasks}.values(), key=lambda x: x.task_type), |
| root=root, |
| form=form, |
| dag=dag, |
| doc_md=doc_md, |
| data=data, |
| num_runs=num_runs, |
| show_external_log_redirect=task_log_reader.supports_external_link, |
| external_log_name=external_log_name, |
| dag_model=dag_model, |
| auto_refresh_interval=conf.getint('webserver', 'auto_refresh_interval'), |
| ) |
| |
| @expose('/calendar') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @gzipped |
| @action_logging |
| @provide_session |
| def calendar(self, session=None): |
| """Get DAG runs as calendar""" |
| |
| def _convert_to_date(session, column): |
| """Convert column to date.""" |
| if session.bind.dialect.name == 'mssql': |
| return column.cast(Date) |
| else: |
| return func.date(column) |
| |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| if not dag: |
| flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error") |
| return redirect(url_for('Airflow.index')) |
| |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True) |
| |
| dag_states = ( |
| session.query( |
| (_convert_to_date(session, DagRun.execution_date)).label('date'), |
| DagRun.state, |
| func.count('*').label('count'), |
| ) |
| .filter(DagRun.dag_id == dag.dag_id) |
| .group_by(_convert_to_date(session, DagRun.execution_date), DagRun.state) |
| .order_by(_convert_to_date(session, DagRun.execution_date).asc()) |
| .all() |
| ) |
| |
| dag_states = [ |
| { |
| # DATE() in SQLite and MySQL behave differently: |
| # SQLite returns a string, MySQL returns a date. |
| 'date': dr.date if isinstance(dr.date, str) else dr.date.isoformat(), |
| 'state': dr.state, |
| 'count': dr.count, |
| } |
| for dr in dag_states |
| ] |
| |
| data = { |
| 'dag_states': dag_states, |
| 'start_date': (dag.start_date or DateTime.utcnow()).date().isoformat(), |
| 'end_date': (dag.end_date or DateTime.utcnow()).date().isoformat(), |
| } |
| |
| doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None)) |
| |
| # avoid spaces to reduce payload size |
| data = htmlsafe_json_dumps(data, separators=(',', ':')) |
| |
| return self.render_template( |
| 'airflow/calendar.html', |
| dag=dag, |
| doc_md=doc_md, |
| data=data, |
| root=root, |
| dag_model=dag_model, |
| ) |
| |
| @expose('/graph') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), |
| ] |
| ) |
| @gzipped |
| @action_logging |
| @provide_session |
| def graph(self, session=None): |
| """Get DAG as Graph.""" |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| if not dag: |
| flash(f'DAG "{dag_id}" seems to be missing.', "error") |
| return redirect(url_for('Airflow.index')) |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) |
| arrange = request.args.get('arrange', dag.orientation) |
| |
| nodes = task_group_to_dict(dag.task_group) |
| edges = dag_edges(dag) |
| |
| dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) |
| dt_nr_dr_data['arrange'] = arrange |
| dttm = dt_nr_dr_data['dttm'] |
| |
| class GraphForm(DateTimeWithNumRunsWithDagRunsForm): |
| """Graph Form class.""" |
| |
| arrange = SelectField( |
| "Layout", |
| choices=( |
| ('LR', "Left > Right"), |
| ('RL', "Right > Left"), |
| ('TB', "Top > Bottom"), |
| ('BT', "Bottom > Top"), |
| ), |
| ) |
| |
| form = GraphForm(data=dt_nr_dr_data) |
| form.execution_date.choices = dt_nr_dr_data['dr_choices'] |
| |
| task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)} |
| tasks = { |
| t.task_id: { |
| 'dag_id': t.dag_id, |
| 'task_type': t.task_type, |
| 'extra_links': t.extra_links, |
| } |
| for t in dag.tasks |
| } |
| if not tasks: |
| flash("No tasks found", "error") |
| session.commit() |
| doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None)) |
| |
| task_log_reader = TaskLogReader() |
| if task_log_reader.supports_external_link: |
| external_log_name = task_log_reader.log_handler.log_name |
| else: |
| external_log_name = None |
| |
| return self.render_template( |
| 'airflow/graph.html', |
| dag=dag, |
| form=form, |
| width=request.args.get('width', "100%"), |
| height=request.args.get('height', "800"), |
| execution_date=dttm.isoformat(), |
| state_token=wwwutils.state_token(dt_nr_dr_data['dr_state']), |
| doc_md=doc_md, |
| arrange=arrange, |
| operators=sorted({op.task_type: op for op in dag.tasks}.values(), key=lambda x: x.task_type), |
| root=root or '', |
| task_instances=task_instances, |
| tasks=tasks, |
| nodes=nodes, |
| edges=edges, |
| show_external_log_redirect=task_log_reader.supports_external_link, |
| external_log_name=external_log_name, |
| dag_run_state=dt_nr_dr_data['dr_state'], |
| dag_model=dag_model, |
| auto_refresh_interval=conf.getint('webserver', 'auto_refresh_interval'), |
| ) |
| |
| @expose('/duration') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def duration(self, session=None): |
| """Get Dag as duration graph.""" |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| dag_id = request.args.get('dag_id') |
| dag_model = DagModel.get_dagmodel(dag_id) |
| |
| dag: Optional[DAG] = current_app.dag_bag.get_dag(dag_id) |
| if dag is None: |
| flash(f'DAG "{dag_id}" seems to be missing.', "error") |
| return redirect(url_for('Airflow.index')) |
| |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs', default=default_dag_run, type=int) |
| |
| if base_date: |
| base_date = timezone.parse(base_date) |
| else: |
| base_date = dag.get_latest_execution_date() or timezone.utcnow() |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) |
| chart_height = wwwutils.get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", |
| x_custom_format=True, |
| x_axis_date=True, |
| x_axis_format=LINECHART_X_AXIS_TICKFORMAT, |
| height=chart_height, |
| chart_attr=self.line_chart_attr, |
| ) |
| cum_chart = nvd3.lineChart( |
| name="cumLineChart", |
| x_custom_format=True, |
| x_axis_date=True, |
| x_axis_format=LINECHART_X_AXIS_TICKFORMAT, |
| height=chart_height, |
| chart_attr=self.line_chart_attr, |
| ) |
| |
| y_points = defaultdict(list) |
| x_points = defaultdict(list) |
| cumulative_y = defaultdict(list) |
| |
| task_instances = dag.get_task_instances_before(base_date, num_runs, session=session) |
| if task_instances: |
| min_date = task_instances[0].execution_date |
| else: |
| min_date = timezone.utc_epoch() |
| ti_fails = ( |
| session.query(TaskFail) |
| .filter( |
| TaskFail.dag_id == dag.dag_id, |
| TaskFail.execution_date >= min_date, |
| TaskFail.execution_date <= base_date, |
| TaskFail.task_id.in_([t.task_id for t in dag.tasks]), |
| ) |
| .all() |
| ) |
| |
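| # Sum the durations of failed attempts per (dag_id, task_id, execution_date) |
| # so the cumulative chart can include time spent in failed tries. |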
| fails_totals = defaultdict(int) |
| for failed_task_instance in ti_fails: |
| dict_key = ( |
| failed_task_instance.dag_id, |
| failed_task_instance.task_id, |
| failed_task_instance.execution_date, |
| ) |
| if failed_task_instance.duration: |
| fails_totals[dict_key] += failed_task_instance.duration |
| |
| for task_instance in task_instances: |
| if task_instance.duration: |
| date_time = wwwutils.epoch(task_instance.execution_date) |
| x_points[task_instance.task_id].append(date_time) |
| y_points[task_instance.task_id].append(float(task_instance.duration)) |
| fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.execution_date) |
| fails_total = fails_totals[fails_dict_key] |
| cumulative_y[task_instance.task_id].append(float(task_instance.duration + fails_total)) |
| |
| # determine the most relevant time unit for the set of task instance |
| # durations for the DAG |
| y_unit = infer_time_unit([d for t in y_points.values() for d in t]) |
| cum_y_unit = infer_time_unit([d for t in cumulative_y.values() for d in t]) |
| # update the y Axis on both charts to have the correct time units |
| chart.create_y_axis('yAxis', format='.02f', custom_format=False, label=f'Duration ({y_unit})') |
| chart.axislist['yAxis']['axisLabelDistance'] = '-15' |
| cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False, label=f'Duration ({cum_y_unit})') |
| cum_chart.axislist['yAxis']['axisLabelDistance'] = '-15' |
| |
| for task_id in x_points: |
| chart.add_serie( |
| name=task_id, |
| x=x_points[task_id], |
| y=scale_time_units(y_points[task_id], y_unit), |
| ) |
| cum_chart.add_serie( |
| name=task_id, |
| x=x_points[task_id], |
| y=scale_time_units(cumulative_y[task_id], cum_y_unit), |
| ) |
| |
| dates = sorted({ti.execution_date for ti in task_instances}) |
| max_date = max(ti.execution_date for ti in task_instances) if dates else None |
| |
| session.commit() |
| |
| form = DateTimeWithNumRunsForm( |
| data={ |
| 'base_date': max_date or timezone.utcnow(), |
| 'num_runs': num_runs, |
| } |
| ) |
| chart.buildcontent() |
| cum_chart.buildcontent() |
| s_index = cum_chart.htmlcontent.rfind('});') |
| cum_chart.htmlcontent = ( |
| cum_chart.htmlcontent[:s_index] |
| + "$( document ).trigger('chartload')" |
| + cum_chart.htmlcontent[s_index:] |
| ) |
| |
| return self.render_template( |
| 'airflow/duration_chart.html', |
| dag=dag, |
| root=root, |
| form=form, |
| chart=Markup(chart.htmlcontent), |
| cum_chart=Markup(cum_chart.htmlcontent), |
| dag_model=dag_model, |
| ) |
| |
| @expose('/tries') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def tries(self, session=None): |
| """Shows all tries.""" |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs', default=default_dag_run, type=int) |
| |
| if base_date: |
| base_date = timezone.parse(base_date) |
| else: |
| base_date = dag.get_latest_execution_date() or timezone.utcnow() |
| |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) |
| |
| chart_height = wwwutils.get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", |
| x_custom_format=True, |
| x_axis_date=True, |
| x_axis_format=LINECHART_X_AXIS_TICKFORMAT, |
| height=chart_height, |
| chart_attr=self.line_chart_attr, |
| ) |
| |
| tis = dag.get_task_instances_before(base_date, num_runs, session=session) |
| for task in dag.tasks: |
| y_points = [] |
| x_points = [] |
| for ti in tis: |
| dttm = wwwutils.epoch(ti.execution_date) |
| x_points.append(dttm) |
| # y value should reflect completed tries to have a 0 baseline. |
| y_points.append(ti.prev_attempted_tries) |
| if x_points: |
| chart.add_serie(name=task.task_id, x=x_points, y=y_points) |
| |
| tries = sorted({ti.try_number for ti in tis}) |
| max_date = max(ti.execution_date for ti in tis) if tries else None |
| chart.create_y_axis('yAxis', format='.02f', custom_format=False, label='Tries') |
| chart.axislist['yAxis']['axisLabelDistance'] = '-15' |
| |
| session.commit() |
| |
| form = DateTimeWithNumRunsForm( |
| data={ |
| 'base_date': max_date or timezone.utcnow(), |
| 'num_runs': num_runs, |
| } |
| ) |
| |
| chart.buildcontent() |
| |
| return self.render_template( |
| 'airflow/chart.html', |
| dag=dag, |
| root=root, |
| form=form, |
| chart=Markup(chart.htmlcontent), |
| tab_title='Tries', |
| dag_model=dag_model, |
| ) |
| |
| @expose('/landing_times') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def landing_times(self, session=None): |
| """Shows landing times.""" |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| dag_id = request.args.get('dag_id') |
| dag: DAG = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs', default=default_dag_run, type=int) |
| |
| if base_date: |
| base_date = timezone.parse(base_date) |
| else: |
| base_date = dag.get_latest_execution_date() or timezone.utcnow() |
| |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) |
| |
| tis = dag.get_task_instances_before(base_date, num_runs, session=session) |
| |
| chart_height = wwwutils.get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", |
| x_custom_format=True, |
| x_axis_date=True, |
| x_axis_format=LINECHART_X_AXIS_TICKFORMAT, |
| height=chart_height, |
| chart_attr=self.line_chart_attr, |
| ) |
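| # Landing time: seconds between the end of the run's data interval and the |
| # moment the task instance actually finished. |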
| y_points = {} |
| x_points = {} |
| for task in dag.tasks: |
| task_id = task.task_id |
| y_points[task_id] = [] |
| x_points[task_id] = [] |
| for ti in tis: |
| ts = dag.get_run_data_interval(ti.dag_run).end |
| if ti.end_date: |
| dttm = wwwutils.epoch(ti.execution_date) |
| secs = (ti.end_date - ts).total_seconds() |
| x_points[task_id].append(dttm) |
| y_points[task_id].append(secs) |
| |
| # determine the most relevant time unit for the set of landing times |
| # for the DAG |
| y_unit = infer_time_unit([d for t in y_points.values() for d in t]) |
| # update the y Axis to have the correct time units |
| chart.create_y_axis('yAxis', format='.02f', custom_format=False, label=f'Landing Time ({y_unit})') |
| chart.axislist['yAxis']['axisLabelDistance'] = '-15' |
| |
| for task_id in x_points: |
| chart.add_serie( |
| name=task_id, |
| x=x_points[task_id], |
| y=scale_time_units(y_points[task_id], y_unit), |
| ) |
| |
| dates = sorted({ti.execution_date for ti in tis}) |
| max_date = max(ti.execution_date for ti in tis) if dates else None |
| |
| session.commit() |
| |
| form = DateTimeWithNumRunsForm( |
| data={ |
| 'base_date': max_date or timezone.utcnow(), |
| 'num_runs': num_runs, |
| } |
| ) |
| chart.buildcontent() |
| return self.render_template( |
| 'airflow/chart.html', |
| dag=dag, |
| chart=Markup(chart.htmlcontent), |
| height=str(chart_height + 100) + "px", |
| root=root, |
| form=form, |
| tab_title='Landing times', |
| dag_model=dag_model, |
| ) |
| |
| @expose('/paused', methods=['POST']) |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), |
| ] |
| ) |
| @action_logging |
| def paused(self): |
| """Toggle paused.""" |
| dag_id = request.args.get('dag_id') |
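| # The request carries the DAG's *current* paused state, so 'false' means the |
| # DAG is unpaused now and should be paused (and vice versa). |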
| is_paused = request.args.get('is_paused') == 'false' |
| models.DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=is_paused) |
| return "OK" |
| |
| @expose('/gantt') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| @provide_session |
| def gantt(self, session=None): |
| """Show GANTT chart.""" |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| dag_model = DagModel.get_dagmodel(dag_id) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) |
| |
| wwwutils.check_import_errors(dag.fileloc, session) |
| |
| dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) |
| dttm = dt_nr_dr_data['dttm'] |
| |
| form = DateTimeWithNumRunsWithDagRunsForm(data=dt_nr_dr_data) |
| form.execution_date.choices = dt_nr_dr_data['dr_choices'] |
| |
| tis = ( |
| session.query(TaskInstance) |
| .join(TaskInstance.dag_run) |
| .filter( |
| DagRun.execution_date == dttm, |
| TaskInstance.dag_id == dag_id, |
| TaskInstance.start_date.isnot(None), |
| TaskInstance.state.isnot(None), |
| ) |
| .order_by(TaskInstance.start_date) |
| ) |
| |
| ti_fails = ( |
| session.query(TaskFail) |
| .join(DagRun, DagRun.execution_date == TaskFail.execution_date) |
| .filter(DagRun.execution_date == dttm, TaskFail.dag_id == dag_id) |
| ) |
| |
| tasks = [] |
| for ti in tis: |
| # prev_attempted_tries will reflect the currently running try_number |
| # or the try_number of the last complete run |
| # https://issues.apache.org/jira/browse/AIRFLOW-2143 |
| try_count = ti.prev_attempted_tries if ti.prev_attempted_tries != 0 else ti.try_number |
| task_dict = alchemy_to_dict(ti) |
| task_dict['end_date'] = task_dict['end_date'] or timezone.utcnow() |
| task_dict['extraLinks'] = dag.get_task(ti.task_id).extra_links |
| task_dict['try_number'] = try_count |
| task_dict['execution_date'] = dttm.isoformat() |
| tasks.append(task_dict) |
| |
| tf_count = 0 |
| try_count = 1 |
| prev_task_id = "" |
| for failed_task_instance in ti_fails: |
| if tf_count != 0 and failed_task_instance.task_id == prev_task_id: |
| try_count += 1 |
| else: |
| try_count = 1 |
| prev_task_id = failed_task_instance.task_id |
| tf_count += 1 |
| task = dag.get_task(failed_task_instance.task_id) |
| task_dict = alchemy_to_dict(failed_task_instance) |
| end_date = task_dict['end_date'] or timezone.utcnow() |
| task_dict['end_date'] = end_date |
| task_dict['start_date'] = task_dict['start_date'] or end_date |
| task_dict['state'] = State.FAILED |
| task_dict['operator'] = task.task_type |
| task_dict['try_number'] = try_count |
| task_dict['extraLinks'] = task.extra_links |
| task_dict['execution_date'] = dttm.isoformat() |
| tasks.append(task_dict) |
| |
| task_names = [ti.task_id for ti in tis] |
| data = { |
| 'taskNames': task_names, |
| 'tasks': tasks, |
| 'height': len(task_names) * 25 + 25, |
| } |
| |
| session.commit() |
| |
| return self.render_template( |
| 'airflow/gantt.html', |
| dag=dag, |
| execution_date=dttm.isoformat(), |
| form=form, |
| data=data, |
| base_date='', |
| root=root, |
| dag_model=dag_model, |
| ) |
| |
| @expose('/extra_links') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def extra_links(self): |
| """ |
| A RESTful endpoint that returns external links for a given Operator. |
| |
| It queries the operator that sent the request for the links it wishes |
| to provide for a given external link name. |
| |
| API: GET |
| Args: dag_id: The id of the dag containing the task in question |
| task_id: The id of the task in question |
| execution_date: The date of execution of the task |
| link_name: The name of the link reference to find the actual URL for |
| |
| Returns: |
| 200: {url: <url of link>, error: None} - returned when there was no problem |
| finding the URL |
| 404: {url: None, error: <error message>} - returned when the operator does |
| not return a URL |
| """ |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| link_name = request.args.get('link_name') |
| dttm = timezone.parse(execution_date) |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| if not dag or task_id not in dag.task_ids: |
| response = jsonify( |
| { |
| 'url': None, |
| 'error': f"can't find dag {dag} or task_id {task_id}", |
| } |
| ) |
| response.status_code = 404 |
| return response |
| |
| task = dag.get_task(task_id) |
| try: |
| url = task.get_extra_links(dttm, link_name) |
| except ValueError as err: |
| response = jsonify({'url': None, 'error': str(err)}) |
| response.status_code = 404 |
| return response |
| if url: |
| response = jsonify({'error': None, 'url': url}) |
| response.status_code = 200 |
| return response |
| else: |
| response = jsonify({'url': None, 'error': f'No URL found for {link_name}'}) |
| response.status_code = 404 |
| return response |
| |
| @expose('/object/task_instances') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def task_instances(self): |
| """Shows task instances.""" |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| dttm = request.args.get('execution_date') |
| if dttm: |
| dttm = timezone.parse(dttm) |
| else: |
| return "Error: Invalid execution_date" |
| |
| task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)} |
| |
| return json.dumps(task_instances, cls=utils_json.AirflowJsonEncoder) |
| |
| @expose('/object/tree_data') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| ] |
| ) |
| @action_logging |
| def tree_data(self): |
| """Returns tree data""" |
| dag_id = request.args.get('dag_id') |
| dag = current_app.dag_bag.get_dag(dag_id) |
| |
| if not dag: |
| response = jsonify({'error': f"can't find dag {dag_id}"}) |
| response.status_code = 404 |
| return response |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True) |
| |
| num_runs = request.args.get('num_runs', type=int) |
| if num_runs is None: |
| num_runs = conf.getint('webserver', 'default_dag_run_display_number') |
| |
| try: |
| base_date = timezone.parse(request.args["base_date"]) |
| except (KeyError, ValueError): |
| base_date = dag.get_latest_execution_date() or timezone.utcnow() |
| |
| with create_session() as session: |
| dag_runs = ( |
| session.query(DagRun) |
| .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date) |
| .order_by(DagRun.execution_date.desc()) |
| .limit(num_runs) |
| .all() |
| ) |
| dag_runs.reverse() |
| encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs] |
| dag_run_dates = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs} |
| min_date = min(dag_run_dates, default=None) |
| tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session) |
| data = { |
| 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis), |
| 'dag_runs': encoded_runs, |
| } |
| |
| # avoid spaces to reduce payload size |
| return htmlsafe_json_dumps(data, separators=(',', ':')) |
| |
| @expose('/robots.txt') |
| @action_logging |
| def robots(self): |
| """ |
| Returns a robots.txt file for blocking certain search engine crawlers. This mitigates some |
| of the risk associated with exposing Airflow to the public internet; however, it does not |
| address the real security risks associated with such a deployment. |
| """ |
| return send_from_directory(current_app.static_folder, 'robots.txt') |
| |
| |
| class ConfigurationView(AirflowBaseView): |
| """View to show Airflow Configurations""" |
| |
| default_view = 'conf' |
| |
| class_permission_name = permissions.RESOURCE_CONFIG |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| @expose('/configuration') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_CONFIG), |
| ] |
| ) |
| def conf(self): |
| """Shows configuration.""" |
| raw = request.args.get('raw') == "true" |
| title = "Airflow Configuration" |
| subtitle = AIRFLOW_CONFIG |
| # Don't show config when expose_config variable is False in airflow config |
| if conf.getboolean("webserver", "expose_config"): |
| with open(AIRFLOW_CONFIG) as file: |
| config = file.read() |
| table = [ |
| (section, key, value, source) |
| for section, parameters in conf.as_dict(True, True).items() |
| for key, (value, source) in parameters.items() |
| ] |
| else: |
| config = ( |
| "# Your Airflow administrator chose not to expose the " |
| "configuration, most likely for security reasons." |
| ) |
| table = None |
| |
| if raw: |
| return Response(response=config, status=200, mimetype="text/plain") |
| else: |
| code_html = Markup( |
| highlight( |
| config, |
| lexers.IniLexer(), # Lexer call |
| HtmlFormatter(noclasses=True), |
| ) |
| ) |
| return self.render_template( |
| 'airflow/config.html', |
| pre_subtitle=settings.HEADER + " v" + airflow.__version__, |
| code_html=code_html, |
| title=title, |
| subtitle=subtitle, |
| table=table, |
| ) |
| |
| |
| class RedocView(AirflowBaseView): |
| """Redoc Open API documentation""" |
| |
| default_view = 'redoc' |
| |
| @expose('/redoc') |
| def redoc(self): |
| """Redoc API documentation.""" |
| openapi_spec_url = url_for("/api/v1./api/v1_openapi_yaml") |
| return self.render_template('airflow/redoc.html', openapi_spec_url=openapi_spec_url) |
| |
| |
| ###################################################################################### |
| # ModelViews |
| ###################################################################################### |
| |
| |
| class DagFilter(BaseFilter): |
| """Filter using DagIDs""" |
| |
| def apply(self, query, func): |
| if current_app.appbuilder.sm.has_all_dags_access(): |
| return query |
| filter_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| return query.filter(self.model.dag_id.in_(filter_dag_ids)) |
| |
| |
| class DagEditFilter(BaseFilter): |
| """Filter using DagIDs""" |
| |
| def apply(self, query, func): |
| filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user) |
| return query.filter(self.model.dag_id.in_(filter_dag_ids)) |
| |
| |
| class AirflowModelView(ModelView): |
| """Airflow Mode View.""" |
| |
| list_widget = AirflowModelListWidget |
| page_size = PAGE_SIZE |
| |
| CustomSQLAInterface = wwwutils.CustomSQLAInterface |
| |
| |
| class AirflowPrivilegeVerifierModelView(AirflowModelView): |
| """ |
| This ModelView prevents passing primary keys of objects relating to DAGs you shouldn't be |
| able to edit. This only holds for the add, update, and delete operations. |
| You will still need to use `action_has_dag_edit_access()` for actions. |
| """ |
| |
| @staticmethod |
| def validate_dag_edit_access(item: Union[DagRun, TaskInstance]): |
| """Validates whether the user has 'can_edit' access for this specific DAG.""" |
| if not current_app.appbuilder.sm.can_edit_dag(item.dag_id): |
| raise AirflowException(f"Access denied for dag_id {item.dag_id}") |
| |
| def pre_add(self, item: Union[DagRun, TaskInstance]): |
| self.validate_dag_edit_access(item) |
| |
| def pre_update(self, item: Union[DagRun, TaskInstance]): |
| self.validate_dag_edit_access(item) |
| |
| def pre_delete(self, item: Union[DagRun, TaskInstance]): |
| self.validate_dag_edit_access(item) |
| |
| def post_add_redirect(self): # Required to prevent redirect loop |
| return redirect(self.get_default_url()) |
| |
| def post_edit_redirect(self): # Required to prevent redirect loop |
| return redirect(self.get_default_url()) |
| |
| def post_delete_redirect(self): # Required to prevent redirect loop |
| return redirect(self.get_default_url()) |
| |
| |
| def action_has_dag_edit_access(action_func: Callable) -> Callable: |
| """Decorator for actions which verifies you have DAG edit access on the given tis/drs.""" |
| |
| @wraps(action_func) |
| def check_dag_edit_acl_for_actions( |
| self, |
| items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]], |
| *args, |
| **kwargs, |
| ) -> None: |
| if items is None: |
| dag_ids: Set[str] = set() |
| elif isinstance(items, list): |
| dag_ids = {item.dag_id for item in items if item is not None} |
| elif isinstance(items, (TaskInstance, DagRun)): |
| dag_ids = {items.dag_id} |
| else: |
| raise ValueError( |
| "Was expecting the first argument of the action to be of type " |
| "Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]." |
| f"Was of type: {type(items)}" |
| ) |
| |
| for dag_id in dag_ids: |
| if not current_app.appbuilder.sm.can_edit_dag(dag_id): |
| flash(f"Access denied for dag_id {dag_id}", "danger") |
| logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id) |
| return redirect(self.get_default_url()) |
| return action_func(self, items, *args, **kwargs) |
| |
| return check_dag_edit_acl_for_actions |
| |
| |
| class SlaMissModelView(AirflowModelView): |
| """View to show SlaMiss table""" |
| |
| route_base = '/slamiss' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_SLA_MISS |
| method_permission_name = { |
| 'list': 'read', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = ['dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp'] |
| |
| label_columns = { |
| 'execution_date': 'Logical Date', |
| } |
| |
| add_columns = ['dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp'] |
| edit_columns = ['dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp'] |
| search_columns = ['dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date'] |
| base_order = ('execution_date', 'desc') |
| base_filters = [['dag_id', DagFilter, lambda: []]] |
| |
| formatters_columns = { |
| 'task_id': wwwutils.task_instance_link, |
| 'execution_date': wwwutils.datetime_f('execution_date'), |
| 'timestamp': wwwutils.datetime_f('timestamp'), |
| 'dag_id': wwwutils.dag_link, |
| } |
| |
| |
| class XComModelView(AirflowModelView): |
| """View to show records from XCom table""" |
| |
| route_base = '/xcom' |
| |
| list_title = 'List XComs' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(XCom) |
| |
| class_permission_name = permissions.RESOURCE_XCOM |
| method_permission_name = { |
| 'list': 'read', |
| 'delete': 'delete', |
| 'action_muldelete': 'delete', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_DELETE, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| label_columns = { |
| 'execution_date': 'Logical Date', |
| } |
| |
| search_columns = ['key', 'value', 'timestamp', 'execution_date', 'task_id', 'dag_id'] |
| list_columns = ['key', 'value', 'timestamp', 'execution_date', 'task_id', 'dag_id'] |
| base_order = ('execution_date', 'desc') |
| |
| base_filters = [['dag_id', DagFilter, lambda: []]] |
| |
| formatters_columns = { |
| 'task_id': wwwutils.task_instance_link, |
| 'execution_date': wwwutils.datetime_f('execution_date'), |
| 'timestamp': wwwutils.datetime_f('timestamp'), |
| 'dag_id': wwwutils.dag_link, |
| } |
| |
| @action('muldelete', 'Delete', "Are you sure you want to delete selected records?", single=False) |
| def action_muldelete(self, items): |
| """Multiple delete action.""" |
| self.datamodel.delete_all(items) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| def pre_add(self, item): |
| """Pre add hook.""" |
| item.execution_date = timezone.make_aware(item.execution_date) |
| item.value = XCom.serialize_value(item.value) |
| |
| def pre_update(self, item): |
| """Pre update hook.""" |
| item.execution_date = timezone.make_aware(item.execution_date) |
| item.value = XCom.serialize_value(item.value) |
| |
| |
| def lazy_add_provider_discovered_options_to_connection_form(): |
| """Adds provider-discovered connection parameters as late as possible""" |
| |
| def _get_connection_types() -> List[Tuple[str, str]]: |
| """Returns connection types available.""" |
| _connection_types = [ |
| ('fs', 'File (path)'), |
| ('mesos_framework-id', 'Mesos Framework ID'), |
| ('email', 'Email'), |
| ] |
| providers_manager = ProvidersManager() |
| for connection_type, provider_info in providers_manager.hooks.items(): |
| if provider_info: |
| _connection_types.append((connection_type, provider_info.hook_name)) |
| return _connection_types |
| |
| ConnectionForm.conn_type = SelectField( |
| lazy_gettext('Connection Type'), |
| choices=sorted(_get_connection_types(), key=itemgetter(1)), |
| widget=Select2Widget(), |
| validators=[InputRequired()], |
| description=""" |
| Connection Type missing? |
| Make sure you've installed the corresponding Airflow Provider Package. |
| """, |
| ) |
| for key, value in ProvidersManager().connection_form_widgets.items(): |
| setattr(ConnectionForm, key, value.field) |
| ConnectionModelView.add_columns.append(key) |
| ConnectionModelView.edit_columns.append(key) |
| ConnectionModelView.extra_fields.append(key) |
| |
| |
| # Used to store a dictionary of field behaviours used to dynamically change available |
| # fields in ConnectionForm based on type of connection chosen |
| # See airflow.hooks.base_hook.DiscoverableHook for details on how to customize your Hooks. |
| # |
| # Additionally, a list of connection types that support testing via Airflow REST API is stored to dynamically |
| # enable/disable the Test Connection button. |
| # |
| # These field behaviours and testable connection types are rendered as scripts in the conn_create.html and |
| # conn_edit.html templates. |
| class ConnectionFormWidget(FormWidget): |
| """Form widget used to display connection""" |
| |
| @cached_property |
| def field_behaviours(self): |
| return json.dumps(ProvidersManager().field_behaviours) |
| |
| @cached_property |
| def testable_connection_types(self): |
| return [ |
| connection_type |
| for connection_type, provider_info in ProvidersManager().hooks.items() |
| if provider_info.connection_testable |
| ] |
| |
| |
| class ConnectionModelView(AirflowModelView): |
| """View to show records from Connections table""" |
| |
| route_base = '/connection' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(Connection) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_CONNECTION |
| method_permission_name = { |
| 'add': 'create', |
| 'list': 'read', |
| 'edit': 'edit', |
| 'delete': 'delete', |
| 'action_muldelete': 'delete', |
| 'action_mulduplicate': 'create', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = [ |
| 'conn_id', |
| 'conn_type', |
| 'description', |
| 'host', |
| 'port', |
| 'is_encrypted', |
| 'is_extra_encrypted', |
| ] |
| add_columns = [ |
| 'conn_id', |
| 'conn_type', |
| 'description', |
| 'host', |
| 'schema', |
| 'login', |
| 'password', |
| 'port', |
| 'extra', |
| ] |
| edit_columns = add_columns.copy() |
| |
| # Initialized later by lazy_add_provider_discovered_options_to_connection_form |
| extra_fields: List[str] = [] |
| |
| add_form = edit_form = ConnectionForm |
| add_template = 'airflow/conn_create.html' |
| edit_template = 'airflow/conn_edit.html' |
| |
| add_widget = ConnectionFormWidget |
| edit_widget = ConnectionFormWidget |
| |
| base_order = ('conn_id', 'asc') |
| |
| @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?', single=False) |
@auth.has_access(
[
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_CONNECTION),
]
)
| def action_muldelete(self, items): |
| """Multiple delete.""" |
| self.datamodel.delete_all(items) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action( |
| 'mulduplicate', |
| 'Duplicate', |
| 'Are you sure you want to duplicate the selected connections?', |
| single=False, |
| ) |
| @provide_session |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_CONNECTION), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_CONNECTION), |
| ] |
| ) |
| def action_mulduplicate(self, connections, session=None): |
| """Duplicate Multiple connections""" |
| for selected_conn in connections: |
| new_conn_id = selected_conn.conn_id |
| match = re.search(r"_copy(\d+)$", selected_conn.conn_id) |
| |
| base_conn_id = selected_conn.conn_id |
| if match: |
| base_conn_id = base_conn_id.split('_copy')[0] |
| |
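# Try suffixes _copy1 through _copy10; give up if all ten are already taken.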
| potential_connection_ids = [f"{base_conn_id}_copy{i}" for i in range(1, 11)] |
| |
| query = session.query(Connection.conn_id).filter(Connection.conn_id.in_(potential_connection_ids)) |
| |
| found_conn_id_set = {conn_id for conn_id, in query} |
| |
| possible_conn_id_iter = ( |
| connection_id |
| for connection_id in potential_connection_ids |
| if connection_id not in found_conn_id_set |
| ) |
| try: |
| new_conn_id = next(possible_conn_id_iter) |
| except StopIteration: |
flash(
f"Connection {new_conn_id} can't be added because it already exists. "
f"Please rename the existing connections.",
"warning",
)
| else: |
| |
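# Positional arguments follow the Connection constructor order as used here:
# conn_id, conn_type, description, host, login, password, schema, port, extra.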
| dup_conn = Connection( |
| new_conn_id, |
| selected_conn.conn_type, |
| selected_conn.description, |
| selected_conn.host, |
| selected_conn.login, |
| selected_conn.password, |
| selected_conn.schema, |
| selected_conn.port, |
| selected_conn.extra, |
| ) |
| |
| try: |
| session.add(dup_conn) |
| session.commit() |
| flash(f"Connection {new_conn_id} added successfully.", "success") |
| except IntegrityError: |
flash(
f"Connection {new_conn_id} can't be added. Integrity error, "
f"probably a unique constraint violation.",
"warning",
)
| session.rollback() |
| |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| def process_form(self, form, is_created): |
| """Process form data.""" |
| conn_type = form.data['conn_type'] |
| conn_id = form.data["conn_id"] |
| extra = { |
| key: form.data[key] |
| for key in self.extra_fields |
| if key in form.data and key.startswith(f"extra__{conn_type}__") |
| } |
| |
| # If parameters are added to the classic `Extra` field, include these values along with |
| # custom-field extras. |
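# For example, custom-field extras such as {"extra__myconn__token": "abc"}
# (hypothetical field name) are combined with a user-typed Extra like {"timeout": 30}.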
| extra_conn_params = form.data.get("extra") |
| |
| if extra_conn_params: |
| try: |
| extra.update(json.loads(extra_conn_params)) |
| except (JSONDecodeError, TypeError): |
| flash( |
| Markup( |
| "<p>The <em>Extra</em> connection field contained an invalid value for Conn ID: " |
| f"<q>{conn_id}</q>.</p>" |
| "<p>If connection parameters need to be added to <em>Extra</em>, " |
| "please make sure they are in the form of a single, valid JSON object.</p><br>" |
| "The following <em>Extra</em> parameters were <b>not</b> added to the connection:<br>" |
| f"{extra_conn_params}", |
| ), |
| category="error", |
| ) |
| |
if extra:
| form.extra.data = json.dumps(extra) |
| |
| def prefill_form(self, form, pk): |
| """Prefill the form.""" |
| try: |
| extra = form.data.get('extra') |
| if extra is None: |
| extra_dictionary = {} |
| else: |
| extra_dictionary = json.loads(extra) |
| except JSONDecodeError: |
| extra_dictionary = {} |
| |
| if not isinstance(extra_dictionary, dict): |
| logging.warning('extra field for %s is not a dictionary', form.data.get('conn_id', '<unknown>')) |
| return |
| |
for field_name in self.extra_fields:
value = extra_dictionary.get(field_name, '')
if value:
field = getattr(form, field_name)
field.data = value
| |
| |
| class PluginView(AirflowBaseView): |
| """View to show Airflow Plugins""" |
| |
| default_view = 'list' |
| |
| class_permission_name = permissions.RESOURCE_PLUGIN |
| |
| method_permission_name = { |
| 'list': 'read', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| plugins_attributes_to_dump = [ |
| "hooks", |
| "executors", |
| "macros", |
| "admin_views", |
| "flask_blueprints", |
| "menu_links", |
| "appbuilder_views", |
| "appbuilder_menu_items", |
| "global_operator_extra_links", |
| "operator_extra_links", |
| "source", |
| ] |
| |
| @expose('/plugin') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN), |
| ] |
| ) |
| def list(self): |
| """List loaded plugins.""" |
| plugins_manager.ensure_plugins_loaded() |
| plugins_manager.integrate_executor_plugins() |
| plugins_manager.initialize_extra_operators_links_plugins() |
| plugins_manager.initialize_web_ui_plugins() |
| |
| plugins = [] |
| for plugin_no, plugin in enumerate(plugins_manager.plugins, 1): |
| plugin_data = { |
| 'plugin_no': plugin_no, |
| 'plugin_name': plugin.name, |
| 'attrs': {}, |
| } |
| for attr_name in self.plugins_attributes_to_dump: |
| attr_value = getattr(plugin, attr_name) |
| plugin_data['attrs'][attr_name] = attr_value |
| |
| plugins.append(plugin_data) |
| |
| title = "Airflow Plugins" |
| doc_url = get_docs_url("plugins.html") |
| return self.render_template( |
| 'airflow/plugin.html', |
| plugins=plugins, |
| title=title, |
| doc_url=doc_url, |
| ) |
| |
| |
| class ProviderView(AirflowBaseView): |
| """View to show Airflow Providers""" |
| |
| default_view = 'list' |
| |
| class_permission_name = permissions.RESOURCE_PROVIDER |
| |
| method_permission_name = { |
| 'list': 'read', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| @expose('/provider') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_PROVIDER), |
| ] |
| ) |
| def list(self): |
| """List providers.""" |
| providers_manager = ProvidersManager() |
| |
| providers = [] |
| for pi in providers_manager.providers.values(): |
| provider_info = pi[1] |
| provider_data = { |
| "package_name": provider_info["package-name"], |
| "description": self._clean_description(provider_info["description"]), |
| "version": pi[0], |
| "documentation_url": get_doc_url_for_provider(provider_info["package-name"], pi[0]), |
| } |
| providers.append(provider_data) |
| |
| title = "Providers" |
| doc_url = get_docs_url("apache-airflow-providers/index.html") |
| return self.render_template( |
| 'airflow/providers.html', |
| providers=providers, |
| title=title, |
| doc_url=doc_url, |
| ) |
| |
def _clean_description(self, description):
"""Escape a provider description and convert RST links and newlines to HTML."""

def _build_link(match_obj):
text = match_obj.group(1)
url = match_obj.group(2)
return markupsafe.Markup(f'<a href="{url}">{text}</a>')

cd = markupsafe.escape(description)
# Convert RST-style hyperlinks, e.g. `Some text <https://example.com>`__, into anchors.
cd = re.sub(r"`(.*)\s+<(.*)>`__", _build_link, cd)
cd = re.sub(r"\n", r"<br>", cd)
return markupsafe.Markup(cd)
| |
| |
| class PoolModelView(AirflowModelView): |
| """View to show records from Pool table""" |
| |
| route_base = '/pool' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(models.Pool) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_POOL |
| method_permission_name = { |
| 'add': 'create', |
| 'list': 'read', |
| 'edit': 'edit', |
| 'delete': 'delete', |
| 'action_muldelete': 'delete', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = ['pool', 'slots', 'running_slots', 'queued_slots'] |
| add_columns = ['pool', 'slots', 'description'] |
| edit_columns = ['pool', 'slots', 'description'] |
| |
| base_order = ('pool', 'asc') |
| |
| @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?', single=False) |
| def action_muldelete(self, items): |
| """Multiple delete.""" |
| if any(item.pool == models.Pool.DEFAULT_POOL_NAME for item in items): |
| flash("default_pool cannot be deleted", 'error') |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| self.datamodel.delete_all(items) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| def pool_link(self): |
| """Pool link rendering.""" |
| pool_id = self.get('pool') |
| if pool_id is not None: |
| url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id) |
| return Markup("<a href='{url}'>{pool_id}</a>").format(url=url, pool_id=pool_id) |
| else: |
| return Markup('<span class="label label-danger">Invalid</span>') |
| |
| def frunning_slots(self): |
| """Running slots rendering.""" |
| pool_id = self.get('pool') |
| running_slots = self.get('running_slots') |
| if pool_id is not None and running_slots is not None: |
| url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id, _flt_3_state='running') |
| return Markup("<a href='{url}'>{running_slots}</a>").format(url=url, running_slots=running_slots) |
| else: |
| return Markup('<span class="label label-danger">Invalid</span>') |
| |
| def fqueued_slots(self): |
| """Queued slots rendering.""" |
| pool_id = self.get('pool') |
| queued_slots = self.get('queued_slots') |
| if pool_id is not None and queued_slots is not None: |
| url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id, _flt_3_state='queued') |
| return Markup("<a href='{url}'>{queued_slots}</a>").format(url=url, queued_slots=queued_slots) |
| else: |
| return Markup('<span class="label label-danger">Invalid</span>') |
| |
| formatters_columns = {'pool': pool_link, 'running_slots': frunning_slots, 'queued_slots': fqueued_slots} |
| |
| validators_columns = {'pool': [validators.DataRequired()], 'slots': [validators.NumberRange(min=-1)]} |
| |
| |
| def _can_create_variable() -> bool: |
| return current_app.appbuilder.sm.has_access(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_VARIABLE) |
| |
| |
| class VariableModelView(AirflowModelView): |
| """View to show records from Variable table""" |
| |
| route_base = '/variable' |
| |
| list_template = 'airflow/variable_list.html' |
| edit_template = 'airflow/variable_edit.html' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(models.Variable) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_VARIABLE |
| method_permission_name = { |
| 'add': 'create', |
| 'list': 'read', |
| 'edit': 'edit', |
| 'delete': 'delete', |
| 'action_muldelete': 'delete', |
| 'action_varexport': 'read', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = ['key', 'val', 'description', 'is_encrypted'] |
| add_columns = ['key', 'val', 'description'] |
| edit_columns = ['key', 'val', 'description'] |
| search_columns = ['key', 'val'] |
| |
| base_order = ('key', 'asc') |
| |
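# Mask variable values whose keys match the configured sensitive patterns
# (e.g. keys containing "password" or "secret") in the list view.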
| def hidden_field_formatter(self): |
| """Formats hidden fields""" |
| key = self.get('key') |
| val = self.get('val') |
| if secrets_masker.should_hide_value_for_key(key): |
| return Markup('*' * 8) |
| if val: |
| return val |
| else: |
| return Markup('<span class="label label-danger">Invalid</span>') |
| |
| formatters_columns = { |
| 'val': hidden_field_formatter, |
| } |
| |
| validators_columns = {'key': [validators.DataRequired()]} |
| |
| def prefill_form(self, form, request_id): |
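"""Mask the value when prefilling the form for a sensitive key."""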
| if secrets_masker.should_hide_value_for_key(form.key.data): |
| form.val.data = '*' * 8 |
| |
| extra_args = {"can_create_variable": _can_create_variable} |
| |
| @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?', single=False) |
| def action_muldelete(self, items): |
| """Multiple delete.""" |
| self.datamodel.delete_all(items) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action('varexport', 'Export', '', single=False) |
| def action_varexport(self, items): |
| """Export variables.""" |
| var_dict = {} |
| decoder = json.JSONDecoder() |
| for var in items: |
| try: |
| val = decoder.decode(var.val) |
| except Exception: |
| val = var.val |
| var_dict[var.key] = val |
| |
| response = make_response(json.dumps(var_dict, sort_keys=True, indent=4)) |
| response.headers["Content-Disposition"] = "attachment; filename=variables.json" |
| response.headers["Content-Type"] = "application/json; charset=utf-8" |
| return response |
| |
| @expose('/varimport', methods=["POST"]) |
| @auth.has_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_VARIABLE)]) |
| @action_logging |
| def varimport(self): |
| """Import variables""" |
| try: |
| variable_dict = json.loads(request.files['file'].read()) |
| except Exception: |
| self.update_redirect() |
| flash("Missing file or syntax error.", 'error') |
| return redirect(self.get_redirect()) |
| else: |
| suc_count = fail_count = 0 |
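# Non-string values are stored JSON-serialized so exported variables
# round-trip through varexport/varimport unchanged.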
| for k, v in variable_dict.items(): |
| try: |
| models.Variable.set(k, v, serialize_json=not isinstance(v, str)) |
| except Exception as e: |
| logging.info('Variable import failed: %s', repr(e)) |
| fail_count += 1 |
| else: |
| suc_count += 1 |
| flash(f"{suc_count} variable(s) successfully updated.") |
| if fail_count: |
| flash(f"{fail_count} variable(s) failed to be updated.", 'error') |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| |
| class JobModelView(AirflowModelView): |
| """View to show records from Job table""" |
| |
| route_base = '/job' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(BaseJob) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_JOB |
| method_permission_name = { |
| 'list': 'read', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = [ |
| 'id', |
| 'dag_id', |
| 'state', |
| 'job_type', |
| 'start_date', |
| 'end_date', |
| 'latest_heartbeat', |
| 'executor_class', |
| 'hostname', |
| 'unixname', |
| ] |
| search_columns = [ |
| 'id', |
| 'dag_id', |
| 'state', |
| 'job_type', |
| 'start_date', |
| 'end_date', |
| 'latest_heartbeat', |
| 'executor_class', |
| 'hostname', |
| 'unixname', |
| ] |
| |
| base_order = ('start_date', 'desc') |
| |
| base_filters = [['dag_id', DagFilter, lambda: []]] |
| |
| formatters_columns = { |
| 'start_date': wwwutils.datetime_f('start_date'), |
| 'end_date': wwwutils.datetime_f('end_date'), |
| 'hostname': wwwutils.nobr_f('hostname'), |
| 'state': wwwutils.state_f, |
| 'latest_heartbeat': wwwutils.datetime_f('latest_heartbeat'), |
| } |
| |
| |
| class DagRunModelView(AirflowPrivilegeVerifierModelView): |
| """View to show records from DagRun table""" |
| |
| route_base = '/dagrun' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(models.DagRun) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_DAG_RUN |
| method_permission_name = { |
| 'list': 'read', |
| 'action_clear': 'edit', |
| 'action_muldelete': 'delete', |
| 'action_set_queued': 'edit', |
| 'action_set_running': 'edit', |
| 'action_set_failed': 'edit', |
| 'action_set_success': 'edit', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = [ |
| 'state', |
| 'dag_id', |
| 'execution_date', |
| 'run_id', |
| 'run_type', |
| 'queued_at', |
| 'start_date', |
| 'end_date', |
| 'external_trigger', |
| 'conf', |
| 'duration', |
| ] |
| search_columns = [ |
| 'state', |
| 'dag_id', |
| 'execution_date', |
| 'run_id', |
| 'run_type', |
| 'start_date', |
| 'end_date', |
| 'external_trigger', |
| ] |
| label_columns = { |
| 'execution_date': 'Logical Date', |
| } |
| edit_columns = ['state', 'dag_id', 'execution_date', 'start_date', 'end_date', 'run_id', 'conf'] |
| |
# duration is not a DB column; it's derived
| order_columns = [ |
| 'state', |
| 'dag_id', |
| 'execution_date', |
| 'run_id', |
| 'run_type', |
| 'queued_at', |
| 'start_date', |
| 'end_date', |
| 'external_trigger', |
| 'conf', |
| ] |
| |
| base_order = ('execution_date', 'desc') |
| |
| base_filters = [['dag_id', DagEditFilter, lambda: []]] |
| |
| edit_form = DagRunEditForm |
| |
| def duration_f(self): |
| """Duration calculation.""" |
| end_date = self.get('end_date') |
| start_date = self.get('start_date') |
| |
| difference = '0s' |
| if start_date and end_date: |
| difference = td_format(end_date - start_date) |
| |
| return difference |
| |
| formatters_columns = { |
| 'execution_date': wwwutils.datetime_f('execution_date'), |
| 'state': wwwutils.state_f, |
| 'start_date': wwwutils.datetime_f('start_date'), |
| 'end_date': wwwutils.datetime_f('end_date'), |
| 'queued_at': wwwutils.datetime_f('queued_at'), |
| 'dag_id': wwwutils.dag_link, |
| 'run_id': wwwutils.dag_run_link, |
| 'conf': wwwutils.json_f('conf'), |
| 'duration': duration_f, |
| } |
| |
| @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False) |
| @action_has_dag_edit_access |
| def action_muldelete(self, items: List[DagRun]): |
| """Multiple delete.""" |
| self.datamodel.delete_all(items) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action('set_queued', "Set state to 'queued'", '', single=False) |
| @action_has_dag_edit_access |
| def action_set_queued(self, drs: List[DagRun]): |
| """Set state to queued.""" |
| return self._set_dag_runs_to_active_state(drs, State.QUEUED) |
| |
| @action('set_running', "Set state to 'running'", '', single=False) |
| @action_has_dag_edit_access |
| def action_set_running(self, drs: List[DagRun]): |
| """Set state to running.""" |
| return self._set_dag_runs_to_active_state(drs, State.RUNNING) |
| |
| @provide_session |
| def _set_dag_runs_to_active_state(self, drs: List[DagRun], state: str, session=None): |
| """This routine only supports Running and Queued state.""" |
| try: |
| count = 0 |
| for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])): |
| count += 1 |
| if state == State.RUNNING: |
| dr.start_date = timezone.utcnow() |
| dr.state = state |
| session.commit() |
| flash(f"{count} dag runs were set to {state}.") |
| except Exception as ex: |
| flash(str(ex), 'error') |
| flash('Failed to set state', 'error') |
| return redirect(self.get_default_url()) |
| |
| @action( |
| 'set_failed', |
| "Set state to 'failed'", |
| "All running task instances would also be marked as failed, are you sure?", |
| single=False, |
| ) |
| @action_has_dag_edit_access |
| @provide_session |
| def action_set_failed(self, drs: List[DagRun], session=None): |
| """Set state to failed.""" |
| try: |
| count = 0 |
| altered_tis = [] |
| for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all(): |
| count += 1 |
| altered_tis += set_dag_run_state_to_failed( |
| current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session |
| ) |
| altered_ti_count = len(altered_tis) |
| flash(f"{count} dag runs and {altered_ti_count} task instances were set to failed") |
| except Exception: |
| flash('Failed to set state', 'error') |
| return redirect(self.get_default_url()) |
| |
| @action( |
| 'set_success', |
| "Set state to 'success'", |
| "All task instances would also be marked as success, are you sure?", |
| single=False, |
| ) |
| @action_has_dag_edit_access |
| @provide_session |
| def action_set_success(self, drs: List[DagRun], session=None): |
| """Set state to success.""" |
| try: |
| count = 0 |
| altered_tis = [] |
| for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all(): |
| count += 1 |
| altered_tis += set_dag_run_state_to_success( |
| current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session |
| ) |
| altered_ti_count = len(altered_tis) |
| flash(f"{count} dag runs and {altered_ti_count} task instances were set to success") |
| except Exception: |
| flash('Failed to set state', 'error') |
| return redirect(self.get_default_url()) |
| |
| @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False) |
| @action_has_dag_edit_access |
| @provide_session |
| def action_clear(self, drs: List[DagRun], session=None): |
| """Clears the state.""" |
| try: |
| count = 0 |
| cleared_ti_count = 0 |
| dag_to_tis: Dict[DAG, List[TaskInstance]] = {} |
| for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all(): |
| count += 1 |
| dag = current_app.dag_bag.get_dag(dr.dag_id) |
| tis_to_clear = dag_to_tis.setdefault(dag, []) |
| tis_to_clear += dr.get_task_instances() |
| |
| for dag, tis in dag_to_tis.items(): |
| cleared_ti_count += len(tis) |
| models.clear_task_instances(tis, session, dag=dag) |
| |
| flash(f"{count} dag runs and {cleared_ti_count} task instances were cleared") |
| except Exception: |
| flash('Failed to clear state', 'error') |
| return redirect(self.get_default_url()) |
| |
| |
| class LogModelView(AirflowModelView): |
| """View to show records from Log table""" |
| |
| route_base = '/log' |
| |
datamodel = AirflowModelView.CustomSQLAInterface(Log)  # type: ignore
| |
| class_permission_name = permissions.RESOURCE_AUDIT_LOG |
| method_permission_name = { |
| 'list': 'read', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = ['id', 'dttm', 'dag_id', 'task_id', 'event', 'execution_date', 'owner', 'extra'] |
| search_columns = ['dag_id', 'task_id', 'event', 'execution_date', 'owner', 'extra'] |
| |
| label_columns = { |
| 'execution_date': 'Logical Date', |
| } |
| |
| base_order = ('dttm', 'desc') |
| |
| base_filters = [['dag_id', DagFilter, lambda: []]] |
| |
| formatters_columns = { |
| 'dttm': wwwutils.datetime_f('dttm'), |
| 'execution_date': wwwutils.datetime_f('execution_date'), |
| 'dag_id': wwwutils.dag_link, |
| } |
| |
| |
| class TaskRescheduleModelView(AirflowModelView): |
| """View to show records from Task Reschedule table""" |
| |
| route_base = '/taskreschedule' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(models.TaskReschedule) # type: ignore |
| related_views = [DagRunModelView] |
| |
| class_permission_name = permissions.RESOURCE_TASK_RESCHEDULE |
| method_permission_name = { |
| 'list': 'read', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = [ |
| 'id', |
| 'dag_id', |
| 'run_id', |
| 'dag_run.execution_date', |
| 'task_id', |
| 'try_number', |
| 'start_date', |
| 'end_date', |
| 'duration', |
| 'reschedule_date', |
| ] |
| |
| label_columns = { |
| 'dag_run.execution_date': 'Logical Date', |
| } |
| |
| search_columns = [ |
| 'dag_id', |
| 'task_id', |
| 'run_id', |
| 'execution_date', |
| 'start_date', |
| 'end_date', |
| 'reschedule_date', |
| ] |
| |
| base_order = ('id', 'desc') |
| |
| base_filters = [['dag_id', DagFilter, lambda: []]] |
| |
| def duration_f(self): |
| """Duration calculation.""" |
| end_date = self.get('end_date') |
| duration = self.get('duration') |
| if end_date and duration: |
| return td_format(timedelta(seconds=duration)) |
| return None |
| |
| formatters_columns = { |
| 'dag_id': wwwutils.dag_link, |
| 'task_id': wwwutils.task_instance_link, |
| 'start_date': wwwutils.datetime_f('start_date'), |
| 'end_date': wwwutils.datetime_f('end_date'), |
| 'dag_run.execution_date': wwwutils.datetime_f('dag_run.execution_date'), |
| 'reschedule_date': wwwutils.datetime_f('reschedule_date'), |
| 'duration': duration_f, |
| } |
| |
| |
| class TriggerModelView(AirflowModelView): |
| """View to show records from Task Reschedule table""" |
| |
| route_base = '/triggerview' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(models.Trigger) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_TRIGGER |
| method_permission_name = { |
| 'list': 'read', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| list_columns = [ |
| 'id', |
| 'classpath', |
| 'created_date', |
| 'triggerer_id', |
| ] |
| |
| search_columns = [ |
| 'id', |
| 'classpath', |
| 'created_date', |
| 'triggerer_id', |
| ] |
| |
| # add_exclude_columns = ["kwargs"] |
| |
# FAB's base_order is a (column, direction) pair.
base_order = ('id', 'desc')
| |
| formatters_columns = { |
| 'created_date': wwwutils.datetime_f('created_date'), |
| } |
| |
| |
| class TaskInstanceModelView(AirflowPrivilegeVerifierModelView): |
| """View to show records from TaskInstance table""" |
| |
| route_base = '/taskinstance' |
| |
| datamodel = AirflowModelView.CustomSQLAInterface(models.TaskInstance) # type: ignore |
| |
| class_permission_name = permissions.RESOURCE_TASK_INSTANCE |
| method_permission_name = { |
| 'list': 'read', |
| 'action_clear': 'edit', |
| 'action_muldelete': 'delete', |
| 'action_set_running': 'edit', |
| 'action_set_failed': 'edit', |
| 'action_set_success': 'edit', |
| 'action_set_retry': 'edit', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| permissions.ACTION_CAN_ACCESS_MENU, |
| ] |
| |
| page_size = PAGE_SIZE |
| |
| list_columns = [ |
| 'state', |
| 'dag_id', |
| 'task_id', |
| 'run_id', |
| 'dag_run.execution_date', |
| 'operator', |
| 'start_date', |
| 'end_date', |
| 'duration', |
| 'job_id', |
| 'hostname', |
| 'unixname', |
| 'priority_weight', |
| 'queue', |
| 'queued_dttm', |
| 'try_number', |
| 'pool', |
| 'queued_by_job_id', |
| 'external_executor_id', |
| 'log_url', |
| ] |
| |
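# Exclude columns that cannot be sorted in the database query, such as the
# try_number and log_url properties computed in Python.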
| order_columns = [ |
| item for item in list_columns if item not in ['try_number', 'log_url', 'external_executor_id'] |
| ] |
| |
| label_columns = { |
| 'dag_run.execution_date': 'Logical Date', |
| } |
| |
| search_columns = [ |
| 'state', |
| 'dag_id', |
| 'task_id', |
| 'run_id', |
| 'execution_date', |
| 'operator', |
| 'start_date', |
| 'end_date', |
| 'hostname', |
| 'priority_weight', |
| 'queue', |
| 'queued_dttm', |
| 'try_number', |
| 'pool', |
| 'queued_by_job_id', |
| ] |
| |
| edit_columns = [ |
| 'state', |
| 'start_date', |
| 'end_date', |
| ] |
| |
| add_exclude_columns = ["next_method", "next_kwargs", "trigger_id"] |
| |
| edit_form = TaskInstanceEditForm |
| |
| base_order = ('job_id', 'asc') |
| |
| base_filters = [['dag_id', DagEditFilter, lambda: []]] |
| |
| def log_url_formatter(self): |
| """Formats log URL.""" |
| log_url = self.get('log_url') |
| return Markup( |
| '<a href="{log_url}"><span class="material-icons" aria-hidden="true">reorder</span></a>' |
| ).format(log_url=log_url) |
| |
| def duration_f(self): |
| """Formats duration.""" |
| end_date = self.get('end_date') |
| duration = self.get('duration') |
| if end_date and duration: |
| return td_format(timedelta(seconds=duration)) |
| return None |
| |
| formatters_columns = { |
| 'log_url': log_url_formatter, |
| 'task_id': wwwutils.task_instance_link, |
| 'run_id': wwwutils.dag_run_link, |
| 'hostname': wwwutils.nobr_f('hostname'), |
| 'state': wwwutils.state_f, |
| 'dag_run.execution_date': wwwutils.datetime_f('dag_run.execution_date'), |
| 'start_date': wwwutils.datetime_f('start_date'), |
| 'end_date': wwwutils.datetime_f('end_date'), |
| 'queued_dttm': wwwutils.datetime_f('queued_dttm'), |
| 'dag_id': wwwutils.dag_link, |
| 'duration': duration_f, |
| } |
| |
| @action( |
| 'clear', |
| lazy_gettext('Clear'), |
| lazy_gettext( |
| 'Are you sure you want to clear the state of the selected task' |
| ' instance(s) and set their dagruns to the QUEUED state?' |
| ), |
| single=False, |
| ) |
| @action_has_dag_edit_access |
| @provide_session |
| def action_clear(self, task_instances, session=None): |
| """Clears the action.""" |
| try: |
| dag_to_tis = collections.defaultdict(list) |
| |
| for ti in task_instances: |
| dag = current_app.dag_bag.get_dag(ti.dag_id) |
| dag_to_tis[dag].append(ti) |
| |
| for dag, task_instances_list in dag_to_tis.items(): |
| models.clear_task_instances(task_instances_list, session, dag=dag) |
| |
| session.commit() |
| flash(f"{len(task_instances)} task instances have been cleared") |
| except Exception as e: |
| flash(f'Failed to clear task instances: "{e}"', 'error') |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action('muldelete', 'Delete', "Are you sure you want to delete selected records?", single=False) |
| @action_has_dag_edit_access |
| def action_muldelete(self, items): |
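"""Multiple delete."""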
| self.datamodel.delete_all(items) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @provide_session |
| def set_task_instance_state(self, tis, target_state, session=None): |
| """Set task instance state.""" |
| try: |
| count = len(tis) |
| for ti in tis: |
| ti.set_state(target_state, session) |
| session.commit() |
| flash(f"{count} task instances were set to '{target_state}'") |
| except Exception: |
| flash('Failed to set state', 'error') |
| |
| @action('set_running', "Set state to 'running'", '', single=False) |
| @action_has_dag_edit_access |
| def action_set_running(self, tis): |
| """Set state to 'running'""" |
| self.set_task_instance_state(tis, State.RUNNING) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action('set_failed', "Set state to 'failed'", '', single=False) |
| @action_has_dag_edit_access |
| def action_set_failed(self, tis): |
| """Set state to 'failed'""" |
| self.set_task_instance_state(tis, State.FAILED) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action('set_success', "Set state to 'success'", '', single=False) |
| @action_has_dag_edit_access |
| def action_set_success(self, tis): |
| """Set state to 'success'""" |
| self.set_task_instance_state(tis, State.SUCCESS) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| @action('set_retry', "Set state to 'up_for_retry'", '', single=False) |
| @action_has_dag_edit_access |
| def action_set_retry(self, tis): |
| """Set state to 'up_for_retry'""" |
| self.set_task_instance_state(tis, State.UP_FOR_RETRY) |
| self.update_redirect() |
| return redirect(self.get_redirect()) |
| |
| |
| class AutocompleteView(AirflowBaseView): |
| """View to provide autocomplete results""" |
| |
| @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)]) |
| @provide_session |
| @expose('/dagmodel/autocomplete') |
| def autocomplete(self, session=None): |
| """Autocomplete.""" |
| query = unquote(request.args.get('query', '')) |
| |
| if not query: |
| return wwwutils.json_response([]) |
| |
| # Provide suggestions of dag_ids and owners |
| dag_ids_query = session.query(DagModel.dag_id.label('item')).filter( |
| ~DagModel.is_subdag, DagModel.is_active, DagModel.dag_id.ilike('%' + query + '%') |
| ) |
| |
| owners_query = session.query(func.distinct(DagModel.owners).label('item')).filter( |
| ~DagModel.is_subdag, DagModel.is_active, DagModel.owners.ilike('%' + query + '%') |
| ) |
| |
| # Hide DAGs if not showing status: "all" |
| status = flask_session.get(FILTER_STATUS_COOKIE) |
| if status == 'active': |
| dag_ids_query = dag_ids_query.filter(~DagModel.is_paused) |
| owners_query = owners_query.filter(~DagModel.is_paused) |
| elif status == 'paused': |
| dag_ids_query = dag_ids_query.filter(DagModel.is_paused) |
| owners_query = owners_query.filter(DagModel.is_paused) |
| |
| filter_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) |
| |
| dag_ids_query = dag_ids_query.filter(DagModel.dag_id.in_(filter_dag_ids)) |
| owners_query = owners_query.filter(DagModel.dag_id.in_(filter_dag_ids)) |
| |
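# Merge both suggestion sources and cap the autocomplete payload at 10 entries.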
| payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()] |
| |
| return wwwutils.json_response(payload) |
| |
| |
| class DagDependenciesView(AirflowBaseView): |
| """View to show dependencies between DAGs""" |
| |
| refresh_interval = timedelta( |
| seconds=conf.getint( |
| "webserver", |
| "dag_dependencies_refresh_interval", |
| fallback=conf.getint("scheduler", "dag_dir_list_interval"), |
| ) |
| ) |
| last_refresh = timezone.utcnow() - refresh_interval |
| nodes: List[Dict[str, Any]] = [] |
| edges: List[Dict[str, str]] = [] |
| |
| @expose('/dag-dependencies') |
| @auth.has_access( |
| [ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES), |
| ] |
| ) |
| @gzipped |
| @action_logging |
| def list(self): |
| """Display DAG dependencies""" |
| title = "DAG Dependencies" |
| |
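# The dependency graph is cached on the view class: build it on first use,
# then rebuild only when the refresh interval has elapsed and the serialized
# DAGs have changed since the last refresh.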
| if not self.nodes or not self.edges: |
| self._calculate_graph() |
| self.last_refresh = timezone.utcnow() |
| elif timezone.utcnow() > self.last_refresh + self.refresh_interval: |
| max_last_updated = SerializedDagModel.get_max_last_updated_datetime() |
| if max_last_updated is None or max_last_updated > self.last_refresh: |
| self._calculate_graph() |
| self.last_refresh = timezone.utcnow() |
| |
| return self.render_template( |
| "airflow/dag_dependencies.html", |
| title=title, |
| nodes=self.nodes, |
| edges=self.edges, |
| last_refresh=self.last_refresh, |
| arrange=conf.get("webserver", "dag_orientation"), |
| width=request.args.get("width", "100%"), |
| height=request.args.get("height", "800"), |
| ) |
| |
| def _calculate_graph(self): |
| |
| nodes: List[Dict[str, Any]] = [] |
| edges: List[Dict[str, str]] = [] |
| |
| for dag, dependencies in SerializedDagModel.get_dag_dependencies().items(): |
| dag_node_id = f"dag:{dag}" |
| nodes.append(self._node_dict(dag_node_id, dag, "dag")) |
| |
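# Each dependency gets its own node, wired between source and target:
# dag:<source> -> <dependency> -> dag:<target>.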
| for dep in dependencies: |
| |
| nodes.append(self._node_dict(dep.node_id, dep.dependency_id, dep.dependency_type)) |
| edges.extend( |
| [ |
| {"u": f"dag:{dep.source}", "v": dep.node_id}, |
| {"u": dep.node_id, "v": f"dag:{dep.target}"}, |
| ] |
| ) |
| |
| self.nodes = nodes |
| self.edges = edges |
| |
| @staticmethod |
| def _node_dict(node_id, label, node_class): |
| return { |
| "id": node_id, |
| "value": {"label": label, "rx": 5, "ry": 5, "class": node_class}, |
| } |
| |
| |
| class ActionModelView(PermissionModelView): |
| """Customize permission names for FAB's builtin PermissionModelView.""" |
| |
| class_permission_name = permissions.RESOURCE_ACTION |
| route_base = "/actions" |
| method_permission_name = { |
| 'list': 'read', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| ] |
| |
| list_title = lazy_gettext("List Actions") |
| show_title = lazy_gettext("Show Action") |
| add_title = lazy_gettext("Add Action") |
| edit_title = lazy_gettext("Edit Action") |
| |
| label_columns = {"name": lazy_gettext("Name")} |
| |
| |
| class PermissionPairModelView(PermissionViewModelView): |
| """Customize permission names for FAB's builtin PermissionViewModelView.""" |
| |
| class_permission_name = permissions.RESOURCE_PERMISSION |
| route_base = "/permissions" |
| method_permission_name = { |
| 'list': 'read', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| ] |
| |
| list_title = lazy_gettext("List Permissions") |
| show_title = lazy_gettext("Show Permission") |
| add_title = lazy_gettext("Add Permission") |
| edit_title = lazy_gettext("Edit Permission") |
| |
| label_columns = { |
| "action": lazy_gettext("Action"), |
| "resource": lazy_gettext("Resource"), |
| } |
| list_columns = ["action", "resource"] |
| |
| |
| class CustomResetMyPasswordView(ResetMyPasswordView): |
| """Customize permission names for FAB's builtin ResetMyPasswordView.""" |
| |
| class_permission_name = permissions.RESOURCE_MY_PASSWORD |
| method_permission_name = { |
| 'this_form_get': 'read', |
| 'this_form_post': 'edit', |
| } |
| base_permissions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] |
| |
| |
| class CustomResetPasswordView(ResetPasswordView): |
| """Customize permission names for FAB's builtin ResetPasswordView.""" |
| |
| class_permission_name = permissions.RESOURCE_PASSWORD |
| method_permission_name = { |
| 'this_form_get': 'read', |
| 'this_form_post': 'edit', |
| } |
| |
| base_permissions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] |
| |
| |
| class CustomRoleModelView(RoleModelView): |
| """Customize permission names for FAB's builtin RoleModelView.""" |
| |
| class_permission_name = permissions.RESOURCE_ROLE |
| method_permission_name = { |
| 'delete': 'delete', |
| 'download': 'read', |
| 'show': 'read', |
| 'list': 'read', |
| 'edit': 'edit', |
| 'add': 'create', |
| 'copy_role': 'create', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| ] |
| |
| |
| class ResourceModelView(ViewMenuModelView): |
| """Customize permission names for FAB's builtin ViewMenuModelView.""" |
| |
| class_permission_name = permissions.RESOURCE_RESOURCE |
| route_base = "/resources" |
| method_permission_name = { |
| 'list': 'read', |
| } |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| ] |
| |
| list_title = lazy_gettext("List Resources") |
| show_title = lazy_gettext("Show Resource") |
| add_title = lazy_gettext("Add Resource") |
| edit_title = lazy_gettext("Edit Resource") |
| |
| label_columns = {"name": lazy_gettext("Name")} |
| |
| |
| class CustomUserInfoEditView(UserInfoEditView): |
| """Customize permission names for FAB's builtin UserInfoEditView.""" |
| |
| class_permission_name = permissions.RESOURCE_MY_PROFILE |
| route_base = "/userinfoeditview" |
| method_permission_name = { |
| 'this_form_get': 'edit', |
| 'this_form_post': 'edit', |
| } |
| base_permissions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] |
| |
| |
| class CustomUserStatsChartView(UserStatsChartView): |
| """Customize permission names for FAB's builtin UserStatsChartView.""" |
| |
| class_permission_name = permissions.RESOURCE_USER_STATS_CHART |
| route_base = "/userstatschartview" |
| method_permission_name = { |
| 'chart': 'read', |
| 'list': 'read', |
| } |
| base_permissions = [permissions.ACTION_CAN_READ] |
| |
| |
| class MultiResourceUserMixin: |
| """Remaps UserModelView permissions to new resources and actions.""" |
| |
| _class_permission_name = permissions.RESOURCE_USER |
| |
| class_permission_name_mapping = { |
| 'userinfoedit': permissions.RESOURCE_MY_PROFILE, |
| 'userinfo': permissions.RESOURCE_MY_PROFILE, |
| } |
| |
| method_permission_name = { |
| 'userinfo': 'read', |
| 'download': 'read', |
| 'show': 'read', |
| 'list': 'read', |
| 'edit': 'edit', |
| 'userinfoedit': 'edit', |
| 'delete': 'delete', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| ] |
| |
| @property |
| def class_permission_name(self): |
| """Returns appropriate permission name depending on request method name.""" |
| if request: |
| action_name = request.view_args.get("name") |
| _, method_name = request.url_rule.endpoint.rsplit(".", 1) |
| if method_name == 'action' and action_name: |
| return self.class_permission_name_mapping.get(action_name, self._class_permission_name) |
| if method_name: |
| return self.class_permission_name_mapping.get(method_name, self._class_permission_name) |
| return self._class_permission_name |
| |
| @class_permission_name.setter |
| def class_permission_name(self, name): |
| self._class_permission_name = name |
| |
| @expose("/show/<pk>", methods=["GET"]) |
| @has_access |
| def show(self, pk): |
| pk = self._deserialize_pk_if_composite(pk) |
| widgets = self._show(pk) |
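# Drop the 'userinfoedit' action from the show page; profile editing has its
# own dedicated view.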
| widgets['show'].template_args['actions'].pop('userinfoedit') |
| return self.render_template( |
| self.show_template, |
| pk=pk, |
| title=self.show_title, |
| widgets=widgets, |
| related_views=self._related_views, |
| ) |
| |
| |
| class CustomUserDBModelView(MultiResourceUserMixin, UserDBModelView): |
| """Customize permission names for FAB's builtin UserDBModelView.""" |
| |
| _class_permission_name = permissions.RESOURCE_USER |
| |
| class_permission_name_mapping = { |
| 'resetmypassword': permissions.RESOURCE_MY_PASSWORD, |
| 'resetpasswords': permissions.RESOURCE_PASSWORD, |
| 'userinfoedit': permissions.RESOURCE_MY_PROFILE, |
| 'userinfo': permissions.RESOURCE_MY_PROFILE, |
| } |
| |
| method_permission_name = { |
| 'add': 'create', |
| 'download': 'read', |
| 'show': 'read', |
| 'list': 'read', |
| 'edit': 'edit', |
| 'delete': 'delete', |
| 'resetmypassword': 'read', |
| 'resetpasswords': 'read', |
| 'userinfo': 'read', |
| 'userinfoedit': 'read', |
| } |
| |
| base_permissions = [ |
| permissions.ACTION_CAN_CREATE, |
| permissions.ACTION_CAN_READ, |
| permissions.ACTION_CAN_EDIT, |
| permissions.ACTION_CAN_DELETE, |
| ] |
| |
| |
| class CustomUserLDAPModelView(MultiResourceUserMixin, UserLDAPModelView): |
| """Customize permission names for FAB's builtin UserLDAPModelView.""" |
| |
| |
| class CustomUserOAuthModelView(MultiResourceUserMixin, UserOAuthModelView): |
| """Customize permission names for FAB's builtin UserOAuthModelView.""" |
| |
| |
| class CustomUserOIDModelView(MultiResourceUserMixin, UserOIDModelView): |
| """Customize permission names for FAB's builtin UserOIDModelView.""" |
| |
| |
| class CustomUserRemoteUserModelView(MultiResourceUserMixin, UserRemoteUserModelView): |
| """Customize permission names for FAB's builtin UserRemoteUserModelView.""" |