| # -*- coding: utf-8 -*- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import copy |
| import itertools |
| import json |
| import logging |
| import math |
| import os |
| import socket |
| import traceback |
| from collections import defaultdict |
| from datetime import timedelta |
| from urllib.parse import unquote |
| |
| import six |
| from six.moves.urllib.parse import quote, urlparse |
| |
| import pendulum |
| import sqlalchemy as sqla |
| from flask import ( |
| Markup, Response, escape, flash, jsonify, make_response, redirect, render_template, request, |
| session as flask_session, url_for, |
| ) |
| from flask._compat import PY2 |
| from flask_appbuilder import BaseView, ModelView, expose, has_access, permission_name |
| from flask_appbuilder.actions import action |
| from flask_appbuilder.models.sqla.filters import BaseFilter |
| from flask_babel import lazy_gettext |
| import lazy_object_proxy |
| from jinja2.utils import htmlsafe_json_dumps # type: ignore |
| from pygments import highlight, lexers |
| from pygments.formatters import HtmlFormatter |
| from sqlalchemy import and_, desc, func, or_, union_all |
| from sqlalchemy.orm import joinedload |
| from wtforms import SelectField, validators |
| |
| import nvd3 |
| |
| import airflow |
| from airflow import models, jobs |
| from airflow import settings, configuration |
| from airflow.configuration import conf |
| from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_success, |
| set_dag_run_state_to_failed) |
| from airflow.models import Connection, DagModel, DagRun, DagTag, Log, SlaMiss, TaskFail, XCom, errors |
| from airflow.exceptions import AirflowException |
| from airflow.models.dagcode import DagCode |
| from airflow.settings import STATE_COLORS, STORE_SERIALIZED_DAGS |
| from airflow.ti_deps.dep_context import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS, DepContext |
| from airflow.utils import timezone |
| from airflow.utils.dates import infer_time_unit, scale_time_units |
| from airflow.utils.db import provide_session, create_session |
| from airflow.utils.helpers import alchemy_to_dict, render_log_filename |
| from airflow.utils.state import State |
| from airflow.www_rbac import utils as wwwutils |
| from airflow.www_rbac.app import app, appbuilder |
| from airflow.www_rbac.decorators import action_logging, gzipped, has_dag_access |
| from airflow.www_rbac.forms import (DateTimeForm, DateTimeWithNumRunsForm, |
| DateTimeWithNumRunsWithDagRunsForm, |
| DagRunForm, ConnectionForm) |
| from airflow.www_rbac.widgets import AirflowModelListWidget |
| |
| |
# Number of DAGs listed per page on the home view.
PAGE_SIZE = conf.getint('webserver', 'page_size')
# Flask session keys under which the home view remembers the last tag and
# status filters chosen by the user.
FILTER_TAGS_COOKIE = 'tags_filter'
FILTER_STATUS_COOKIE = 'dag_status_filter'

# With SKIP_DAGS_PARSING=True in the environment the webserver uses a DagBag
# pointed at os.devnull with examples disabled (presumably empty) instead of
# collecting DAGs from the configured DAGs folder.
dagbag = (
    models.DagBag(os.devnull, include_examples=False)
    if os.environ.get('SKIP_DAGS_PARSING') == 'True'
    else models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
)
| |
| |
| def get_safe_url(url): |
| """Given a user-supplied URL, ensure it points to our web server""" |
| try: |
| valid_schemes = ['http', 'https', ''] |
| valid_netlocs = [request.host, ''] |
| |
| parsed = urlparse(url) |
| if parsed.scheme in valid_schemes and parsed.netloc in valid_netlocs: |
| return url |
| except Exception as e: # pylint: disable=broad-except |
| logging.debug("Error validating value in origin parameter passed to URL: %s", url) |
| logging.debug("Error: %s", e) |
| pass |
| |
| return url_for('Airflow.index') |
| |
| |
def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
    """Collect the values backing the date / number-of-runs / dag-run form.

    Reads ``execution_date``, ``base_date`` and ``num_runs`` from the query
    string of ``request``, then loads (through ``session``) at most
    ``num_runs`` runs of ``dag`` whose execution date is at or before the base
    date, newest first.

    :return: dict with ``dttm``, ``base_date``, ``num_runs``,
        ``execution_date`` (``dttm`` as an ISO string), ``dr_choices``
        (``(iso execution date, run_id)`` pairs) and ``dr_state`` (state of
        the run matching ``dttm``; when none matches, ``dttm`` and
        ``dr_state`` are taken from the newest loaded run, if any).
    """
    args = request.args

    requested_date = args.get('execution_date')
    if requested_date:
        dttm = pendulum.parse(requested_date)
    else:
        dttm = dag.latest_execution_date or timezone.utcnow()

    requested_base = args.get('base_date')
    if requested_base:
        base_date = timezone.parse(requested_base)
    else:
        # The DateTimeField widget truncates sub-second precision and would
        # otherwise lose the first dag run, so round up to the next second.
        base_date = (dttm + timedelta(seconds=1)).replace(microsecond=0)

    default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
    requested_runs = args.get('num_runs')
    num_runs = int(requested_runs) if requested_runs else default_dag_run

    dag_run = models.DagRun
    runs = (
        session.query(dag_run)
        .filter(dag_run.dag_id == dag.dag_id, dag_run.execution_date <= base_date)
        .order_by(desc(dag_run.execution_date))
        .limit(num_runs)
        .all()
    )

    dr_choices = [(run.execution_date.isoformat(), run.run_id) for run in runs]
    dr_state = None
    for run in runs:
        if dttm == run.execution_date:
            dr_state = run.state

    # Happens if base_date was changed and the selected dag run is not in result
    if not dr_state and runs:
        newest = runs[0]
        dttm = newest.execution_date
        dr_state = newest.state

    return {
        'dttm': dttm,
        'base_date': base_date,
        'num_runs': num_runs,
        'execution_date': dttm.isoformat(),
        'dr_choices': dr_choices,
        'dr_state': dr_state,
    }
| |
| |
| ###################################################################################### |
| # BaseViews |
| ###################################################################################### |
@app.errorhandler(404)
def circles(error):
    """Render the "page not found" page with HTTP status 404.

    The server's FQDN is shown unless ``[webserver] expose_hostname`` is
    disabled (it defaults to True here), in which case ``'redact'`` is shown.
    """
    if conf.getboolean('webserver', 'EXPOSE_HOSTNAME', fallback=True):
        hostname = socket.getfqdn()
    else:
        hostname = 'redact'
    return render_template('airflow/circles.html', hostname=hostname), 404
| |
| |
@app.errorhandler(500)
def show_traceback(error):
    """Render the internal-error page with HTTP status 500.

    The server's FQDN and the formatted traceback of the exception being
    handled are included only when ``[webserver] expose_hostname`` and
    ``[webserver] expose_stacktrace`` allow it (both default to True here);
    otherwise placeholder text is shown.
    """
    from airflow.utils import asciiart as ascii_

    expose_hostname = conf.getboolean('webserver', 'EXPOSE_HOSTNAME', fallback=True)
    hostname = socket.getfqdn() if expose_hostname else 'redact'

    expose_stacktrace = conf.getboolean('webserver', 'EXPOSE_STACKTRACE', fallback=True)
    if expose_stacktrace:
        info = traceback.format_exc()
    else:
        info = 'Error! Please contact server admin'

    page = render_template(
        'airflow/traceback.html',
        hostname=hostname,
        nukular=ascii_.nukular,
        info=info,
    )
    return page, 500
| |
| |
class AirflowBaseView(BaseView):
    """Base class for Airflow's Flask-AppBuilder views.

    ``route_base`` is empty, so exposed URLs carry no view-specific prefix.
    Every template rendered through :meth:`render_template` receives
    ``airflow.macros`` as ``macros`` and the most recent scheduler job as
    ``scheduler_job``.
    """

    route_base = ''

    # Make our macros available to our UI templates too.
    extra_args = {
        'macros': airflow.macros,
    }

    def render_template(self, *args, **kwargs):
        """Render a template with ``scheduler_job`` added to its context."""
        # The proxy calls SchedulerJob.most_recent_job only if the template
        # actually touches ``scheduler_job``, and at most once per call here;
        # nothing is cached for the lifetime of the view instance.
        latest_scheduler_job = lazy_object_proxy.Proxy(jobs.SchedulerJob.most_recent_job)
        return super().render_template(*args, scheduler_job=latest_scheduler_job, **kwargs)
| |
| |
| class Airflow(AirflowBaseView): |
@expose('/health')
def health(self):
    """
    An endpoint helping check the health status of the Airflow instance,
    including metadatabase and scheduler.

    Responds with JSON of the form::

        {"metadatabase": {"status": "healthy" | "unhealthy"},
         "scheduler": {"status": "healthy" | "unhealthy",
                       "latest_scheduler_heartbeat": <ISO timestamp> | null}}

    The metadatabase is reported unhealthy when looking up the most recent
    scheduler job raises. The scheduler is reported healthy only when such a
    job exists and its ``is_alive()`` returns True.
    """
    metadatabase_status = 'healthy'
    scheduler_status = 'unhealthy'
    latest_scheduler_heartbeat = None
    try:
        scheduler_job = jobs.SchedulerJob.most_recent_job()

        if scheduler_job:
            latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat()
            if scheduler_job.is_alive():
                scheduler_status = 'healthy'
    except Exception:  # pylint: disable=broad-except
        # Any failure here is reported as an unhealthy metadatabase; log it so
        # the underlying cause is not silently lost.
        logging.exception("Health check failed while fetching the most recent scheduler job")
        metadatabase_status = 'unhealthy'

    payload = {
        'metadatabase': {'status': metadatabase_status},
        'scheduler': {'status': scheduler_status,
                      'latest_scheduler_heartbeat': latest_scheduler_heartbeat},
    }
    return wwwutils.json_response(payload)
| |
@expose('/home')
@has_access
def index(self):
    """Render the home page: the paginated, filterable list of DAGs.

    Query parameters:

    * ``page`` -- zero-based page number; malformed or negative values show
      the first page.
    * ``search`` -- case-insensitive substring matched against DAG id and
      owners.
    * ``tags`` (repeatable) -- keep DAGs having at least one of these tags.
      The selection is remembered in the session until ``reset_tags`` is
      passed.
    * ``status`` -- ``active`` or ``paused``; any other value shows all DAGs.
      Remembered in the session. The initial default is ``active`` when
      ``[webserver] hide_paused_dags_by_default`` is set, else ``all``.
    * ``num_runs`` -- forwarded to the template; missing or malformed values
      fall back to ``[webserver] default_dag_run_display_number``.

    Only active, non-subdag DAGs the current user can access are listed.
    DAG and plugin import errors are flashed to the user.
    """
    hide_paused_dags_by_default = conf.getboolean('webserver',
                                                  'hide_paused_dags_by_default')

    def get_int_arg(value, default=0):
        """Parse ``value`` as an int; return ``default`` if it is missing or malformed."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
    # A malformed value (e.g. ?num_runs=abc) falls back to the default
    # instead of failing the whole request.
    num_runs = get_int_arg(request.args.get('num_runs'), default=default_dag_run)

    arg_current_page = request.args.get('page', '0')
    arg_search_query = request.args.get('search', None)
    arg_tags_filter = request.args.getlist('tags', None)
    arg_status_filter = request.args.get('status', None)

    if request.args.get('reset_tags') is not None:
        flask_session[FILTER_TAGS_COOKIE] = None
        arg_tags_filter = None
    else:
        cookie_val = flask_session.get(FILTER_TAGS_COOKIE)
        if arg_tags_filter:
            flask_session[FILTER_TAGS_COOKIE] = ','.join(arg_tags_filter)
        elif cookie_val:
            arg_tags_filter = cookie_val.split(',')

    if arg_status_filter is None:
        cookie_val = flask_session.get(FILTER_STATUS_COOKIE)
        if cookie_val:
            arg_status_filter = cookie_val
        else:
            arg_status_filter = 'active' if hide_paused_dags_by_default else 'all'
            flask_session[FILTER_STATUS_COOKIE] = arg_status_filter
    else:
        status = arg_status_filter.strip().lower()
        flask_session[FILTER_STATUS_COOKIE] = status
        arg_status_filter = status

    dags_per_page = PAGE_SIZE
    # Clamp to the first page: a negative page would produce a negative SQL
    # OFFSET, which databases such as PostgreSQL reject.
    current_page = max(get_int_arg(arg_current_page, default=0), 0)

    start = current_page * dags_per_page
    end = start + dags_per_page

    # Get all the dag id the user could access
    filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()

    with create_session() as session:
        # read orm_dags from the db
        dags_query = session.query(DagModel).filter(
            ~DagModel.is_subdag, DagModel.is_active
        )

        if arg_search_query:
            dags_query = dags_query.filter(
                DagModel.dag_id.ilike('%' + arg_search_query + '%') |
                DagModel.owners.ilike('%' + arg_search_query + '%')
            )

        if arg_tags_filter:
            dags_query = dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))

        if 'all_dags' not in filter_dag_ids:
            dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids))

        all_dags = dags_query
        active_dags = dags_query.filter(~DagModel.is_paused)
        paused_dags = dags_query.filter(DagModel.is_paused)

        # One grouped query yields both the active and the paused counts.
        is_paused_count = dict(
            all_dags.with_entities(DagModel.is_paused, func.count(DagModel.dag_id))
            .group_by(DagModel.is_paused).all()
        )
        status_count_active = is_paused_count.get(False, 0)
        status_count_paused = is_paused_count.get(True, 0)
        all_dags_count = status_count_active + status_count_paused
        if arg_status_filter == 'active':
            current_dags = active_dags
            num_of_all_dags = status_count_active
        elif arg_status_filter == 'paused':
            current_dags = paused_dags
            num_of_all_dags = status_count_paused
        else:
            current_dags = all_dags
            num_of_all_dags = all_dags_count

        dags = current_dags.order_by(DagModel.dag_id).options(
            joinedload(DagModel.tags)).offset(start).limit(dags_per_page).all()

        dagtags = session.query(DagTag.name).distinct(DagTag.name).all()
        tags = [
            {"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)}
            for name, in dagtags
        ]

        import_errors = session.query(errors.ImportError).all()

    for ie in import_errors:
        flash(
            "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
            "dag_import_error")

    from airflow.plugins_manager import import_errors as plugin_import_errors
    for filename, stacktrace in plugin_import_errors.items():
        flash(
            "Broken plugin: [{filename}] {stacktrace}".format(
                stacktrace=stacktrace,
                filename=filename),
            "error")

    num_of_pages = int(math.ceil(num_of_all_dags / float(dags_per_page)))

    # The template receives the colour for the ``None`` state under "null".
    state_color_mapping = State.state_color.copy()
    state_color_mapping["null"] = state_color_mapping.pop(None)
    state_color_mapping.update(STATE_COLORS)

    return self.render_template(
        'airflow/dags.html',
        dags=dags,
        current_page=current_page,
        search_query=arg_search_query if arg_search_query else '',
        page_size=dags_per_page,
        num_of_pages=num_of_pages,
        num_dag_from=min(start + 1, num_of_all_dags),
        num_dag_to=min(end, num_of_all_dags),
        num_of_all_dags=num_of_all_dags,
        paging=wwwutils.generate_pages(current_page,
                                       num_of_pages,
                                       search=escape(arg_search_query) if arg_search_query else None,
                                       status=arg_status_filter if arg_status_filter else None),
        num_runs=num_runs,
        tags=tags,
        state_color=state_color_mapping,
        status_filter=arg_status_filter,
        status_count_all=all_dags_count,
        status_count_active=status_count_active,
        status_count_paused=status_count_paused)
| |
@expose('/dag_stats', methods=['POST'])
@has_access
@provide_session
def dag_stats(self, session=None):
    """Return DAG run counts per state for the DAGs the user may see.

    The POSTed (URL-quoted) ``dag_ids`` form values narrow the result to that
    subset of accessible DAGs; when none are posted, every accessible DAG is
    reported. The JSON maps each DAG id to a list of
    ``{'state': ..., 'count': ...}`` entries, one per ``State.dag_states``,
    with 0 for states that have no runs.
    """
    run_model = models.DagRun

    accessible = appbuilder.sm.get_accessible_dag_ids()
    if 'all_dags' in accessible:
        accessible = [row_dag_id for row_dag_id, in session.query(models.DagModel.dag_id)]

    stats_query = (
        session.query(run_model.dag_id, run_model.state, sqla.func.count(run_model.state))
        .group_by(run_model.dag_id, run_model.state)
    )

    # Filter by post parameters
    requested = {unquote(raw_id) for raw_id in request.form.getlist('dag_ids') if raw_id}
    dag_ids = requested.intersection(accessible) if requested else accessible

    if not dag_ids:
        return wwwutils.json_response({})

    stats_query = stats_query.filter(run_model.dag_id.in_(dag_ids))

    counts = {}
    for row_dag_id, state, count in stats_query:
        counts.setdefault(row_dag_id, {})[state] = count

    payload = {
        dag_id: [
            {'state': state, 'count': counts.get(dag_id, {}).get(state, 0)}
            for state in State.dag_states
        ]
        for dag_id in dag_ids
    }
    return wwwutils.json_response(payload)
| |
@expose('/task_stats', methods=['POST'])
@has_access
@provide_session
def task_stats(self, session=None):
    """Return task instance counts per state for the DAGs the user may see.

    For each active DAG, the counted task instances are those of its running
    DAG runs plus those of its most recent non-running run. The POSTed
    (URL-quoted) ``dag_ids`` form values narrow the result to that subset of
    accessible DAGs. The JSON maps each DAG id to a list of
    ``{'state': ..., 'count': ...}`` entries, one per ``State.task_states``.
    """
    TI = models.TaskInstance
    DagRun = models.DagRun
    Dag = models.DagModel

    accessible_dag_ids = set(appbuilder.sm.get_accessible_dag_ids())
    if not accessible_dag_ids:
        return wwwutils.json_response({})
    if 'all_dags' in accessible_dag_ids:
        accessible_dag_ids = {
            row_dag_id for row_dag_id, in session.query(models.DagModel.dag_id)
        }

    # Filter by post parameters
    requested_dag_ids = {
        unquote(raw_id) for raw_id in request.form.getlist('dag_ids') if raw_id
    }
    if requested_dag_ids:
        dag_ids = requested_dag_ids.intersection(accessible_dag_ids)
    else:
        dag_ids = accessible_dag_ids

    # Latest execution date per active DAG, ignoring running runs.
    last_run_query = (
        session.query(
            DagRun.dag_id,
            sqla.func.max(DagRun.execution_date).label('execution_date'))
        .join(Dag, Dag.dag_id == DagRun.dag_id)
        .filter(DagRun.state != State.RUNNING)
        .filter(Dag.is_active == True)  # noqa
        .group_by(DagRun.dag_id)
    )
    # Every running run of an active DAG.
    running_run_query = (
        session.query(DagRun.dag_id, DagRun.execution_date)
        .join(Dag, Dag.dag_id == DagRun.dag_id)
        .filter(DagRun.state == State.RUNNING)
        .filter(Dag.is_active == True)  # noqa
    )
    if requested_dag_ids:
        last_run_query = last_run_query.filter(DagRun.dag_id.in_(dag_ids))
        running_run_query = running_run_query.filter(DagRun.dag_id.in_(dag_ids))

    last_run = last_run_query.subquery('last_dag_run')
    running_run = running_run_query.subquery('running_dag_run')

    def _task_instances_of(run_subquery):
        """Query (dag_id, state) of the task instances of the runs in ``run_subquery``."""
        return (
            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
            .join(run_subquery,
                  and_(run_subquery.c.dag_id == TI.dag_id,
                       run_subquery.c.execution_date == TI.execution_date))
        )

    last_ti = _task_instances_of(last_run)
    running_ti = _task_instances_of(running_run)
    if requested_dag_ids:
        last_ti = last_ti.filter(TI.dag_id.in_(dag_ids))
        running_ti = running_ti.filter(TI.dag_id.in_(dag_ids))

    union_ti = union_all(last_ti, running_ti).alias('union_ti')
    counts_query = (
        session.query(union_ti.c.dag_id, union_ti.c.state, sqla.func.count())
        .group_by(union_ti.c.dag_id, union_ti.c.state)
    )

    counts = {}
    for row_dag_id, state, count in counts_query:
        counts.setdefault(row_dag_id, {})[state] = count

    payload = {
        dag_id: [
            {'state': state, 'count': counts.get(dag_id, {}).get(state, 0)}
            for state in State.task_states
        ]
        for dag_id in dag_ids
    }
    return wwwutils.json_response(payload)
| |
@expose('/last_dagruns', methods=['POST'])
@has_access
@provide_session
def last_dagruns(self, session=None):
    """Return the latest execution date of each accessible DAG that has runs.

    The POSTed (URL-quoted) ``dag_ids`` form values narrow the result to that
    subset of accessible DAGs. Response keys are DAG ids with ``.`` replaced
    by ``__dot__`` (presumably so the client can use them as element ids);
    each value holds the original ``dag_id`` and ``last_run`` as ISO text.
    """
    run_model = models.DagRun

    accessible = appbuilder.sm.get_accessible_dag_ids()
    if 'all_dags' in accessible:
        accessible = [row_dag_id for row_dag_id, in session.query(models.DagModel.dag_id)]

    # Filter by post parameters
    requested = {unquote(raw_id) for raw_id in request.form.getlist('dag_ids') if raw_id}
    dag_ids = requested.intersection(accessible) if requested else accessible

    if not dag_ids:
        return wwwutils.json_response({})

    # Only ask for accessible and selected dags.
    latest_runs = (
        session.query(run_model.dag_id,
                      sqla.func.max(run_model.execution_date).label('last_run'))
        .group_by(run_model.dag_id)
        .filter(run_model.dag_id.in_(dag_ids))
    )

    response = {}
    for row in latest_runs:
        response[row.dag_id.replace('.', '__dot__')] = {
            'dag_id': row.dag_id,
            'last_run': row.last_run.isoformat(),
        }
    return wwwutils.json_response(response)
| |
@expose('/code')
@has_dag_access(can_dag_read=True)
@has_access
@provide_session
def code(self, session=None):
    """Render the source of a DAG's file with syntax highlighting.

    The source is fetched with ``DagCode.get_code_by_fileloc`` for the file
    location recorded on the DagModel of the ``dag_id`` query parameter. On
    any failure the page shows an escaped error message instead of code.
    """
    all_errors = ""
    dag_id = request.args.get('dag_id')
    # Bound before the try so the render call below still works when the
    # DagModel lookup itself raises.
    dag_orm = None

    try:
        dag_orm = DagModel.get_dagmodel(dag_id, session=session)
        code = DagCode.get_code_by_fileloc(dag_orm.fileloc)
        html_code = Markup(highlight(
            code, lexers.PythonLexer(), HtmlFormatter(linenos=True)))

    except Exception as e:  # pylint: disable=broad-except
        all_errors += (
            "Exception encountered during " +
            "dag_id retrieval/dag retrieval fallback/code highlighting:\n\n{}\n".format(e)
        )
        html_code = Markup('<p>Failed to load file.</p><p>Details: {}</p>').format(
            escape(all_errors))

    return self.render_template(
        'airflow/dag_code.html', html_code=html_code, dag=dag_orm, title=dag_id,
        root=request.args.get('root'),
        demo_mode=conf.getboolean('webserver', 'demo_mode'),
        wrapped=conf.getboolean('webserver', 'default_wrap'))
| |
@expose('/dag_details')
@has_dag_access(can_dag_read=True)
@has_access
@provide_session
def dag_details(self, session=None):
    """Render the details page of the DAG named by the ``dag_id`` parameter.

    Besides the DAG, the template receives per-state task instance counts
    for that DAG and its running DAG runs that were not externally triggered.
    """
    dag_id = request.args.get('dag_id')
    dag = dagbag.get_dag(dag_id)

    TI = models.TaskInstance
    state_counts_query = (
        session.query(TI.state, sqla.func.count(TI.dag_id))
        .filter(TI.dag_id == dag_id)
        .group_by(TI.state)
    )
    states = state_counts_query.all()

    active_runs = models.DagRun.find(
        dag_id=dag_id,
        state=State.RUNNING,
        external_trigger=False,
    )

    return self.render_template(
        'airflow/dag_details.html',
        dag=dag,
        title="DAG details",
        root=request.args.get('root', ''),
        states=states,
        State=State,
        active_runs=active_runs,
    )
| |
@expose('/pickle_info')
@has_access
def pickle_info(self):
    """Return ``pickle_info()`` of the accessible, non-subdag DAGs as JSON.

    With a ``dag_id`` query parameter only that DAG is considered; an id
    unknown to the DagBag yields an empty object instead of an error.
    """
    filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
    if not filter_dag_ids:
        return wwwutils.json_response({})

    dag_id = request.args.get('dag_id')
    if dag_id:
        # dagbag.dags.get() returns None for an unknown id; skip it rather
        # than failing on ``None.dag_id`` below.
        requested_dag = dagbag.dags.get(dag_id)
        dags = [requested_dag] if requested_dag is not None else []
    else:
        dags = dagbag.dags.values()

    can_see_all = 'all_dags' in filter_dag_ids
    d = {
        dag.dag_id: dag.pickle_info()
        for dag in dags
        if (can_see_all or dag.dag_id in filter_dag_ids) and not dag.is_subdag
    }
    return wwwutils.json_response(d)
| |
@expose('/rendered')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
@provide_session
def rendered(self, session=None):
    """Render the templated fields of a task for a given execution date.

    Rendering errors are flashed to the user. The displayed values are read
    back from the copied task after ``ti.get_rendered_template_fields()``
    runs (presumably it renders the fields in place). Fields with a dedicated
    renderer in ``wwwutils.get_attr_renderer()`` use it; all others are shown
    in a ``<pre><code>`` block.
    """
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')
    dttm = pendulum.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    root = request.args.get('root', '')

    logging.info("Retrieving rendered templates.")
    dag = dagbag.get_dag(dag_id)

    # Work on a copy so rendering does not mutate the DagBag's task.
    task = copy.copy(dag.get_task(task_id))
    ti = models.TaskInstance(task=task, execution_date=dttm)
    try:
        ti.get_rendered_template_fields()
    except AirflowException as e:
        msg = "Error rendering template: " + escape(e)
        # Python 2 exceptions have no __cause__.
        if not PY2 and e.__cause__:
            msg += Markup("<br/><br/>OriginalError: ") + escape(e.__cause__)
        flash(msg, "error")
    except Exception as e:  # pylint: disable=broad-except
        flash("Error rendering template: " + str(e), "error")
    title = "Rendered Template"
    html_dict = {}
    for template_field in task.template_fields:
        content = getattr(task, template_field)
        if template_field in wwwutils.get_attr_renderer():
            html_dict[template_field] = wwwutils.get_attr_renderer()[template_field](content)
        else:
            # Tags must be closed in the reverse order they were opened.
            html_dict[template_field] = Markup("<pre><code>{}</code></pre>").format(str(content))

    return self.render_template(
        'airflow/ti_code.html',
        html_dict=html_dict,
        dag=dag,
        task_id=task_id,
        execution_date=execution_date,
        form=form,
        root=root,
        title=title)
| |
@expose('/get_logs_with_metadata')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
@provide_session
def get_logs_with_metadata(self, session=None):
    """Return a task instance's logs as JSON or as a downloadable text stream.

    Query parameters: ``dag_id``, ``task_id``, ``execution_date`` (required),
    ``try_number`` (optional; when omitted, every try is read), ``metadata``
    (optional JSON passed to the log handler; absent or falsy means ``{}``)
    and ``format`` (``json`` by default; any other value streams the logs as
    a file attachment).

    Responds with HTTP 400 and a JSON ``error`` when ``execution_date`` is
    missing or cannot be parsed as a date.
    """
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')

    # Validate the execution date before it is used to look up the task
    # instance, so a missing or malformed value gets the 400 response below
    # instead of an unhandled parser error.
    try:
        dttm = pendulum.parse(execution_date) if execution_date else None
    except ValueError:
        dttm = None
    if dttm is None:
        error_message = (
            'Given execution date, {}, could not be identified '
            'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
                execution_date))
        response = jsonify({'error': error_message})
        response.status_code = 400

        return response

    if request.args.get('try_number') is not None:
        try_number = int(request.args.get('try_number'))
    else:
        try_number = None
    # metadata may be absent from the query string or be JSON null
    raw_metadata = request.args.get('metadata')
    metadata = (json.loads(raw_metadata) if raw_metadata else None) or {}
    response_format = request.args.get('format', 'json')

    logger = logging.getLogger('airflow.task')
    task_log_reader = conf.get('core', 'task_log_reader')
    # None when no handler of that name is attached to the logger; calling
    # handler.read then raises AttributeError, reported by the except below.
    handler = next((handler for handler in logger.handlers
                    if handler.name == task_log_reader), None)

    ti = session.query(models.TaskInstance).filter(
        models.TaskInstance.dag_id == dag_id,
        models.TaskInstance.task_id == task_id,
        models.TaskInstance.execution_date == dttm).first()

    def _get_logs_with_metadata(try_number, metadata):
        """Read one chunk of logs for ``try_number``; return (logs, metadata)."""
        if ti is None:
            logs = ["*** Task instance did not exist in the DB\n"]
            metadata['end_of_log'] = True
        else:
            logs, metadatas = handler.read(ti, try_number, metadata=metadata)
            metadata = metadatas[0]
        return logs, metadata

    try:
        if ti is not None:
            dag = dagbag.get_dag(dag_id)
            ti.task = dag.get_task(ti.task_id)
        if response_format == 'json':
            logs, metadata = _get_logs_with_metadata(try_number, metadata)
            message = logs[0] if try_number is not None else logs
            return jsonify(message=message, metadata=metadata)

        filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
        attachment_filename = render_log_filename(
            ti=ti,
            try_number="all" if try_number is None else try_number,
            filename_template=filename_template)
        metadata['download_logs'] = True

        def _generate_log_stream(try_number, metadata):
            """Yield every chunk of every requested try until each one ends."""
            if try_number is None and ti is not None:
                next_try = ti.next_try_number
                try_numbers = list(range(1, next_try))
            else:
                try_numbers = [try_number]
            for try_number in try_numbers:
                # Reset the read position for each try.
                metadata.pop('end_of_log', None)
                metadata.pop('max_offset', None)
                metadata.pop('offset', None)
                while 'end_of_log' not in metadata or not metadata['end_of_log']:
                    logs, metadata = _get_logs_with_metadata(try_number, metadata)
                    yield "\n".join(logs) + "\n"
        return Response(_generate_log_stream(try_number, metadata),
                        mimetype="text/plain",
                        headers={"Content-Disposition": "attachment; filename={}".format(
                            attachment_filename)})
    except AttributeError as e:
        error_message = ["Task log handler {} does not support read logs.\n{}\n"
                         .format(task_log_reader, str(e))]
        metadata['end_of_log'] = True
        return jsonify(message=error_message, error=True, metadata=metadata)
| |
@expose('/log')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
@provide_session
def log(self, session=None):
    """Render the "Log by attempts" page of a task instance.

    No log content is read here: the template gets ``logs``, a list of empty
    strings with one entry per try (presumably filled in client-side).
    """
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')
    dttm = pendulum.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    dag = dagbag.get_dag(dag_id)

    TI = models.TaskInstance
    ti = (
        session.query(TI)
        .filter(TI.dag_id == dag_id, TI.task_id == task_id, TI.execution_date == dttm)
        .first()
    )

    if ti is None:
        num_logs = 0
    else:
        num_logs = ti.next_try_number - 1
        # Tasks in reschedule state decremented the try number
        if ti.state == State.UP_FOR_RESCHEDULE:
            num_logs += 1

    return self.render_template(
        'airflow/ti_log.html',
        logs=[''] * num_logs,
        dag=dag,
        title="Log by attempts",
        dag_id=dag.dag_id,
        task_id=task_id,
        execution_date=execution_date,
        form=form,
        root=request.args.get('root', ''),
        wrapped=conf.getboolean('webserver', 'default_wrap'),
    )
| |
@expose('/elasticsearch')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
@provide_session
def elasticsearch(self, session=None):
    """Redirect to the configured Elasticsearch frontend for a task's log.

    The log id is built from ``[elasticsearch] log_id_template`` using the
    ``dag_id``, ``task_id``, ``execution_date`` and ``try_number`` query
    parameters (``try_number`` defaults to 1). It is URL-quoted, substituted
    into ``[elasticsearch] frontend``, and the result prefixed with ``https://``.
    """
    args = request.args
    log_id = conf.get('elasticsearch', 'log_id_template').format(
        dag_id=args.get('dag_id'),
        task_id=args.get('task_id'),
        execution_date=args.get('execution_date'),
        try_number=args.get('try_number', 1),
    )
    frontend = conf.get('elasticsearch', 'frontend')
    return redirect('https://' + frontend.format(log_id=quote(log_id)))
| |
@expose('/task')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
def task(self):
    """Render the details page of a task instance.

    Lists the non-method public attributes of the task instance and of the
    task, renders attributes that have a dedicated renderer in
    ``wwwutils.get_attr_renderer()``, and shows the scheduler dependencies
    that are not met. Redirects to the index with an error flash when the
    DAG or task does not exist.
    """
    TI = models.TaskInstance

    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    # Carrying execution_date through, even though it's irrelevant for
    # this context
    execution_date = request.args.get('execution_date')
    dttm = pendulum.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    root = request.args.get('root', '')
    dag = dagbag.get_dag(dag_id)

    if not dag or task_id not in dag.task_ids:
        flash(
            "Task [{}.{}] doesn't seem to exist"
            " at the moment".format(dag_id, task_id),
            "error")
        return redirect(url_for('Airflow.index'))
    task = copy.copy(dag.get_task(task_id))
    task.resolve_template_files()
    ti = TI(task=task, execution_date=dttm)
    ti.refresh_from_db()

    # Hoisted out of the loops below. ``self.task`` is a bound method, so
    # attributes of this type (methods) are skipped. The renderer mapping is
    # assumed to be the same on every call.
    method_type = type(self.task)
    attr_renderers = wwwutils.get_attr_renderer()

    ti_attrs = []
    for attr_name in dir(ti):
        if not attr_name.startswith('_'):
            attr = getattr(ti, attr_name)
            if type(attr) != method_type:  # noqa
                ti_attrs.append((attr_name, str(attr)))

    task_attrs = []
    for attr_name in dir(task):
        if not attr_name.startswith('_'):
            attr = getattr(task, attr_name)
            if type(attr) != method_type and attr_name not in attr_renderers:  # noqa
                task_attrs.append((attr_name, str(attr)))

    # Color coding the special attributes that are code
    special_attrs_rendered = {}
    for attr_name, renderer in attr_renderers.items():
        if hasattr(task, attr_name):
            special_attrs_rendered[attr_name] = renderer(getattr(task, attr_name))

    no_failed_deps_result = [(
        "Unknown",
        "All dependencies are met but the task instance is not running. In most "
        "cases this just means that the task will probably be scheduled soon "
        "unless:<br/>\n- The scheduler is down or under heavy load<br/>\n{}\n"
        "<br/>\nIf this task instance does not start soon please contact your "
        "Airflow administrator for assistance.".format(
            "- This task instance already ran and had it's state changed manually "
            "(e.g. cleared in the UI)<br/>" if ti.state == State.NONE else ""))]

    # Use the scheduler's context to figure out which dependencies are not met
    dep_context = DepContext(SCHEDULER_QUEUED_DEPS)
    failed_dep_reasons = [(dep.dep_name, dep.reason) for dep in
                          ti.get_failed_dep_statuses(
                              dep_context=dep_context)]

    title = "Task Instance Details"
    return self.render_template(
        'airflow/task.html',
        task_attrs=task_attrs,
        ti_attrs=ti_attrs,
        failed_dep_reasons=failed_dep_reasons or no_failed_deps_result,
        task_id=task_id,
        execution_date=execution_date,
        special_attrs_rendered=special_attrs_rendered,
        form=form,
        root=root,
        dag=dag, title=title)
| |
@expose('/xcom')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
@provide_session
def xcom(self, session=None):
    """Render the XCom entries pushed by a task for an execution date.

    Entries whose key starts with an underscore are hidden. Redirects to the
    index with an error flash when the task has no task instance at all.
    """
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    # Carrying execution_date through, even though it's irrelevant for
    # this context
    execution_date = request.args.get('execution_date')
    dttm = pendulum.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    root = request.args.get('root', '')
    dm_db = models.DagModel
    ti_db = models.TaskInstance
    dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
    # Pass both conditions to filter(), which ANDs them in SQL. Joining them
    # with Python's ``and`` yields only one of the two SQLAlchemy expressions,
    # silently dropping the task_id condition.
    ti = session.query(ti_db).filter(ti_db.dag_id == dag_id,
                                     ti_db.task_id == task_id).first()

    if not ti:
        flash(
            "Task [{}.{}] doesn't seem to exist"
            " at the moment".format(dag_id, task_id),
            "error")
        return redirect(url_for('Airflow.index'))

    xcomlist = session.query(XCom).filter(
        XCom.dag_id == dag_id, XCom.task_id == task_id,
        XCom.execution_date == dttm).all()

    attributes = []
    for xcom in xcomlist:
        if not xcom.key.startswith('_'):
            attributes.append((xcom.key, xcom.value))

    title = "XCom"
    return self.render_template(
        'airflow/xcom.html',
        attributes=attributes,
        task_id=task_id,
        execution_date=execution_date,
        form=form,
        root=root,
        dag=dag, title=title)
| |
    @expose('/run', methods=['POST'])
    @has_dag_access(can_dag_edit=True)
    @has_access
    @action_logging
    def run(self):
        """Queue a single task instance for immediate execution from the UI.

        Form fields: ``dag_id``, ``task_id``, ``origin`` (redirect target,
        passed through ``get_safe_url``), ``execution_date`` and the flags
        ``ignore_all_deps``, ``ignore_task_deps`` and ``ignore_ti_state``.
        Each flag is true only when it equals the string ``"true"``.

        Only the Celery and Kubernetes executors are accepted. With any other
        executor, or when the task's dependencies (``RUNNING_DEPS``, minus the
        ignored ones) are not met, an error is flashed and the user is sent
        back to ``origin`` without queueing anything.
        """
        dag_id = request.form.get('dag_id')
        task_id = request.form.get('task_id')
        origin = get_safe_url(request.form.get('origin'))
        dag = dagbag.get_dag(dag_id)
        task = dag.get_task(task_id)

        execution_date = request.form.get('execution_date')
        execution_date = pendulum.parse(execution_date)
        ignore_all_deps = request.form.get('ignore_all_deps') == "true"
        ignore_task_deps = request.form.get('ignore_task_deps') == "true"
        ignore_ti_state = request.form.get('ignore_ti_state') == "true"

        from airflow.executors import get_default_executor
        executor = get_default_executor()
        valid_celery_config = False
        valid_kubernetes_config = False

        # Each executor module is optional: if it cannot be imported, the
        # corresponding flag simply stays False.
        try:
            from airflow.executors.celery_executor import CeleryExecutor
            valid_celery_config = isinstance(executor, CeleryExecutor)
        except ImportError:
            pass

        try:
            from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
            valid_kubernetes_config = isinstance(executor, KubernetesExecutor)
        except ImportError:
            pass

        if not valid_celery_config and not valid_kubernetes_config:
            flash("Only works with the Celery or Kubernetes executors, sorry", "error")
            return redirect(origin)

        # Build the task instance and load its current state from the DB.
        ti = models.TaskInstance(task=task, execution_date=execution_date)
        ti.refresh_from_db()

        # Make sure the task instance can be run
        dep_context = DepContext(
            deps=RUNNING_DEPS,
            ignore_all_deps=ignore_all_deps,
            ignore_task_deps=ignore_task_deps,
            ignore_ti_state=ignore_ti_state)
        failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
        if failed_deps:
            failed_deps_str = ", ".join(
                ["{}: {}".format(dep.dep_name, dep.reason) for dep in failed_deps])
            flash("Could not queue task instance for execution, dependencies not met: "
                  "{}".format(failed_deps_str),
                  "error")
            return redirect(origin)

        # Start the executor, hand it the task instance, then heartbeat once.
        # Presumably the heartbeat flushes the queued command to the
        # executor's backend; TODO confirm.
        executor.start()
        executor.queue_task_instance(
            ti,
            ignore_all_deps=ignore_all_deps,
            ignore_task_deps=ignore_task_deps,
            ignore_ti_state=ignore_ti_state)
        executor.heartbeat()
        flash(
            "Sent {} to the message queue, "
            "it should start any moment now.".format(ti))
        return redirect(origin)
| |
| @expose('/delete', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def delete(self): |
| from airflow.api.common.experimental import delete_dag |
| from airflow.exceptions import DagNotFound, DagFileExists |
| |
| dag_id = request.values.get('dag_id') |
| origin = get_safe_url(request.values.get('origin')) |
| |
| try: |
| delete_dag.delete_dag(dag_id) |
| except DagNotFound: |
| flash("DAG with id {} not found. Cannot delete".format(dag_id), 'error') |
| return redirect(request.referrer) |
| except DagFileExists: |
| flash("Dag id {} is still in DagBag. " |
| "Remove the DAG file first.".format(dag_id), |
| 'error') |
| return redirect(request.referrer) |
| |
| flash("Deleting DAG with id {}. May take a couple minutes to fully" |
| " disappear.".format(dag_id)) |
| |
| # Upon success return to origin. |
| return redirect(origin) |
| |
| @expose('/trigger', methods=['POST', 'GET']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def trigger(self, session=None): |
| |
| dag_id = request.values.get('dag_id') |
| origin = get_safe_url(request.values.get('origin')) |
| |
| if request.method == 'GET': |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf='' |
| ) |
| |
| dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first() |
| if not dag: |
| flash("Cannot find dag {}".format(dag_id)) |
| return redirect(origin) |
| |
| execution_date = timezone.utcnow() |
| run_id = "manual__{0}".format(execution_date.isoformat()) |
| |
| dr = DagRun.find(dag_id=dag_id, run_id=run_id) |
| if dr: |
| flash("This run_id {} already exists".format(run_id)) |
| return redirect(origin) |
| |
| run_conf = {} |
| conf = request.values.get('conf') |
| if conf: |
| try: |
| run_conf = json.loads(conf) |
| except ValueError: |
| flash("Invalid JSON configuration", "error") |
| return self.render_template( |
| 'airflow/trigger.html', |
| dag_id=dag_id, |
| origin=origin, |
| conf=conf |
| ) |
| |
| dag = dagbag.get_dag(dag_id) |
| dag.create_dagrun( |
| run_id=run_id, |
| execution_date=execution_date, |
| state=State.RUNNING, |
| conf=run_conf, |
| external_trigger=True |
| ) |
| |
| flash( |
| "Triggered {}, " |
| "it should start any moment now.".format(dag_id)) |
| return redirect(origin) |
| |
| def _clear_dag_tis(self, dag, start_date, end_date, origin, |
| recursive=False, confirmed=False, only_failed=False): |
| from airflow.exceptions import AirflowException |
| |
| if confirmed: |
| count = dag.clear( |
| start_date=start_date, |
| end_date=end_date, |
| include_subdags=recursive, |
| include_parentdag=recursive, |
| only_failed=only_failed, |
| ) |
| |
| flash("{0} task instances have been cleared".format(count)) |
| return redirect(origin) |
| |
| try: |
| tis = dag.clear( |
| start_date=start_date, |
| end_date=end_date, |
| include_subdags=recursive, |
| include_parentdag=recursive, |
| only_failed=only_failed, |
| dry_run=True, |
| ) |
| except AirflowException as ex: |
| flash(str(ex), 'error') |
| return redirect(origin) |
| |
| if not tis: |
| flash("No task instances to clear", 'error') |
| response = redirect(origin) |
| else: |
| details = "\n".join([str(t) for t in tis]) |
| |
| response = self.render_template( |
| 'airflow/confirm.html', |
| message=("Here's the list of task instances you are about " |
| "to clear:"), |
| details=details) |
| |
| return response |
| |
| @expose('/clear', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def clear(self): |
| dag_id = request.form.get('dag_id') |
| task_id = request.form.get('task_id') |
| origin = get_safe_url(request.form.get('origin')) |
| dag = dagbag.get_dag(dag_id) |
| |
| execution_date = request.form.get('execution_date') |
| execution_date = pendulum.parse(execution_date) |
| confirmed = request.form.get('confirmed') == "true" |
| upstream = request.form.get('upstream') == "true" |
| downstream = request.form.get('downstream') == "true" |
| future = request.form.get('future') == "true" |
| past = request.form.get('past') == "true" |
| recursive = request.form.get('recursive') == "true" |
| only_failed = request.form.get('only_failed') == "true" |
| |
| dag = dag.sub_dag( |
| task_regex=r"^{0}$".format(task_id), |
| include_downstream=downstream, |
| include_upstream=upstream) |
| |
| end_date = execution_date if not future else None |
| start_date = execution_date if not past else None |
| |
| return self._clear_dag_tis(dag, start_date, end_date, origin, |
| recursive=recursive, confirmed=confirmed, only_failed=only_failed) |
| |
| @expose('/dagrun_clear', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def dagrun_clear(self): |
| dag_id = request.form.get('dag_id') |
| origin = get_safe_url(request.form.get('origin')) |
| execution_date = request.form.get('execution_date') |
| confirmed = request.form.get('confirmed') == "true" |
| |
| dag = dagbag.get_dag(dag_id) |
| execution_date = pendulum.parse(execution_date) |
| start_date = execution_date |
| end_date = execution_date |
| |
| return self._clear_dag_tis(dag, start_date, end_date, origin, |
| recursive=True, confirmed=confirmed) |
| |
| @expose('/blocked', methods=['POST']) |
| @has_access |
| @provide_session |
| def blocked(self, session=None): |
| allowed_dag_ids = appbuilder.sm.get_accessible_dag_ids() |
| |
| if 'all_dags' in allowed_dag_ids: |
| allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)] |
| |
| # Filter by post parameters |
| selected_dag_ids = { |
| unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id |
| } |
| |
| if selected_dag_ids: |
| filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids) |
| else: |
| filter_dag_ids = allowed_dag_ids |
| |
| if not filter_dag_ids: |
| return wwwutils.json_response([]) |
| |
| DR = models.DagRun |
| |
| dags = ( |
| session.query(DR.dag_id, sqla.func.count(DR.id)) |
| .filter(DR.state == State.RUNNING) |
| .filter(DR.dag_id.in_(filter_dag_ids)) |
| .group_by(DR.dag_id) |
| ) |
| |
| payload = [] |
| for dag_id, active_dag_runs in dags: |
| max_active_runs = 0 |
| dag = dagbag.get_dag(dag_id) |
| if dag: |
| # TODO: Make max_active_runs a column so we can query for it directly |
| max_active_runs = dag.max_active_runs |
| payload.append({ |
| 'dag_id': dag_id, |
| 'active_dag_run': active_dag_runs, |
| 'max_active_runs': max_active_runs, |
| }) |
| return wwwutils.json_response(payload) |
| |
| def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin): |
| if not execution_date: |
| flash('Invalid execution date', 'error') |
| return redirect(origin) |
| |
| execution_date = pendulum.parse(execution_date) |
| dag = dagbag.get_dag(dag_id) |
| |
| if not dag: |
| flash('Cannot find DAG: {}'.format(dag_id), 'error') |
| return redirect(origin) |
| |
| new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed) |
| |
| if confirmed: |
| flash('Marked failed on {} task instances'.format(len(new_dag_state))) |
| return redirect(origin) |
| |
| else: |
| details = '\n'.join([str(t) for t in new_dag_state]) |
| |
| response = self.render_template( |
| 'airflow/confirm.html', |
| message=("Here's the list of task instances you are about to mark as failed"), |
| details=details) |
| |
| return response |
| |
| def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origin): |
| if not execution_date: |
| flash('Invalid execution date', 'error') |
| return redirect(origin) |
| |
| execution_date = pendulum.parse(execution_date) |
| dag = dagbag.get_dag(dag_id) |
| |
| if not dag: |
| flash('Cannot find DAG: {}'.format(dag_id), 'error') |
| return redirect(origin) |
| |
| new_dag_state = set_dag_run_state_to_success(dag, execution_date, |
| commit=confirmed) |
| |
| if confirmed: |
| flash('Marked success on {} task instances'.format(len(new_dag_state))) |
| return redirect(origin) |
| |
| else: |
| details = '\n'.join([str(t) for t in new_dag_state]) |
| |
| response = self.render_template( |
| 'airflow/confirm.html', |
| message=("Here's the list of task instances you are about to mark as success"), |
| details=details) |
| |
| return response |
| |
| @expose('/dagrun_failed', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def dagrun_failed(self): |
| dag_id = request.form.get('dag_id') |
| execution_date = request.form.get('execution_date') |
| confirmed = request.form.get('confirmed') == 'true' |
| origin = get_safe_url(request.form.get('origin')) |
| return self._mark_dagrun_state_as_failed(dag_id, execution_date, |
| confirmed, origin) |
| |
| @expose('/dagrun_success', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def dagrun_success(self): |
| dag_id = request.form.get('dag_id') |
| execution_date = request.form.get('execution_date') |
| confirmed = request.form.get('confirmed') == 'true' |
| origin = get_safe_url(request.form.get('origin')) |
| return self._mark_dagrun_state_as_success(dag_id, execution_date, |
| confirmed, origin) |
| |
| def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date, |
| confirmed, upstream, downstream, |
| future, past, state): |
| dag = dagbag.get_dag(dag_id) |
| task = dag.get_task(task_id) |
| task.dag = dag |
| |
| execution_date = pendulum.parse(execution_date) |
| |
| if not dag: |
| flash("Cannot find DAG: {}".format(dag_id)) |
| return redirect(origin) |
| |
| if not task: |
| flash("Cannot find task {} in DAG {}".format(task_id, dag.dag_id)) |
| return redirect(origin) |
| |
| from airflow.api.common.experimental.mark_tasks import set_state |
| |
| if confirmed: |
| altered = set_state(tasks=[task], execution_date=execution_date, |
| upstream=upstream, downstream=downstream, |
| future=future, past=past, state=state, |
| commit=True) |
| |
| flash("Marked {} on {} task instances".format(state, len(altered))) |
| return redirect(origin) |
| |
| to_be_altered = set_state(tasks=[task], execution_date=execution_date, |
| upstream=upstream, downstream=downstream, |
| future=future, past=past, state=state, |
| commit=False) |
| |
| details = "\n".join([str(t) for t in to_be_altered]) |
| |
| response = self.render_template( |
| "airflow/confirm.html", |
| message=("Here's the list of task instances you are about to mark as {}:".format(state)), |
| details=details) |
| |
| return response |
| |
| @expose('/failed', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def failed(self): |
| dag_id = request.form.get('dag_id') |
| task_id = request.form.get('task_id') |
| origin = get_safe_url(request.form.get('origin')) |
| execution_date = request.form.get('execution_date') |
| |
| confirmed = request.form.get('confirmed') == "true" |
| upstream = request.form.get('failed_upstream') == "true" |
| downstream = request.form.get('failed_downstream') == "true" |
| future = request.form.get('failed_future') == "true" |
| past = request.form.get('failed_past') == "true" |
| |
| return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, |
| confirmed, upstream, downstream, |
| future, past, State.FAILED) |
| |
| @expose('/success', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| def success(self): |
| dag_id = request.form.get('dag_id') |
| task_id = request.form.get('task_id') |
| origin = get_safe_url(request.form.get('origin')) |
| execution_date = request.form.get('execution_date') |
| |
| confirmed = request.form.get('confirmed') == "true" |
| upstream = request.form.get('success_upstream') == "true" |
| downstream = request.form.get('success_downstream') == "true" |
| future = request.form.get('success_future') == "true" |
| past = request.form.get('success_past') == "true" |
| |
| return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, |
| confirmed, upstream, downstream, |
| future, past, State.SUCCESS) |
| |
    @expose('/tree')
    @has_dag_access(can_dag_read=True)
    @has_access
    @gzipped
    @action_logging
    @provide_session
    def tree(self, session=None):
        """Render the tree view: tasks nested by downstream edges, one column per run.

        Query args:
        - ``dag_id``: the DAG to show.
        - ``root``: optional task regex; keeps matching tasks plus their
          upstream.
        - ``base_date``: latest execution date shown. Defaults to the DAG's
          latest execution date, or now.
        - ``num_runs``: number of runs. Defaults to the
          ``webserver.default_dag_run_display_number`` setting.

        Redirects to the index when the DAG is unknown. The node tree is
        serialised to compact JSON for the ``airflow/tree.html`` template.
        """
        dag_id = request.args.get('dag_id')
        blur = conf.getboolean('webserver', 'demo_mode')
        dag = dagbag.get_dag(dag_id)
        if not dag:
            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
            return redirect(url_for('Airflow.index'))

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_downstream=False,
                include_upstream=True)

        base_date = request.args.get('base_date')
        num_runs = request.args.get('num_runs')
        if num_runs:
            num_runs = int(num_runs)
        else:
            num_runs = conf.getint('webserver', 'default_dag_run_display_number')

        if base_date:
            base_date = timezone.parse(base_date)
        else:
            base_date = dag.latest_execution_date or timezone.utcnow()

        # The newest ``num_runs`` DagRuns at or before base_date, keyed by
        # execution date.
        DR = models.DagRun
        dag_runs = (
            session.query(DR)
            .filter(
                DR.dag_id == dag.dag_id,
                DR.execution_date <= base_date)
            .order_by(DR.execution_date.desc())
            .limit(num_runs)
            .all()
        )
        dag_runs = {
            dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs
        }

        dates = sorted(list(dag_runs.keys()))
        max_date = max(dates) if dates else None
        min_date = min(dates) if dates else None

        # Index task instances by (task_id, execution_date) for O(1) lookup
        # while building the nodes.
        tis = dag.get_task_instances(
            start_date=min_date, end_date=base_date, session=session)
        task_instances = {}
        for ti in tis:
            task_instances[(ti.task_id, ti.execution_date)] = ti

        expanded = set()
        # The default recursion traces every path so that tree view has full
        # expand/collapse functionality. After 5,000 nodes we stop and fall
        # back on a quick DFS search for performance. See PR #320.
        # node_count is a one-element list so the nested function can mutate
        # it without ``nonlocal`` (presumably for Python 2 support).
        node_count = [0]
        node_limit = 5000 / max(1, len(dag.leaves))

        def encode_ti(ti):
            """Encode a task instance as [state, try_number, start_ts, duration], or None."""
            if not ti:
                return None

            # NOTE: order of entry is important here because client JS relies on it for
            # tree node reconstruction. Remember to change JS code in tree.html
            # whenever order is altered.
            data = [
                ti.state,
                ti.try_number,
                None,  # start_ts
                None,  # duration
            ]

            if ti.start_date:
                # round to seconds to reduce payload size
                if six.PY2:
                    data[2] = int(pendulum.instance(ti.start_date).timestamp())
                else:
                    data[2] = int(ti.start_date.timestamp())
                if ti.duration is not None:
                    data[3] = int(ti.duration)

            return data

        def recurse_nodes(task, visited):
            """Build the nested tree node for ``task`` and its downstream tasks.

            Below ``node_limit`` every path is expanded. Past it, children
            already in ``visited`` are skipped.
            """
            node_count[0] += 1
            visited.add(task)
            task_id = task.task_id

            node = {
                'name': task.task_id,
                'instances': [
                    encode_ti(task_instances.get((task_id, d)))
                    for d in dates
                ],
                'num_dep': len(task.downstream_list),
                'operator': task.task_type,
                'retries': task.retries,
                'owner': task.owner,
                'ui_color': task.ui_color,
            }

            if task.downstream_list:
                children = [
                    recurse_nodes(t, visited) for t in task.downstream_list
                    if node_count[0] < node_limit or t not in visited]

                # D3 tree uses children vs _children to define what is
                # expanded or not. The following block makes it such that
                # repeated nodes are collapsed by default.
                if task.task_id not in expanded:
                    children_key = 'children'
                    expanded.add(task.task_id)
                else:
                    children_key = "_children"
                node[children_key] = children

            if task.depends_on_past:
                node['depends_on_past'] = task.depends_on_past
            if task.start_date:
                # round to seconds to reduce payload size
                if six.PY2:
                    node['start_ts'] = int(pendulum.instance(task.start_date).timestamp())
                else:
                    node['start_ts'] = int(task.start_date.timestamp())
            if task.end_date:
                # round to seconds to reduce payload size
                if six.PY2:
                    node['end_ts'] = int(pendulum.instance(task.end_date).timestamp())
                else:
                    node['end_ts'] = int(task.end_date.timestamp())
            if task.extra_links:
                node['extra_links'] = task.extra_links
            return node

        # Synthetic root node. Its 'instances' are the DagRun dicts, falling
        # back to a bare execution_date when a date has no DagRun entry.
        data = {
            'name': '[DAG]',
            'children': [recurse_nodes(t, set()) for t in dag.roots],
            'instances': [
                dag_runs.get(d) or {'execution_date': d.isoformat()}
                for d in dates
            ],
        }

        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                             'num_runs': num_runs})
        external_logs = conf.get('elasticsearch', 'frontend')
        doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc')

        # avoid spaces to reduce payload size
        data = htmlsafe_json_dumps(data, separators=(',', ':'))

        return self.render_template(
            'airflow/tree.html',
            operators=sorted({op.task_type: op for op in dag.tasks}.values(),
                             key=lambda x: x.task_type),
            root=root,
            form=form,
            dag=dag,
            doc_md=doc_md,
            data=data,
            blur=blur, num_runs=num_runs,
            show_external_logs=bool(external_logs))
| |
| @expose('/graph') |
| @has_dag_access(can_dag_read=True) |
| @has_access |
| @gzipped |
| @action_logging |
| @provide_session |
| def graph(self, session=None): |
| dag_id = request.args.get('dag_id') |
| blur = conf.getboolean('webserver', 'demo_mode') |
| dag = dagbag.get_dag(dag_id) |
| if not dag: |
| flash('DAG "{0}" seems to be missing.'.format(dag_id), "error") |
| return redirect(url_for('Airflow.index')) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| arrange = request.args.get('arrange', dag.orientation) |
| |
| nodes = [] |
| edges = [] |
| for task in dag.tasks: |
| nodes.append({ |
| 'id': task.task_id, |
| 'value': { |
| 'label': task.task_id, |
| 'labelStyle': "fill:{0};".format(task.ui_fgcolor), |
| 'style': "fill:{0};".format(task.ui_color), |
| 'rx': 5, |
| 'ry': 5, |
| } |
| }) |
| |
| def get_downstream(task): |
| for t in task.downstream_list: |
| edge = { |
| 'source_id': task.task_id, |
| 'target_id': t.task_id, |
| } |
| if edge not in edges: |
| edges.append(edge) |
| get_downstream(t) |
| |
| for t in dag.roots: |
| get_downstream(t) |
| |
| dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) |
| dt_nr_dr_data['arrange'] = arrange |
| dttm = dt_nr_dr_data['dttm'] |
| |
| class GraphForm(DateTimeWithNumRunsWithDagRunsForm): |
| arrange = SelectField("Layout", choices=( |
| ('LR', "Left->Right"), |
| ('RL', "Right->Left"), |
| ('TB', "Top->Bottom"), |
| ('BT', "Bottom->Top"), |
| )) |
| |
| form = GraphForm(data=dt_nr_dr_data) |
| form.execution_date.choices = dt_nr_dr_data['dr_choices'] |
| |
| task_instances = { |
| ti.task_id: alchemy_to_dict(ti) |
| for ti in dag.get_task_instances(dttm, dttm, session=session)} |
| tasks = { |
| t.task_id: { |
| 'dag_id': t.dag_id, |
| 'task_type': t.task_type, |
| 'extra_links': t.extra_links, |
| } |
| for t in dag.tasks} |
| if not tasks: |
| flash("No tasks found", "error") |
| session.commit() |
| doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc') |
| |
| external_logs = conf.get('elasticsearch', 'frontend') |
| return self.render_template( |
| 'airflow/graph.html', |
| dag=dag, |
| form=form, |
| width=request.args.get('width', "100%"), |
| height=request.args.get('height', "800"), |
| execution_date=dttm.isoformat(), |
| state_token=wwwutils.state_token(dt_nr_dr_data['dr_state']), |
| doc_md=doc_md, |
| arrange=arrange, |
| operators=sorted({op.task_type: op for op in dag.tasks}.values(), |
| key=lambda x: x.task_type), |
| blur=blur, |
| root=root or '', |
| task_instances=task_instances, |
| tasks=tasks, |
| nodes=nodes, |
| edges=edges, |
| show_external_logs=bool(external_logs)) |
| |
    @expose('/duration')
    @has_dag_access(can_dag_read=True)
    @has_access
    @action_logging
    @provide_session
    def duration(self, session=None):
        """Render task-duration charts for a DAG over recent runs.

        Two line charts are built per task:
        - the duration of the last try (``ti.duration``);
        - a cumulative duration that adds the durations of earlier failed
          tries recorded in ``TaskFail``.

        Query args: ``dag_id``, ``base_date``, ``num_runs`` and ``root``.
        Redirects to the index with an error flash if the DAG is unknown.
        """
        default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
        dag_id = request.args.get('dag_id')
        dag = dagbag.get_dag(dag_id)
        base_date = request.args.get('base_date')
        num_runs = request.args.get('num_runs')
        num_runs = int(num_runs) if num_runs else default_dag_run

        if dag is None:
            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
            return redirect(url_for('Airflow.index'))

        if base_date:
            base_date = pendulum.parse(base_date)
        else:
            base_date = dag.latest_execution_date or timezone.utcnow()

        # Walk back num_runs schedule steps from base_date. The oldest date is
        # the lower bound of the query window.
        dates = dag.date_range(base_date, num=-abs(num_runs))
        min_date = dates[0] if dates else timezone.utc_epoch()

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_upstream=True,
                include_downstream=False)

        chart_height = wwwutils.get_chart_height(dag)
        chart = nvd3.lineChart(
            name="lineChart", x_is_date=True, height=chart_height, width="1200")
        cum_chart = nvd3.lineChart(
            name="cumLineChart", x_is_date=True, height=chart_height, width="1200")

        # Per task_id series: x = epoch of execution date, y = duration,
        # cum_y = duration plus failed-try durations.
        y = defaultdict(list)
        x = defaultdict(list)
        cum_y = defaultdict(list)

        tis = dag.get_task_instances(
            start_date=min_date, end_date=base_date, session=session)
        TF = TaskFail
        ti_fails = (
            session.query(TF)
                   .filter(TF.dag_id == dag.dag_id,  # noqa
                           TF.execution_date >= min_date,
                           TF.execution_date <= base_date,
                           TF.task_id.in_([t.task_id for t in dag.tasks]))
                   .all()  # noqa
        )

        # Sum failed-try durations per (dag_id, task_id, execution_date).
        fails_totals = defaultdict(int)
        for tf in ti_fails:
            dict_key = (tf.dag_id, tf.task_id, tf.execution_date)
            if tf.duration:
                fails_totals[dict_key] += tf.duration

        for ti in tis:
            if ti.duration:
                dttm = wwwutils.epoch(ti.execution_date)
                x[ti.task_id].append(dttm)
                y[ti.task_id].append(float(ti.duration))
                fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date)
                fails_total = fails_totals[fails_dict_key]
                cum_y[ti.task_id].append(float(ti.duration + fails_total))

        # determine the most relevant time unit for the set of task instance
        # durations for the DAG
        y_unit = infer_time_unit([d for t in y.values() for d in t])
        cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t])
        # update the y Axis on both charts to have the correct time units
        chart.create_y_axis('yAxis', format='.02f', custom_format=False,
                            label='Duration ({})'.format(y_unit))
        chart.axislist['yAxis']['axisLabelDistance'] = '-15'
        cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False,
                                label='Duration ({})'.format(cum_y_unit))
        cum_chart.axislist['yAxis']['axisLabelDistance'] = '-15'

        for task in dag.tasks:
            if x[task.task_id]:
                chart.add_serie(name=task.task_id, x=x[task.task_id],
                                y=scale_time_units(y[task.task_id], y_unit))
                cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
                                    y=scale_time_units(cum_y[task.task_id],
                                                       cum_y_unit))

        dates = sorted(list({ti.execution_date for ti in tis}))
        max_date = max([ti.execution_date for ti in tis]) if dates else None

        session.commit()

        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                             'num_runs': num_runs})
        chart.buildcontent()
        cum_chart.buildcontent()
        # Insert a jQuery trigger of the 'chartload' event just before the
        # last '});' of the generated cumulative-chart script.
        s_index = cum_chart.htmlcontent.rfind('});')
        cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] +
                                 "$( document ).trigger('chartload')" +
                                 cum_chart.htmlcontent[s_index:])

        return self.render_template(
            'airflow/duration_chart.html',
            dag=dag,
            demo_mode=conf.getboolean('webserver', 'demo_mode'),
            root=root,
            form=form,
            chart=Markup(chart.htmlcontent),
            cum_chart=Markup(cum_chart.htmlcontent),
        )
| |
| @expose('/tries') |
| @has_dag_access(can_dag_read=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def tries(self, session=None): |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs') |
| num_runs = int(num_runs) if num_runs else default_dag_run |
| |
| if base_date: |
| base_date = pendulum.parse(base_date) |
| else: |
| base_date = dag.latest_execution_date or timezone.utcnow() |
| |
| dates = dag.date_range(base_date, num=-abs(num_runs)) |
| min_date = dates[0] if dates else timezone.utc_epoch() |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| chart_height = wwwutils.get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", x_is_date=True, y_axis_format='d', height=chart_height, |
| width="1200") |
| |
| for task in dag.tasks: |
| y = [] |
| x = [] |
| for ti in task.get_task_instances(start_date=min_date, |
| end_date=base_date, |
| session=session): |
| dttm = wwwutils.epoch(ti.execution_date) |
| x.append(dttm) |
| # y value should reflect completed tries to have a 0 baseline. |
| y.append(ti.prev_attempted_tries) |
| if x: |
| chart.add_serie(name=task.task_id, x=x, y=y) |
| |
| tis = dag.get_task_instances( |
| start_date=min_date, end_date=base_date, session=session) |
| tries = sorted(list({ti.try_number for ti in tis})) |
| max_date = max([ti.execution_date for ti in tis]) if tries else None |
| |
| session.commit() |
| |
| form = DateTimeWithNumRunsForm(data={'base_date': max_date, |
| 'num_runs': num_runs}) |
| |
| chart.buildcontent() |
| |
| return self.render_template( |
| 'airflow/chart.html', |
| dag=dag, |
| demo_mode=conf.getboolean('webserver', 'demo_mode'), |
| root=root, |
| form=form, |
| chart=Markup(chart.htmlcontent), |
| ) |
| |
| @expose('/landing_times') |
| @has_dag_access(can_dag_read=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def landing_times(self, session=None): |
| default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs') |
| num_runs = int(num_runs) if num_runs else default_dag_run |
| |
| if base_date: |
| base_date = pendulum.parse(base_date) |
| else: |
| base_date = dag.latest_execution_date or timezone.utcnow() |
| |
| dates = dag.date_range(base_date, num=-abs(num_runs)) |
| min_date = dates[0] if dates else timezone.utc_epoch() |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| chart_height = wwwutils.get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", x_is_date=True, height=chart_height, width="1200") |
| y = {} |
| x = {} |
| for task in dag.tasks: |
| task_id = task.task_id |
| y[task_id] = [] |
| x[task_id] = [] |
| for ti in task.get_task_instances(start_date=min_date, end_date=base_date): |
| ts = ti.execution_date |
| if dag.schedule_interval and dag.following_schedule(ts): |
| ts = dag.following_schedule(ts) |
| if ti.end_date: |
| dttm = wwwutils.epoch(ti.execution_date) |
| secs = (ti.end_date - ts).total_seconds() |
| x[task_id].append(dttm) |
| y[task_id].append(secs) |
| |
| # determine the most relevant time unit for the set of landing times |
| # for the DAG |
| y_unit = infer_time_unit([d for t in y.values() for d in t]) |
| # update the y Axis to have the correct time units |
| chart.create_y_axis('yAxis', format='.02f', custom_format=False, |
| label='Landing Time ({})'.format(y_unit)) |
| chart.axislist['yAxis']['axisLabelDistance'] = '-15' |
| for task in dag.tasks: |
| if x[task.task_id]: |
| chart.add_serie(name=task.task_id, x=x[task.task_id], |
| y=scale_time_units(y[task.task_id], y_unit)) |
| |
| tis = dag.get_task_instances( |
| start_date=min_date, end_date=base_date, session=session) |
| dates = sorted(list({ti.execution_date for ti in tis})) |
| max_date = max([ti.execution_date for ti in tis]) if dates else None |
| |
| session.commit() |
| |
| form = DateTimeWithNumRunsForm(data={'base_date': max_date, |
| 'num_runs': num_runs}) |
| chart.buildcontent() |
| return self.render_template( |
| 'airflow/chart.html', |
| dag=dag, |
| chart=Markup(chart.htmlcontent), |
| height=str(chart_height + 100) + "px", |
| demo_mode=conf.getboolean('webserver', 'demo_mode'), |
| root=root, |
| form=form, |
| ) |
| |
| @expose('/paused', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def paused(self, session=None): |
| dag_id = request.args.get('dag_id') |
| is_paused = True if request.args.get('is_paused') == 'false' else False |
| models.DagModel.get_dagmodel(dag_id).set_is_paused( |
| is_paused=is_paused, |
| store_serialized_dags=STORE_SERIALIZED_DAGS) |
| return "OK" |
| |
| @expose('/refresh', methods=['POST']) |
| @has_dag_access(can_dag_edit=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def refresh(self, session=None): |
| DagModel = models.DagModel |
| dag_id = request.values.get('dag_id') |
| orm_dag = session.query( |
| DagModel).filter(DagModel.dag_id == dag_id).first() |
| |
| if orm_dag: |
| orm_dag.last_expired = timezone.utcnow() |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = dagbag.get_dag(dag_id) |
| # sync dag permission |
| appbuilder.sm.sync_perm_for_dag(dag_id, dag.access_control) |
| |
| flash("DAG [{}] is now fresh as a daisy".format(dag_id)) |
| return redirect(request.referrer) |
| |
| @expose('/gantt') |
| @has_dag_access(can_dag_read=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def gantt(self, session=None): |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| demo_mode = conf.getboolean('webserver', 'demo_mode') |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) |
| dttm = dt_nr_dr_data['dttm'] |
| |
| form = DateTimeWithNumRunsWithDagRunsForm(data=dt_nr_dr_data) |
| form.execution_date.choices = dt_nr_dr_data['dr_choices'] |
| |
| tis = [ |
| ti for ti in dag.get_task_instances(dttm, dttm, session=session) |
| if ti.start_date and ti.state] |
| tis = sorted(tis, key=lambda ti: ti.start_date) |
| TF = TaskFail |
| ti_fails = list(itertools.chain(*[( |
| session |
| .query(TF) |
| .filter(TF.dag_id == ti.dag_id, |
| TF.task_id == ti.task_id, |
| TF.execution_date == ti.execution_date) |
| .all() |
| ) for ti in tis])) |
| |
| # determine bars to show in the gantt chart |
| # all reschedules of one attempt are combinded into one bar |
| gantt_bar_items = [] |
| |
| tasks = [] |
| for ti in tis: |
| end_date = ti.end_date or timezone.utcnow() |
| # prev_attempted_tries will reflect the currently running try_number |
| # or the try_number of the last complete run |
| # https://issues.apache.org/jira/browse/AIRFLOW-2143 |
| try_count = ti.prev_attempted_tries |
| gantt_bar_items.append((ti.task_id, ti.start_date, end_date, ti.state, try_count)) |
| d = alchemy_to_dict(ti) |
| d['extraLinks'] = dag.get_task(ti.task_id).extra_links |
| tasks.append(d) |
| |
| tf_count = 0 |
| try_count = 1 |
| prev_task_id = "" |
| for tf in ti_fails: |
| end_date = tf.end_date or timezone.utcnow() |
| start_date = tf.start_date or end_date |
| if tf_count != 0 and tf.task_id == prev_task_id: |
| try_count = try_count + 1 |
| else: |
| try_count = 1 |
| prev_task_id = tf.task_id |
| gantt_bar_items.append((tf.task_id, start_date, end_date, State.FAILED, try_count)) |
| tf_count = tf_count + 1 |
| task = dag.get_task(tf.task_id) |
| d = alchemy_to_dict(tf) |
| d['state'] = State.FAILED |
| d['operator'] = task.task_type |
| d['try_number'] = try_count |
| d['extraLinks'] = task.extra_links |
| tasks.append(d) |
| |
| data = { |
| 'taskNames': [ti.task_id for ti in tis], |
| 'tasks': tasks, |
| 'height': len(tis) * 25 + 25, |
| } |
| |
| session.commit() |
| |
| return self.render_template( |
| 'airflow/gantt.html', |
| dag=dag, |
| execution_date=dttm.isoformat(), |
| form=form, |
| data=data, |
| base_date='', |
| demo_mode=demo_mode, |
| root=root, |
| ) |
| |
| @expose('/extra_links') |
| @has_dag_access(can_dag_read=True) |
| @has_access |
| @action_logging |
| def extra_links(self): |
| """ |
| A restful endpoint that returns external links for a given Operator |
| |
| It queries the operator that sent the request for the links it wishes |
| to provide for a given external link name. |
| |
| API: GET |
| Args: dag_id: The id of the dag containing the task in question |
| task_id: The id of the task in question |
| execution_date: The date of execution of the task |
| link_name: The name of the link reference to find the actual URL for |
| |
| Returns: |
| 200: {url: <url of link>, error: None} - returned when there was no problem |
| finding the URL |
| 404: {url: None, error: <error message>} - returned when the operator does |
| not return a URL |
| """ |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| link_name = request.args.get('link_name') |
| dttm = airflow.utils.timezone.parse(execution_date) |
| dag = dagbag.get_dag(dag_id) |
| |
| if not dag or task_id not in dag.task_ids: |
| response = jsonify( |
| {'url': None, |
| 'error': "can't find dag {dag} or task_id {task_id}".format( |
| dag=dag, |
| task_id=task_id |
| )} |
| ) |
| response.status_code = 404 |
| return response |
| |
| task = dag.get_task(task_id) |
| |
| try: |
| url = task.get_extra_links(dttm, link_name) |
| except ValueError as err: |
| response = jsonify({'url': None, 'error': str(err)}) |
| response.status_code = 404 |
| return response |
| if url: |
| response = jsonify({'error': None, 'url': url}) |
| response.status_code = 200 |
| return response |
| else: |
| response = jsonify( |
| {'url': None, 'error': 'No URL found for {dest}'.format(dest=link_name)}) |
| response.status_code = 404 |
| return response |
| |
| @expose('/object/task_instances') |
| @has_dag_access(can_dag_read=True) |
| @has_access |
| @action_logging |
| @provide_session |
| def task_instances(self, session=None): |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| |
| dttm = request.args.get('execution_date') |
| if dttm: |
| dttm = pendulum.parse(dttm) |
| else: |
| return "Error: Invalid execution_date" |
| |
| task_instances = { |
| ti.task_id: alchemy_to_dict(ti) |
| for ti in dag.get_task_instances(dttm, dttm, session=session)} |
| |
| return json.dumps(task_instances) |
| |
| |
class VersionView(AirflowBaseView):
    """Page showing the installed Airflow version and the recorded git hash."""

    default_view = 'version'

    @expose('/version')
    @has_access
    def version(self):
        """
        Render airflow/version.html.

        Both values fall back to None (and the error is logged) when they
        cannot be read, so the page itself never fails on them.
        """
        try:
            airflow_version = airflow.__version__
        except Exception as e:
            airflow_version = None
            logging.error(e)

        # The git hash is the first line of $AIRFLOW_HOME/airflow/git_version.
        git_version = None
        try:
            git_version_path = os.path.join(
                settings.AIRFLOW_HOME, 'airflow', 'git_version')
            with open(git_version_path) as version_file:
                git_version = version_file.readline()
        except Exception as e:
            logging.error(e)

        return self.render_template(
            'airflow/version.html',
            title="Version Info",
            airflow_version=airflow_version,
            git_version=git_version)
| |
| |
class ConfigurationView(AirflowBaseView):
    """Page showing the Airflow configuration file, if exposing it is allowed."""

    default_view = 'conf'

    @expose('/configuration')
    @has_access
    def conf(self):
        """
        Show the configuration file.

        With ``?raw=true`` the file text is returned as-is in the response.
        Otherwise it is rendered as syntax-highlighted INI together with a table
        of (section, key, value, source) rows. When webserver.expose_config is
        false, a placeholder comment replaces the file and no table is shown.
        """
        raw = request.args.get('raw') == "true"
        title = "Airflow Configuration"
        subtitle = configuration.AIRFLOW_CONFIG

        # Defaults used when the administrator disabled expose_config.
        config = (
            "# Your Airflow administrator chose not to expose the "
            "configuration, most likely for security reasons.")
        table = None
        if conf.getboolean("webserver", "expose_config"):
            with open(configuration.AIRFLOW_CONFIG, 'r') as config_file:
                config = config_file.read()
            table = [
                (section, key, value, source)
                for section, parameters in conf.as_dict(True, True).items()
                for key, (value, source) in parameters.items()
            ]

        if raw:
            return Response(
                response=config,
                status=200,
                mimetype="application/text")

        highlighted = highlight(config, lexers.IniLexer(), HtmlFormatter(noclasses=True))
        return self.render_template(
            'airflow/config.html',
            pre_subtitle=settings.HEADER + " v" + airflow.__version__,
            code_html=Markup(highlighted),
            title=title,
            subtitle=subtitle,
            table=table)
| |
| |
| ###################################################################################### |
| # ModelViews |
| ###################################################################################### |
| |
class DagFilter(BaseFilter):
    """Limit a model query to rows whose dag_id the current user can access."""

    def apply(self, query, func):  # noqa
        security_manager = appbuilder.sm
        if not security_manager.has_all_dags_access():
            accessible = security_manager.get_accessible_dag_ids()
            query = query.filter(self.model.dag_id.in_(accessible))
        return query
| |
| |
class AirflowModelView(ModelView):
    """Shared base for Airflow's model views: list widget and page size."""

    # Exposed on the class so subclasses can write
    # ``AirflowModelView.CustomSQLAInterface(Model)``.
    CustomSQLAInterface = wwwutils.CustomSQLAInterface

    page_size = PAGE_SIZE
    list_widget = AirflowModelListWidget
| |
| |
class SlaMissModelView(AirflowModelView):
    """Read-only list of SlaMiss records, filtered to DAGs the user may access."""

    route_base = '/slamiss'

    datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss)

    base_permissions = ['can_list']

    list_columns = ['dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp']
    # Independent copies so each column list can be changed on its own.
    add_columns = list(list_columns)
    edit_columns = list(list_columns)
    search_columns = ['dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date']

    base_order = ('execution_date', 'desc')
    base_filters = [['dag_id', DagFilter, lambda: []]]

    formatters_columns = {
        'dag_id': wwwutils.dag_link,
        'task_id': wwwutils.task_instance_link,
        'execution_date': wwwutils.datetime_f('execution_date'),
        'timestamp': wwwutils.datetime_f('timestamp'),
    }
| |
| |
class XComModelView(AirflowModelView):
    """List/delete view for XCom entries, filtered to DAGs the user may access."""

    route_base = '/xcom'

    datamodel = AirflowModelView.CustomSQLAInterface(XCom)

    base_permissions = ['can_list', 'can_delete']

    search_columns = ['key', 'value', 'timestamp', 'execution_date', 'task_id', 'dag_id']
    list_columns = list(search_columns)
    base_order = ('execution_date', 'desc')

    base_filters = [['dag_id', DagFilter, lambda: []]]

    formatters_columns = {
        'dag_id': wwwutils.dag_link,
        'task_id': wwwutils.task_instance_link,
        'execution_date': wwwutils.datetime_f('execution_date'),
        'timestamp': wwwutils.datetime_f('timestamp'),
    }

    @action('muldelete', 'Delete', "Are you sure you want to delete selected records?",
            single=False)
    def action_muldelete(self, items):
        """Delete every selected XCom row, then redirect back."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @staticmethod
    def _prepare_for_save(item):
        """Make execution_date timezone-aware and serialize value before saving."""
        item.execution_date = timezone.make_aware(item.execution_date)
        item.value = XCom.serialize_value(item.value)

    def pre_add(self, item):
        self._prepare_for_save(item)

    def pre_update(self, item):
        self._prepare_for_save(item)
| |
| |
class ConnectionModelView(AirflowModelView):
    """
    CRUD view for Connection rows.

    Some connection types (jdbc, google_cloud_platform, grpc) edit their
    ``extra`` JSON through dedicated ``extra__*`` form fields. Those fields are
    packed into ``extra`` on save and unpacked from it when the edit form is
    prefilled.
    """

    route_base = '/connection'

    datamodel = AirflowModelView.CustomSQLAInterface(Connection)

    base_permissions = ['can_add', 'can_list', 'can_edit', 'can_delete']

    extra_fields = ['extra__jdbc__drv_path', 'extra__jdbc__drv_clsname',
                    'extra__google_cloud_platform__project',
                    'extra__google_cloud_platform__key_path',
                    'extra__google_cloud_platform__keyfile_dict',
                    'extra__google_cloud_platform__scope',
                    'extra__google_cloud_platform__num_retries',
                    'extra__grpc__auth_type',
                    'extra__grpc__credential_pem_file',
                    'extra__grpc__scopes']
    list_columns = ['conn_id', 'conn_type', 'host', 'port', 'is_encrypted',
                    'is_extra_encrypted']
    add_columns = edit_columns = ['conn_id', 'conn_type', 'host', 'schema',
                                  'login', 'password', 'port', 'extra'] + extra_fields
    add_form = edit_form = ConnectionForm
    add_template = 'airflow/conn_create.html'
    edit_template = 'airflow/conn_edit.html'

    base_order = ('conn_id', 'asc')

    @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?',
            single=False)
    @has_dag_access(can_dag_edit=True)
    def action_muldelete(self, items):
        """Delete every selected connection, then redirect back."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    def process_form(self, form, is_created):
        """Pack the extra__* form fields into the ``extra`` JSON for the supported types."""
        formdata = form.data
        if formdata['conn_type'] not in ['jdbc', 'google_cloud_platform', 'grpc']:
            return
        packed = {name: formdata[name] for name in self.extra_fields if name in formdata}
        form.extra.data = json.dumps(packed)

    def prefill_form(self, form, pk):
        """Copy non-empty values from the ``extra`` JSON into the matching extra__* fields."""
        try:
            extra = json.loads(form.data.get('extra', '{}'))
        except Exception:
            # Unparseable or missing extra: behave as if it were empty.
            extra = {}

        if not hasattr(extra, 'get'):
            logging.warning('extra field for {} is not iterable'.format(
                form.data.get('conn_id', '<unknown>')))
            return

        for field_name in self.extra_fields:
            value = extra.get(field_name, '')
            if value:
                getattr(form, field_name).data = value
| |
| |
class PoolModelView(AirflowModelView):
    """
    CRUD view for pools.

    The pool name and its used/queued slot counts link to the task instance
    list, filtered by that pool (and by state for the slot counts).
    """

    route_base = '/pool'

    datamodel = AirflowModelView.CustomSQLAInterface(models.Pool)

    base_permissions = ['can_add', 'can_list', 'can_edit', 'can_delete']

    list_columns = ['pool', 'slots', 'used_slots', 'queued_slots']
    add_columns = ['pool', 'slots', 'description']
    edit_columns = ['pool', 'slots', 'description']

    base_order = ('pool', 'asc')

    @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?',
            single=False)
    def action_muldelete(self, items):
        """Delete the selected pools, unless the default pool is among them."""
        touches_default = any(
            item.pool == models.Pool.DEFAULT_POOL_NAME for item in items)
        if touches_default:
            flash("default_pool cannot be deleted", 'error')
        else:
            self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    # The formatters below are plain functions evaluated in the class body.
    # Each receives the row's attribute mapping.
    def pool_link(attr):
        pool_id = attr.get('pool')
        if pool_id is None:
            return Markup('<span class="label label-danger">Invalid</span>')
        url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id)
        return Markup("<a href='{url}'>{pool_id}</a>").format(url=url, pool_id=pool_id)

    def fused_slots(attr):
        pool_id = attr.get('pool')
        used_slots = attr.get('used_slots')
        if pool_id is None or used_slots is None:
            return Markup('<span class="label label-danger">Invalid</span>')
        url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id,
                      _flt_3_state='running')
        return Markup("<a href='{url}'>{used_slots}</a>").format(
            url=url, used_slots=used_slots)

    def fqueued_slots(attr):
        pool_id = attr.get('pool')
        queued_slots = attr.get('queued_slots')
        if pool_id is None or queued_slots is None:
            return Markup('<span class="label label-danger">Invalid</span>')
        url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id,
                      _flt_3_state='queued')
        return Markup("<a href='{url}'>{queued_slots}</a>").format(
            url=url, queued_slots=queued_slots)

    formatters_columns = {
        'pool': pool_link,
        'used_slots': fused_slots,
        'queued_slots': fqueued_slots
    }

    validators_columns = {
        'pool': [validators.DataRequired()],
        'slots': [validators.NumberRange(min=-1)]
    }
| |
| |
class VariableModelView(AirflowModelView):
    """
    CRUD view for Variables, with JSON export of selected rows and JSON import.

    Values whose key is considered sensitive (wwwutils.should_hide_value_for_key)
    are masked as eight asterisks in the list and edit form.
    """

    route_base = '/variable'

    list_template = 'airflow/variable_list.html'
    edit_template = 'airflow/variable_edit.html'

    datamodel = AirflowModelView.CustomSQLAInterface(models.Variable)

    base_permissions = ['can_add', 'can_list', 'can_edit', 'can_delete', 'can_varimport']

    list_columns = ['key', 'val', 'is_encrypted']
    add_columns = ['key', 'val']
    edit_columns = ['key', 'val']
    search_columns = ['key', 'val']

    base_order = ('key', 'asc')

    def hidden_field_formatter(attr):
        key = attr.get('key')
        val = attr.get('val')
        if wwwutils.should_hide_value_for_key(key):
            return Markup('*' * 8)
        if val:
            return val
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    formatters_columns = {
        'val': hidden_field_formatter,
    }

    validators_columns = {
        'key': [validators.DataRequired()]
    }

    def prefill_form(self, form, id):
        """Mask sensitive values in the edit form."""
        if wwwutils.should_hide_value_for_key(form.key.data):
            form.val.data = '*' * 8

    @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?',
            single=False)
    def action_muldelete(self, items):
        """Delete every selected variable, then redirect back."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action('varexport', 'Export', '', single=False)
    def action_varexport(self, items):
        """
        Download the selected variables as a JSON object (key -> value).

        Values that parse as JSON are emitted as decoded JSON; anything else is
        emitted as the raw string.
        """
        var_dict = {}
        d = json.JSONDecoder()
        for var in items:
            try:
                val = d.decode(var.val)
            except Exception:
                val = var.val
            var_dict[var.key] = val

        response = make_response(json.dumps(var_dict, sort_keys=True, indent=4))
        response.headers["Content-Disposition"] = "attachment; filename=variables.json"
        response.headers["Content-Type"] = "application/json; charset=utf-8"
        return response

    @expose('/varimport', methods=["POST"])
    @has_access
    @action_logging
    def varimport(self):
        """
        Import variables from an uploaded JSON object (key -> value).

        String values are stored verbatim. Every other JSON value (object,
        array, number, boolean, null) is stored JSON-serialized, so the import
        reverses the decoding done by action_varexport. The number of successful
        and failed imports is reported via flash messages.
        """
        try:
            out = request.files['file'].read()
            if not PY2 and isinstance(out, bytes):
                d = json.loads(out.decode('utf-8'))
            else:
                d = json.loads(out)
            # A top-level array/scalar has no key/value pairs; treat it like a
            # malformed file instead of crashing on ``d.items()`` below.
            if not isinstance(d, dict):
                raise ValueError("variables file must contain a JSON object")
        except Exception:
            self.update_redirect()
            flash("Missing file or syntax error.", 'error')
            return redirect(self.get_redirect())
        else:
            suc_count = fail_count = 0
            for k, v in d.items():
                try:
                    # Only dicts used to be serialized, so lists/booleans/null
                    # were stored via str() (e.g. "True", "['a']") which is not
                    # valid JSON. six.string_types keeps Python 2 unicode
                    # values stored verbatim too.
                    models.Variable.set(
                        k, v, serialize_json=not isinstance(v, six.string_types))
                except Exception as e:
                    logging.info('Variable import failed: {}'.format(repr(e)))
                    fail_count += 1
                else:
                    suc_count += 1
            flash("{} variable(s) successfully updated.".format(suc_count))
            if fail_count:
                flash("{} variable(s) failed to be updated.".format(fail_count), 'error')
            self.update_redirect()
            return redirect(self.get_redirect())
| |
| |
class JobModelView(AirflowModelView):
    """Read-only list of jobs, filtered to DAGs the user may access."""

    route_base = '/job'

    datamodel = AirflowModelView.CustomSQLAInterface(jobs.BaseJob)

    base_permissions = ['can_list']

    list_columns = ['id', 'dag_id', 'state', 'job_type', 'start_date',
                    'end_date', 'latest_heartbeat',
                    'executor_class', 'hostname', 'unixname']
    # Searchable columns are the same set, in the same order.
    search_columns = list(list_columns)

    base_order = ('start_date', 'desc')
    base_filters = [['dag_id', DagFilter, lambda: []]]

    formatters_columns = {
        'state': wwwutils.state_f,
        'hostname': wwwutils.nobr_f('hostname'),
        'start_date': wwwutils.datetime_f('start_date'),
        'end_date': wwwutils.datetime_f('end_date'),
        'latest_heartbeat': wwwutils.datetime_f('latest_heartbeat'),
    }
| |
| |
class DagRunModelView(AirflowModelView):
    """
    List/add view for DagRuns, with bulk delete and bulk state changes.

    Rows are filtered to DAGs the user may access.
    """

    route_base = '/dagrun'

    datamodel = AirflowModelView.CustomSQLAInterface(models.DagRun)

    base_permissions = ['can_list', 'can_add']

    add_columns = ['state', 'dag_id', 'execution_date', 'run_id', 'external_trigger', 'conf']
    list_columns = ['state', 'dag_id', 'execution_date', 'run_id', 'external_trigger', 'conf']
    search_columns = ['state', 'dag_id', 'execution_date', 'run_id', 'external_trigger', 'conf']

    base_order = ('execution_date', 'desc')

    base_filters = [['dag_id', DagFilter, lambda: []]]

    add_form = edit_form = DagRunForm

    formatters_columns = {
        'execution_date': wwwutils.datetime_f('execution_date'),
        'state': wwwutils.state_f,
        'start_date': wwwutils.datetime_f('start_date'),
        'dag_id': wwwutils.dag_link,
        'run_id': wwwutils.dag_run_link,
    }

    @action('muldelete', "Delete", "Are you sure you want to delete selected records?",
            single=False)
    @has_dag_access(can_dag_edit=True)
    @provide_session
    def action_muldelete(self, items, session=None):
        """Delete every selected dag run, then redirect back."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @staticmethod
    def _query_selected(drs, session):
        """Re-load the selected dag runs through ``session`` by primary key."""
        DR = models.DagRun
        return session.query(DR).filter(
            DR.id.in_([dagrun.id for dagrun in drs])).all()

    def _set_dag_run_states(self, drs, set_state, state_name, session):
        """
        Apply ``set_state`` (set_dag_run_state_to_failed/_success) to every
        selected run and flash a summary, or a generic error on failure.
        """
        try:
            count = 0
            altered_tis = []
            for dr in self._query_selected(drs, session):
                count += 1
                altered_tis += set_state(dagbag.get_dag(dr.dag_id),
                                         dr.execution_date,
                                         commit=True,
                                         session=session)
            altered_ti_count = len(altered_tis)
            flash(
                "{count} dag runs and {altered_ti_count} task instances "
                "were set to {state}".format(count=count,
                                             altered_ti_count=altered_ti_count,
                                             state=state_name))
        except Exception:
            flash('Failed to set state', 'error')
        return redirect(self.get_default_url())

    @action('set_running', "Set state to 'running'", '', single=False)
    @provide_session
    def action_set_running(self, drs, session=None):
        """Set the selected runs to RUNNING, resetting their start_date to now."""
        try:
            count = 0
            for dr in self._query_selected(drs, session):
                count += 1
                dr.start_date = timezone.utcnow()
                dr.state = State.RUNNING
            session.commit()
            flash("{count} dag runs were set to running".format(count=count))
        except Exception as ex:
            flash(str(ex), 'error')
            flash('Failed to set state', 'error')
        return redirect(self.get_default_url())

    @action('set_failed', "Set state to 'failed'",
            "All running task instances would also be marked as failed, are you sure?",
            single=False)
    @provide_session
    def action_set_failed(self, drs, session=None):
        """Mark the selected runs (and their task instances) as failed."""
        return self._set_dag_run_states(drs, set_dag_run_state_to_failed, 'failed', session)

    @action('set_success', "Set state to 'success'",
            "All task instances would also be marked as success, are you sure?",
            single=False)
    @provide_session
    def action_set_success(self, drs, session=None):
        """Mark the selected runs (and their task instances) as successful."""
        return self._set_dag_run_states(drs, set_dag_run_state_to_success, 'success', session)
| |
| |
class LogModelView(AirflowModelView):
    """Read-only list of audit Log rows, filtered to DAGs the user may access."""

    route_base = '/log'

    datamodel = AirflowModelView.CustomSQLAInterface(Log)

    base_permissions = ['can_list']

    list_columns = ['id', 'dttm', 'dag_id', 'task_id', 'event', 'execution_date',
                    'owner', 'extra']
    # Every listed column except 'id' and 'dttm' is searchable.
    search_columns = list_columns[2:]

    base_order = ('dttm', 'desc')
    base_filters = [['dag_id', DagFilter, lambda: []]]

    formatters_columns = {
        'dag_id': wwwutils.dag_link,
        'dttm': wwwutils.datetime_f('dttm'),
        'execution_date': wwwutils.datetime_f('execution_date'),
    }
| |
| |
class TaskRescheduleModelView(AirflowModelView):
    """View to show records from Task Reschedule table"""

    route_base = '/taskreschedule'

    datamodel = AirflowModelView.CustomSQLAInterface(models.TaskReschedule)

    base_permissions = ['can_list']

    list_columns = ['id', 'dag_id', 'task_id', 'execution_date', 'try_number',
                    'start_date', 'end_date', 'duration', 'reschedule_date']
    search_columns = ['dag_id', 'task_id', 'execution_date', 'start_date', 'end_date',
                      'reschedule_date']

    base_order = ('id', 'desc')
    base_filters = [['dag_id', DagFilter, lambda: []]]

    def duration_f(attr):
        # Show the duration (seconds) as a timedelta only for finished rows
        # with a non-zero duration; otherwise leave the cell empty (None).
        end_date = attr.get('end_date')
        duration = attr.get('duration')
        if not (end_date and duration):
            return None
        return timedelta(seconds=duration)

    formatters_columns = {
        'dag_id': wwwutils.dag_link,
        'task_id': wwwutils.task_instance_link,
        'duration': duration_f,
        'execution_date': wwwutils.datetime_f('execution_date'),
        'start_date': wwwutils.datetime_f('start_date'),
        'end_date': wwwutils.datetime_f('end_date'),
        'reschedule_date': wwwutils.datetime_f('reschedule_date'),
    }
| |
| |
class TaskInstanceModelView(AirflowModelView):
    """
    List view for task instances, filtered to DAGs the user may access, with
    bulk actions to clear instances or force their state.
    """

    route_base = '/taskinstance'

    datamodel = AirflowModelView.CustomSQLAInterface(models.TaskInstance)

    base_permissions = ['can_list']

    page_size = PAGE_SIZE

    list_columns = ['state', 'dag_id', 'task_id', 'execution_date', 'operator',
                    'start_date', 'end_date', 'duration', 'job_id', 'hostname',
                    'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
                    'pool', 'log_url']

    order_columns = [item for item in list_columns if item not in ['try_number', 'log_url']]

    search_columns = ['state', 'dag_id', 'task_id', 'execution_date', 'hostname',
                      'queue', 'pool', 'operator', 'start_date', 'end_date']

    base_order = ('job_id', 'asc')

    base_filters = [['dag_id', DagFilter, lambda: []]]

    def log_url_formatter(attr):
        log_url = attr.get('log_url')
        return Markup(
            '<a href="{log_url}">'
            '    <span class="glyphicon glyphicon-book" aria-hidden="true">'
            '</span></a>').format(log_url=log_url)

    def duration_f(attr):
        end_date = attr.get('end_date')
        duration = attr.get('duration')
        if end_date and duration:
            return timedelta(seconds=duration)

    formatters_columns = {
        'log_url': log_url_formatter,
        'task_id': wwwutils.task_instance_link,
        'hostname': wwwutils.nobr_f('hostname'),
        'state': wwwutils.state_f,
        'execution_date': wwwutils.datetime_f('execution_date'),
        'start_date': wwwutils.datetime_f('start_date'),
        'end_date': wwwutils.datetime_f('end_date'),
        'queued_dttm': wwwutils.datetime_f('queued_dttm'),
        'dag_id': wwwutils.dag_link,
        'duration': duration_f,
    }

    @provide_session
    @action('clear', lazy_gettext('Clear'),
            lazy_gettext('Are you sure you want to clear the state of the selected task'
                         ' instance(s) and set their dagruns to the running state?'),
            single=False)
    def action_clear(self, tis, session=None):
        """
        Clear the selected task instances, one clear_task_instances call per DAG.

        Always redirects back to the list, flashing either the number of
        cleared instances or an error message.
        """
        try:
            # Group by DAG under a separate name: the old code rebound ``tis``
            # here, so the flashed count was the size of the last DAG's group.
            dag_to_tis = {}
            for ti in tis:
                dag = dagbag.get_dag(ti.dag_id)
                dag_to_tis.setdefault(dag, []).append(ti)

            for dag, dag_tis in dag_to_tis.items():
                models.clear_task_instances(dag_tis, session=session, dag=dag)

            session.commit()
            flash("{0} task instances have been cleared".format(len(tis)))
        except Exception:
            # Discard any partial changes made before the failure.
            session.rollback()
            flash('Failed to clear task instances', 'error')

        # Redirect on failure too; returning None is not a valid Flask response.
        self.update_redirect()
        return redirect(self.get_redirect())

    @provide_session
    def set_task_instance_state(self, tis, target_state, session=None):
        """Set every task instance in ``tis`` to target_state and flash the outcome."""
        try:
            count = len(tis)
            for ti in tis:
                ti.set_state(target_state, session=session)
            session.commit()
            flash("{count} task instances were set to '{target_state}'".format(
                count=count, target_state=target_state))
        except Exception:
            flash('Failed to set state', 'error')

    @action('set_running', "Set state to 'running'", '', single=False)
    @has_dag_access(can_dag_edit=True)
    def action_set_running(self, tis):
        self.set_task_instance_state(tis, State.RUNNING)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action('set_failed', "Set state to 'failed'", '', single=False)
    @has_dag_access(can_dag_edit=True)
    def action_set_failed(self, tis):
        self.set_task_instance_state(tis, State.FAILED)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action('set_success', "Set state to 'success'", '', single=False)
    @has_dag_access(can_dag_edit=True)
    def action_set_success(self, tis):
        self.set_task_instance_state(tis, State.SUCCESS)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action('set_retry', "Set state to 'up_for_retry'", '', single=False)
    @has_dag_access(can_dag_edit=True)
    def action_set_retry(self, tis):
        self.set_task_instance_state(tis, State.UP_FOR_RETRY)
        self.update_redirect()
        return redirect(self.get_redirect())

    def get_one(self, id):
        """
        As a workaround for AIRFLOW-252, this method overrides Flask-Admin's
        ModelView.get_one().

        TODO: this method should be removed once the below bug is fixed on
        Flask-Admin side. https://github.com/flask-admin/flask-admin/issues/1226
        """
        task_id, dag_id, execution_date = iterdecode(id)  # noqa
        execution_date = pendulum.parse(execution_date)
        return self.session.query(self.model).get((task_id, dag_id, execution_date))
| |
| |
class DagModelView(AirflowModelView):
    """
    List/show view for DagModel rows, plus an autocomplete endpoint for the
    DAG search box.
    """

    route_base = '/dagmodel'

    datamodel = AirflowModelView.CustomSQLAInterface(models.DagModel)

    base_permissions = ['can_list', 'can_show']

    list_columns = ['dag_id', 'is_paused', 'last_scheduler_run',
                    'last_expired', 'scheduler_lock', 'fileloc', 'owners']

    formatters_columns = {
        'dag_id': wwwutils.dag_link
    }

    base_filters = [['dag_id', DagFilter, lambda: []]]

    def get_query(self):
        """
        Default filters for model: active or paused DAGs, excluding subdags
        """
        return (
            super(DagModelView, self).get_query()
            .filter(or_(models.DagModel.is_active,
                        models.DagModel.is_paused))
            .filter(~models.DagModel.is_subdag)
        )

    def get_count_query(self):
        """
        Default filters for model, identical to get_query.

        The count previously filtered on is_active only, so the total disagreed
        with the listed rows whenever an inactive DAG was paused.
        """
        return (
            super(DagModelView, self).get_count_query()
            .filter(or_(models.DagModel.is_active,
                        models.DagModel.is_paused))
            .filter(~models.DagModel.is_subdag)
        )

    @has_access
    @permission_name("list")
    @provide_session
    @expose('/autocomplete')
    def autocomplete(self, session=None):
        """
        Return up to 10 dag_id / owner suggestions matching ``?query=`` as JSON.

        Only active, non-subdag DAGs are considered. The paused/active status
        filter stored in the session cookie and the user's accessible dag_ids
        are honoured. An empty query returns an empty list.
        """
        DagModel = models.DagModel
        query = unquote(request.args.get('query', ''))

        if not query:
            # Without this return the empty query fell through to ilike('%%')
            # and suggested arbitrary DAGs.
            return wwwutils.json_response([])

        # Provide suggestions of dag_ids and owners
        dag_ids_query = session.query(DagModel.dag_id.label('item')).filter(
            ~DagModel.is_subdag, DagModel.is_active,
            DagModel.dag_id.ilike('%' + query + '%'))

        owners_query = session.query(func.distinct(DagModel.owners).label('item')).filter(
            ~DagModel.is_subdag, DagModel.is_active,
            DagModel.owners.ilike('%' + query + '%'))

        # Hide DAGs if not showing status: "all"
        status = flask_session.get(FILTER_STATUS_COOKIE)
        if status == 'active':
            dag_ids_query = dag_ids_query.filter(~DagModel.is_paused)
            owners_query = owners_query.filter(~DagModel.is_paused)
        elif status == 'paused':
            dag_ids_query = dag_ids_query.filter(DagModel.is_paused)
            owners_query = owners_query.filter(DagModel.is_paused)

        filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
        if 'all_dags' not in filter_dag_ids:
            dag_ids_query = dag_ids_query.filter(DagModel.dag_id.in_(filter_dag_ids))
            owners_query = owners_query.filter(DagModel.dag_id.in_(filter_dag_ids))

        payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]

        return wwwutils.json_response(payload)