| # -*- coding: utf-8 -*- |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| from past.builtins import basestring, unicode |
| |
| import ast |
| import logging |
| import os |
| import pkg_resources |
| import socket |
| from functools import wraps |
| from datetime import datetime, timedelta |
| import dateutil.parser |
| import copy |
| import math |
| import json |
| import bleach |
| from collections import defaultdict |
| |
| import inspect |
| from textwrap import dedent |
| import traceback |
| |
| import sqlalchemy as sqla |
| from sqlalchemy import or_, desc, and_, union_all |
| |
| from flask import ( |
| redirect, url_for, request, Markup, Response, current_app, render_template, make_response) |
| from flask_admin import BaseView, expose, AdminIndexView |
| from flask_admin.contrib.sqla import ModelView |
| from flask_admin.actions import action |
| from flask_admin.babel import lazy_gettext |
| from flask_admin.tools import iterdecode |
| from flask_login import flash |
| from flask._compat import PY2 |
| |
| from jinja2.sandbox import ImmutableSandboxedEnvironment |
| from jinja2 import escape |
| |
| import markdown |
| import nvd3 |
| |
| from wtforms import ( |
| Form, SelectField, TextAreaField, PasswordField, StringField, validators) |
| |
| from pygments import highlight, lexers |
| from pygments.formatters import HtmlFormatter |
| |
| import airflow |
| from airflow import configuration as conf |
| from airflow import models |
| from airflow import settings |
| from airflow.api.common.experimental.mark_tasks import set_dag_run_state |
| from airflow.exceptions import AirflowException |
| from airflow.settings import Session |
| from airflow.models import XCom, DagRun |
| from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS |
| |
| from airflow.models import BaseOperator |
| from airflow.operators.subdag_operator import SubDagOperator |
| |
| from airflow.utils.json import json_ser |
| from airflow.utils.state import State |
| from airflow.utils.db import provide_session |
| from airflow.utils.helpers import alchemy_to_dict |
| from airflow.utils.dates import infer_time_unit, scale_time_units |
| from airflow.www import utils as wwwutils |
| from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm |
| from airflow.www.validators import GreaterEqualThan |
| |
| QUERY_LIMIT = 100000 |
| CHART_LIMIT = 200000 |
| |
| dagbag = models.DagBag(settings.DAGS_FOLDER) |
| |
| login_required = airflow.login.login_required |
| current_user = airflow.login.current_user |
| logout_user = airflow.login.logout_user |
| |
| FILTER_BY_OWNER = False |
| |
| PAGE_SIZE = conf.getint('webserver', 'page_size') |
| |
| if conf.getboolean('webserver', 'FILTER_BY_OWNER'): |
| # filter_by_owner if authentication is enabled and filter_by_owner is true |
| FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED'] |
| |
| |
| def dag_link(v, c, m, p): |
| dag_id = bleach.clean(m.dag_id) |
| url = url_for( |
| 'airflow.graph', |
| dag_id=dag_id) |
| return Markup( |
| '<a href="{}">{}</a>'.format(url, dag_id)) |
| |
| |
| def log_url_formatter(v, c, m, p): |
| return Markup( |
| '<a href="{m.log_url}">' |
| ' <span class="glyphicon glyphicon-book" aria-hidden="true">' |
| '</span></a>').format(**locals()) |
| |
| |
| def task_instance_link(v, c, m, p): |
| dag_id = bleach.clean(m.dag_id) |
| task_id = bleach.clean(m.task_id) |
| url = url_for( |
| 'airflow.task', |
| dag_id=dag_id, |
| task_id=task_id, |
| execution_date=m.execution_date.isoformat()) |
| url_root = url_for( |
| 'airflow.graph', |
| dag_id=dag_id, |
| root=task_id, |
| execution_date=m.execution_date.isoformat()) |
| return Markup( |
| """ |
| <span style="white-space: nowrap;"> |
| <a href="{url}">{task_id}</a> |
| <a href="{url_root}" title="Filter on this task and upstream"> |
| <span class="glyphicon glyphicon-filter" style="margin-left: 0px;" |
| aria-hidden="true"></span> |
| </a> |
| </span> |
| """.format(**locals())) |
| |
| |
| def state_token(state): |
| color = State.color(state) |
| return Markup( |
| '<span class="label" style="background-color:{color};">' |
| '{state}</span>'.format(**locals())) |
| |
| |
| def state_f(v, c, m, p): |
| return state_token(m.state) |
| |
| |
| def duration_f(v, c, m, p): |
| if m.end_date and m.duration: |
| return timedelta(seconds=m.duration) |
| |
| |
| def datetime_f(v, c, m, p): |
| attr = getattr(m, p) |
| dttm = attr.isoformat() if attr else '' |
| if datetime.utcnow().isoformat()[:4] == dttm[:4]: |
| dttm = dttm[5:] |
| return Markup("<nobr>{}</nobr>".format(dttm)) |
| |
| |
| def nobr_f(v, c, m, p): |
| return Markup("<nobr>{}</nobr>".format(getattr(m, p))) |
| |
| |
| def label_link(v, c, m, p): |
| try: |
| default_params = ast.literal_eval(m.default_params) |
| except: |
| default_params = {} |
| url = url_for( |
| 'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no, |
| **default_params) |
| return Markup("<a href='{url}'>{m.label}</a>".format(**locals())) |
| |
| |
| def pool_link(v, c, m, p): |
| url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool |
| return Markup("<a href='{url}'>{m.pool}</a>".format(**locals())) |
| |
| |
| def pygment_html_render(s, lexer=lexers.TextLexer): |
| return highlight( |
| s, |
| lexer(), |
| HtmlFormatter(linenos=True), |
| ) |
| |
| |
| def render(obj, lexer): |
| out = "" |
| if isinstance(obj, basestring): |
| out += pygment_html_render(obj, lexer) |
| elif isinstance(obj, (tuple, list)): |
| for i, s in enumerate(obj): |
| out += "<div>List item #{}</div>".format(i) |
| out += "<div>" + pygment_html_render(s, lexer) + "</div>" |
| elif isinstance(obj, dict): |
| for k, v in obj.items(): |
| out += '<div>Dict item "{}"</div>'.format(k) |
| out += "<div>" + pygment_html_render(v, lexer) + "</div>" |
| return out |
| |
| |
| def wrapped_markdown(s): |
| return '<div class="rich_doc">' + markdown.markdown(s) + "</div>" |
| |
| |
| attr_renderer = { |
| 'bash_command': lambda x: render(x, lexers.BashLexer), |
| 'hql': lambda x: render(x, lexers.SqlLexer), |
| 'sql': lambda x: render(x, lexers.SqlLexer), |
| 'doc': lambda x: render(x, lexers.TextLexer), |
| 'doc_json': lambda x: render(x, lexers.JsonLexer), |
| 'doc_rst': lambda x: render(x, lexers.RstLexer), |
| 'doc_yaml': lambda x: render(x, lexers.YamlLexer), |
| 'doc_md': wrapped_markdown, |
| 'python_callable': lambda x: render( |
| inspect.getsource(x), lexers.PythonLexer), |
| } |
| |
| |
| def data_profiling_required(f): |
| """Decorator for views requiring data profiling access""" |
| @wraps(f) |
| def decorated_function(*args, **kwargs): |
| if ( |
| current_app.config['LOGIN_DISABLED'] or |
| (not current_user.is_anonymous() and current_user.data_profiling()) |
| ): |
| return f(*args, **kwargs) |
| else: |
| flash("This page requires data profiling privileges", "error") |
| return redirect(url_for('admin.index')) |
| |
| return decorated_function |
| |
| |
| def fused_slots(v, c, m, p): |
| url = ( |
| '/admin/taskinstance/' + |
| '?flt1_pool_equals=' + m.pool + |
| '&flt2_state_equals=running') |
| return Markup("<a href='{0}'>{1}</a>".format(url, m.used_slots())) |
| |
| |
| def fqueued_slots(v, c, m, p): |
| url = ( |
| '/admin/taskinstance/' + |
| '?flt1_pool_equals=' + m.pool + |
| '&flt2_state_equals=queued&sort=10&desc=1') |
| return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots())) |
| |
| |
| def recurse_tasks(tasks, task_ids, dag_ids, task_id_to_dag): |
| if isinstance(tasks, list): |
| for task in tasks: |
| recurse_tasks(task, task_ids, dag_ids, task_id_to_dag) |
| return |
| if isinstance(tasks, SubDagOperator): |
| subtasks = tasks.subdag.tasks |
| dag_ids.append(tasks.subdag.dag_id) |
| for subtask in subtasks: |
| if subtask.task_id not in task_ids: |
| task_ids.append(subtask.task_id) |
| task_id_to_dag[subtask.task_id] = tasks.subdag |
| recurse_tasks(subtasks, task_ids, dag_ids, task_id_to_dag) |
| if isinstance(tasks, BaseOperator): |
| task_id_to_dag[tasks.task_id] = tasks.dag |
| |
| |
| def get_chart_height(dag): |
| """ |
| TODO(aoen): See [AIRFLOW-1263] We use the number of tasks in the DAG as a heuristic to |
| approximate the size of generated chart (otherwise the charts are tiny and unreadable |
| when DAGs have a large number of tasks). Ideally nvd3 should allow for dynamic-height |
| charts, that is charts that take up space based on the size of the components within. |
| """ |
| return 600 + len(dag.tasks) * 10 |
| |
| |
| class Airflow(BaseView): |
| def is_visible(self): |
| return False |
| |
| @expose('/') |
| @login_required |
| def index(self): |
| return self.render('airflow/dags.html') |
| |
| @expose('/chart_data') |
| @data_profiling_required |
| @wwwutils.gzipped |
| # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key) |
| def chart_data(self): |
| from airflow import macros |
| import pandas as pd |
| session = settings.Session() |
| chart_id = request.args.get('chart_id') |
| csv = request.args.get('csv') == "true" |
| chart = session.query(models.Chart).filter_by(id=chart_id).first() |
| db = session.query( |
| models.Connection).filter_by(conn_id=chart.conn_id).first() |
| session.expunge_all() |
| session.commit() |
| session.close() |
| |
| payload = { |
| "state": "ERROR", |
| "error": "" |
| } |
| |
| # Processing templated fields |
| try: |
| args = ast.literal_eval(chart.default_params) |
| if type(args) is not type(dict()): |
| raise AirflowException('Not a dict') |
| except: |
| args = {} |
| payload['error'] += ( |
| "Default params is not valid, string has to evaluate as " |
| "a Python dictionary. ") |
| |
| request_dict = {k: request.args.get(k) for k in request.args} |
| args.update(request_dict) |
| args['macros'] = macros |
| sandbox = ImmutableSandboxedEnvironment() |
| sql = sandbox.from_string(chart.sql).render(**args) |
| label = sandbox.from_string(chart.label).render(**args) |
| payload['sql_html'] = Markup(highlight( |
| sql, |
| lexers.SqlLexer(), # Lexer call |
| HtmlFormatter(noclasses=True)) |
| ) |
| payload['label'] = label |
| |
| pd.set_option('display.max_colwidth', 100) |
| hook = db.get_hook() |
| try: |
| df = hook.get_pandas_df( |
| wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type)) |
| df = df.fillna(0) |
| except Exception as e: |
| payload['error'] += "SQL execution failed. Details: " + str(e) |
| |
| if csv: |
| return Response( |
| response=df.to_csv(index=False), |
| status=200, |
| mimetype="application/text") |
| |
| if not payload['error'] and len(df) == CHART_LIMIT: |
| payload['warning'] = ( |
| "Data has been truncated to {0}" |
| " rows. Expect incomplete results.").format(CHART_LIMIT) |
| |
| if not payload['error'] and len(df) == 0: |
| payload['error'] += "Empty result set. " |
| elif ( |
| not payload['error'] and |
| chart.sql_layout == 'series' and |
| chart.chart_type != "datatable" and |
| len(df.columns) < 3): |
| payload['error'] += "SQL needs to return at least 3 columns. " |
| elif ( |
| not payload['error'] and |
| chart.sql_layout == 'columns' and |
| len(df.columns) < 2): |
| payload['error'] += "SQL needs to return at least 2 columns. " |
| elif not payload['error']: |
| import numpy as np |
| chart_type = chart.chart_type |
| |
| data = None |
| if chart.show_datatable or chart_type == "datatable": |
| data = df.to_dict(orient="split") |
| data['columns'] = [{'title': c} for c in data['columns']] |
| payload['data'] = data |
| |
| # Trying to convert time to something Highcharts likes |
| x_col = 1 if chart.sql_layout == 'series' else 0 |
| if chart.x_is_date: |
| try: |
| # From string to datetime |
| df[df.columns[x_col]] = pd.to_datetime( |
| df[df.columns[x_col]]) |
| df[df.columns[x_col]] = df[df.columns[x_col]].apply( |
| lambda x: int(x.strftime("%s")) * 1000) |
| except Exception as e: |
| payload['error'] = "Time conversion failed" |
| |
| if chart_type == 'datatable': |
| payload['state'] = 'SUCCESS' |
| return wwwutils.json_response(payload) |
| else: |
| if chart.sql_layout == 'series': |
| # User provides columns (series, x, y) |
| xaxis_label = df.columns[1] |
| yaxis_label = df.columns[2] |
| df[df.columns[2]] = df[df.columns[2]].astype(np.float) |
| df = df.pivot_table( |
| index=df.columns[1], |
| columns=df.columns[0], |
| values=df.columns[2], aggfunc=np.sum) |
| else: |
| # User provides columns (x, y, metric1, metric2, ...) |
| xaxis_label = df.columns[0] |
| yaxis_label = 'y' |
| df.index = df[df.columns[0]] |
| df = df.sort(df.columns[0]) |
| del df[df.columns[0]] |
| for col in df.columns: |
| df[col] = df[col].astype(np.float) |
| |
| df = df.fillna(0) |
| NVd3ChartClass = chart_mapping.get(chart.chart_type) |
| NVd3ChartClass = getattr(nvd3, NVd3ChartClass) |
| nvd3_chart = NVd3ChartClass(x_is_date=chart.x_is_date) |
| |
| for col in df.columns: |
| nvd3_chart.add_serie(name=col, y=df[col].tolist(), x=df[col].index.tolist()) |
| try: |
| nvd3_chart.buildcontent() |
| payload['chart_type'] = nvd3_chart.__class__.__name__ |
| payload['htmlcontent'] = nvd3_chart.htmlcontent |
| except Exception as e: |
| payload['error'] = str(e) |
| |
| payload['state'] = 'SUCCESS' |
| payload['request_dict'] = request_dict |
| return wwwutils.json_response(payload) |
| |
| @expose('/chart') |
| @data_profiling_required |
| def chart(self): |
| session = settings.Session() |
| chart_id = request.args.get('chart_id') |
| embed = request.args.get('embed') |
| chart = session.query(models.Chart).filter_by(id=chart_id).first() |
| session.expunge_all() |
| session.commit() |
| session.close() |
| |
| NVd3ChartClass = chart_mapping.get(chart.chart_type) |
| if not NVd3ChartClass: |
| flash( |
| "Not supported anymore as the license was incompatible, " |
| "sorry", |
| "danger") |
| redirect('/admin/chart/') |
| |
| sql = "" |
| if chart.show_sql: |
| sql = Markup(highlight( |
| chart.sql, |
| lexers.SqlLexer(), # Lexer call |
| HtmlFormatter(noclasses=True)) |
| ) |
| return self.render( |
| 'airflow/nvd3.html', |
| chart=chart, |
| title="Airflow - Chart", |
| sql=sql, |
| label=chart.label, |
| embed=embed) |
| |
| @expose('/dag_stats') |
| @login_required |
| def dag_stats(self): |
| ds = models.DagStat |
| session = Session() |
| |
| ds.update() |
| |
| qry = ( |
| session.query(ds.dag_id, ds.state, ds.count) |
| ) |
| |
| data = {} |
| for dag_id, state, count in qry: |
| if dag_id not in data: |
| data[dag_id] = {} |
| data[dag_id][state] = count |
| |
| payload = {} |
| for dag in dagbag.dags.values(): |
| payload[dag.safe_dag_id] = [] |
| for state in State.dag_states: |
| try: |
| count = data[dag.dag_id][state] |
| except Exception: |
| count = 0 |
| d = { |
| 'state': state, |
| 'count': count, |
| 'dag_id': dag.dag_id, |
| 'color': State.color(state) |
| } |
| payload[dag.safe_dag_id].append(d) |
| return wwwutils.json_response(payload) |
| |
| @expose('/task_stats') |
| @login_required |
| def task_stats(self): |
| TI = models.TaskInstance |
| DagRun = models.DagRun |
| Dag = models.DagModel |
| session = Session() |
| |
| LastDagRun = ( |
| session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) |
| .join(Dag, Dag.dag_id == DagRun.dag_id) |
| .filter(DagRun.state != State.RUNNING) |
| .filter(Dag.is_active == True) |
| .group_by(DagRun.dag_id) |
| .subquery('last_dag_run') |
| ) |
| RunningDagRun = ( |
| session.query(DagRun.dag_id, DagRun.execution_date) |
| .join(Dag, Dag.dag_id == DagRun.dag_id) |
| .filter(DagRun.state == State.RUNNING) |
| .filter(Dag.is_active == True) |
| .subquery('running_dag_run') |
| ) |
| |
| # Select all task_instances from active dag_runs. |
| # If no dag_run is active, return task instances from most recent dag_run. |
| LastTI = ( |
| session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) |
| .join(LastDagRun, and_( |
| LastDagRun.c.dag_id == TI.dag_id, |
| LastDagRun.c.execution_date == TI.execution_date)) |
| ) |
| RunningTI = ( |
| session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) |
| .join(RunningDagRun, and_( |
| RunningDagRun.c.dag_id == TI.dag_id, |
| RunningDagRun.c.execution_date == TI.execution_date)) |
| ) |
| |
| UnionTI = union_all(LastTI, RunningTI).alias('union_ti') |
| qry = ( |
| session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count()) |
| .group_by(UnionTI.c.dag_id, UnionTI.c.state) |
| ) |
| |
| data = {} |
| for dag_id, state, count in qry: |
| if dag_id not in data: |
| data[dag_id] = {} |
| data[dag_id][state] = count |
| session.commit() |
| session.close() |
| |
| payload = {} |
| for dag in dagbag.dags.values(): |
| payload[dag.safe_dag_id] = [] |
| for state in State.task_states: |
| try: |
| count = data[dag.dag_id][state] |
| except: |
| count = 0 |
| d = { |
| 'state': state, |
| 'count': count, |
| 'dag_id': dag.dag_id, |
| 'color': State.color(state) |
| } |
| payload[dag.safe_dag_id].append(d) |
| return wwwutils.json_response(payload) |
| |
| @expose('/code') |
| @login_required |
| def code(self): |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| title = dag_id |
| try: |
| with open(dag.fileloc, 'r') as f: |
| code = f.read() |
| html_code = highlight( |
| code, lexers.PythonLexer(), HtmlFormatter(linenos=True)) |
| except IOError as e: |
| html_code = str(e) |
| |
| return self.render( |
| 'airflow/dag_code.html', html_code=html_code, dag=dag, title=title, |
| root=request.args.get('root'), |
| demo_mode=conf.getboolean('webserver', 'demo_mode')) |
| |
| @expose('/dag_details') |
| @login_required |
| def dag_details(self): |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| title = "DAG details" |
| |
| session = settings.Session() |
| TI = models.TaskInstance |
| states = ( |
| session.query(TI.state, sqla.func.count(TI.dag_id)) |
| .filter(TI.dag_id == dag_id) |
| .group_by(TI.state) |
| .all() |
| ) |
| return self.render( |
| 'airflow/dag_details.html', |
| dag=dag, title=title, states=states, State=State) |
| |
| @current_app.errorhandler(404) |
| def circles(self): |
| return render_template( |
| 'airflow/circles.html', hostname=socket.getfqdn()), 404 |
| |
| @current_app.errorhandler(500) |
| def show_traceback(self): |
| from airflow.utils import asciiart as ascii_ |
| return render_template( |
| 'airflow/traceback.html', |
| hostname=socket.getfqdn(), |
| nukular=ascii_.nukular, |
| info=traceback.format_exc()), 500 |
| |
| @expose('/noaccess') |
| def noaccess(self): |
| return self.render('airflow/noaccess.html') |
| |
| @expose('/pickle_info') |
| @login_required |
| def pickle_info(self): |
| d = {} |
| dag_id = request.args.get('dag_id') |
| dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values() |
| for dag in dags: |
| if not dag.is_subdag: |
| d[dag.dag_id] = dag.pickle_info() |
| return wwwutils.json_response(d) |
| |
| @expose('/login', methods=['GET', 'POST']) |
| def login(self): |
| return airflow.login.login(self, request) |
| |
| @expose('/logout') |
| def logout(self): |
| logout_user() |
| flash('You have been logged out.') |
| return redirect(url_for('admin.index')) |
| |
| @expose('/rendered') |
| @login_required |
| @wwwutils.action_logging |
| def rendered(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = dateutil.parser.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| dag = dagbag.get_dag(dag_id) |
| task = copy.copy(dag.get_task(task_id)) |
| ti = models.TaskInstance(task=task, execution_date=dttm) |
| try: |
| ti.render_templates() |
| except Exception as e: |
| flash("Error rendering template: " + str(e), "error") |
| title = "Rendered Template" |
| html_dict = {} |
| for template_field in task.__class__.template_fields: |
| content = getattr(task, template_field) |
| if template_field in attr_renderer: |
| html_dict[template_field] = attr_renderer[template_field](content) |
| else: |
| html_dict[template_field] = ( |
| "<pre><code>" + str(content) + "</pre></code>") |
| |
| return self.render( |
| 'airflow/ti_code.html', |
| html_dict=html_dict, |
| dag=dag, |
| task_id=task_id, |
| execution_date=execution_date, |
| form=form, |
| title=title, ) |
| |
| @expose('/log') |
| @login_required |
| @wwwutils.action_logging |
| def log(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| execution_date = request.args.get('execution_date') |
| dttm = dateutil.parser.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| dag = dagbag.get_dag(dag_id) |
| session = Session() |
| ti = session.query(models.TaskInstance).filter( |
| models.TaskInstance.dag_id == dag_id, |
| models.TaskInstance.task_id == task_id, |
| models.TaskInstance.execution_date == dttm).first() |
| if ti is None: |
| logs = ["*** Task instance did not exist in the DB\n"] |
| else: |
| logger = logging.getLogger('airflow.task') |
| task_log_reader = conf.get('core', 'task_log_reader') |
| handler = next((handler for handler in logger.handlers |
| if handler.name == task_log_reader), None) |
| try: |
| ti.task = dag.get_task(ti.task_id) |
| logs = handler.read(ti) |
| except AttributeError as e: |
| logs = ["Task log handler {} does not support read logs.\n{}\n" \ |
| .format(task_log_reader, str(e))] |
| |
| for i, log in enumerate(logs): |
| if PY2 and not isinstance(log, unicode): |
| logs[i] = log.decode('utf-8') |
| |
| return self.render( |
| 'airflow/ti_log.html', |
| logs=logs, dag=dag, title="Log by attempts", task_id=task_id, |
| execution_date=execution_date, form=form) |
| |
| @expose('/task') |
| @login_required |
| @wwwutils.action_logging |
| def task(self): |
| TI = models.TaskInstance |
| |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| # Carrying execution_date through, even though it's irrelevant for |
| # this context |
| execution_date = request.args.get('execution_date') |
| dttm = dateutil.parser.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| dag = dagbag.get_dag(dag_id) |
| |
| if not dag or task_id not in dag.task_ids: |
| flash( |
| "Task [{}.{}] doesn't seem to exist" |
| " at the moment".format(dag_id, task_id), |
| "error") |
| return redirect('/admin/') |
| task = copy.copy(dag.get_task(task_id)) |
| task.resolve_template_files() |
| ti = TI(task=task, execution_date=dttm) |
| ti.refresh_from_db() |
| |
| ti_attrs = [] |
| for attr_name in dir(ti): |
| if not attr_name.startswith('_'): |
| attr = getattr(ti, attr_name) |
| if type(attr) != type(self.task): |
| ti_attrs.append((attr_name, str(attr))) |
| |
| task_attrs = [] |
| for attr_name in dir(task): |
| if not attr_name.startswith('_'): |
| attr = getattr(task, attr_name) |
| if type(attr) != type(self.task) and \ |
| attr_name not in attr_renderer: |
| task_attrs.append((attr_name, str(attr))) |
| |
| # Color coding the special attributes that are code |
| special_attrs_rendered = {} |
| for attr_name in attr_renderer: |
| if hasattr(task, attr_name): |
| source = getattr(task, attr_name) |
| special_attrs_rendered[attr_name] = attr_renderer[attr_name](source) |
| |
| no_failed_deps_result = [( |
| "Unknown", |
| dedent("""\ |
| All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:<br/> |
| - The scheduler is down or under heavy load<br/> |
| {} |
| <br/> |
| If this task instance does not start soon please contact your Airflow administrator for assistance.""" |
| .format( |
| "- This task instance already ran and had it's state changed manually (e.g. cleared in the UI)<br/>" |
| if ti.state == State.NONE else "")))] |
| |
| # Use the scheduler's context to figure out which dependencies are not met |
| dep_context = DepContext(SCHEDULER_DEPS) |
| failed_dep_reasons = [(dep.dep_name, dep.reason) for dep in |
| ti.get_failed_dep_statuses( |
| dep_context=dep_context)] |
| |
| title = "Task Instance Details" |
| return self.render( |
| 'airflow/task.html', |
| task_attrs=task_attrs, |
| ti_attrs=ti_attrs, |
| failed_dep_reasons=failed_dep_reasons or no_failed_deps_result, |
| task_id=task_id, |
| execution_date=execution_date, |
| special_attrs_rendered=special_attrs_rendered, |
| form=form, |
| dag=dag, title=title) |
| |
| @expose('/xcom') |
| @login_required |
| @wwwutils.action_logging |
| def xcom(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| # Carrying execution_date through, even though it's irrelevant for |
| # this context |
| execution_date = request.args.get('execution_date') |
| dttm = dateutil.parser.parse(execution_date) |
| form = DateTimeForm(data={'execution_date': dttm}) |
| dag = dagbag.get_dag(dag_id) |
| if not dag or task_id not in dag.task_ids: |
| flash( |
| "Task [{}.{}] doesn't seem to exist" |
| " at the moment".format(dag_id, task_id), |
| "error") |
| return redirect('/admin/') |
| |
| session = Session() |
| xcomlist = session.query(XCom).filter( |
| XCom.dag_id == dag_id, XCom.task_id == task_id, |
| XCom.execution_date == dttm).all() |
| |
| attributes = [] |
| for xcom in xcomlist: |
| if not xcom.key.startswith('_'): |
| attributes.append((xcom.key, xcom.value)) |
| |
| title = "XCom" |
| return self.render( |
| 'airflow/xcom.html', |
| attributes=attributes, |
| task_id=task_id, |
| execution_date=execution_date, |
| form=form, |
| dag=dag, title=title) |
| |
| @expose('/run') |
| @login_required |
| @wwwutils.action_logging |
| @wwwutils.notify_owner |
| def run(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| origin = request.args.get('origin') |
| dag = dagbag.get_dag(dag_id) |
| task = dag.get_task(task_id) |
| |
| execution_date = request.args.get('execution_date') |
| execution_date = dateutil.parser.parse(execution_date) |
| ignore_all_deps = request.args.get('ignore_all_deps') == "true" |
| ignore_task_deps = request.args.get('ignore_task_deps') == "true" |
| ignore_ti_state = request.args.get('ignore_ti_state') == "true" |
| |
| try: |
| from airflow.executors import GetDefaultExecutor |
| from airflow.executors.celery_executor import CeleryExecutor |
| executor = GetDefaultExecutor() |
| if not isinstance(executor, CeleryExecutor): |
| flash("Only works with the CeleryExecutor, sorry", "error") |
| return redirect(origin) |
| except ImportError: |
| # in case CeleryExecutor cannot be imported it is not active either |
| flash("Only works with the CeleryExecutor, sorry", "error") |
| return redirect(origin) |
| |
| ti = models.TaskInstance(task=task, execution_date=execution_date) |
| ti.refresh_from_db() |
| |
| # Make sure the task instance can be queued |
| dep_context = DepContext( |
| deps=QUEUE_DEPS, |
| ignore_all_deps=ignore_all_deps, |
| ignore_task_deps=ignore_task_deps, |
| ignore_ti_state=ignore_ti_state) |
| failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context)) |
| if failed_deps: |
| failed_deps_str = ", ".join( |
| ["{}: {}".format(dep.dep_name, dep.reason) for dep in failed_deps]) |
| flash("Could not queue task instance for execution, dependencies not met: " |
| "{}".format(failed_deps_str), |
| "error") |
| return redirect(origin) |
| |
| executor.start() |
| executor.queue_task_instance( |
| ti, |
| ignore_all_deps=ignore_all_deps, |
| ignore_task_deps=ignore_task_deps, |
| ignore_ti_state=ignore_ti_state) |
| executor.heartbeat() |
| flash( |
| "Sent {} to the message queue, " |
| "it should start any moment now.".format(ti)) |
| return redirect(origin) |
| |
| @expose('/trigger') |
| @login_required |
| @wwwutils.action_logging |
| @wwwutils.notify_owner |
| def trigger(self): |
| dag_id = request.args.get('dag_id') |
| origin = request.args.get('origin') or "/admin/" |
| dag = dagbag.get_dag(dag_id) |
| |
| if not dag: |
| flash("Cannot find dag {}".format(dag_id)) |
| return redirect(origin) |
| |
| execution_date = datetime.utcnow() |
| run_id = "manual__{0}".format(execution_date.isoformat()) |
| |
| dr = DagRun.find(dag_id=dag_id, run_id=run_id) |
| if dr: |
| flash("This run_id {} already exists".format(run_id)) |
| return redirect(origin) |
| |
| run_conf = {} |
| |
| dag.create_dagrun( |
| run_id=run_id, |
| execution_date=execution_date, |
| state=State.RUNNING, |
| conf=run_conf, |
| external_trigger=True |
| ) |
| |
| flash( |
| "Triggered {}, " |
| "it should start any moment now.".format(dag_id)) |
| return redirect(origin) |
| |
| def _clear_dag_tis(self, dag, start_date, end_date, origin, |
| recursive=False, confirmed=False): |
| if confirmed: |
| count = dag.clear( |
| start_date=start_date, |
| end_date=end_date, |
| include_subdags=recursive) |
| |
| flash("{0} task instances have been cleared".format(count)) |
| return redirect(origin) |
| |
| tis = dag.clear( |
| start_date=start_date, |
| end_date=end_date, |
| include_subdags=recursive, |
| dry_run=True) |
| if not tis: |
| flash("No task instances to clear", 'error') |
| response = redirect(origin) |
| else: |
| details = "\n".join([str(t) for t in tis]) |
| |
| response = self.render( |
| 'airflow/confirm.html', |
| message=("Here's the list of task instances you are about " |
| "to clear:"), |
| details=details) |
| |
| return response |
| |
| @expose('/clear') |
| @login_required |
| @wwwutils.action_logging |
| @wwwutils.notify_owner |
| def clear(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| origin = request.args.get('origin') |
| dag = dagbag.get_dag(dag_id) |
| |
| execution_date = request.args.get('execution_date') |
| execution_date = dateutil.parser.parse(execution_date) |
| confirmed = request.args.get('confirmed') == "true" |
| upstream = request.args.get('upstream') == "true" |
| downstream = request.args.get('downstream') == "true" |
| future = request.args.get('future') == "true" |
| past = request.args.get('past') == "true" |
| recursive = request.args.get('recursive') == "true" |
| |
| dag = dag.sub_dag( |
| task_regex=r"^{0}$".format(task_id), |
| include_downstream=downstream, |
| include_upstream=upstream) |
| |
| end_date = execution_date if not future else None |
| start_date = execution_date if not past else None |
| |
| return self._clear_dag_tis(dag, start_date, end_date, origin, |
| recursive=recursive, confirmed=confirmed) |
| |
| @expose('/dagrun_clear') |
| @login_required |
| @wwwutils.action_logging |
| @wwwutils.notify_owner |
| def dagrun_clear(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| origin = request.args.get('origin') |
| execution_date = request.args.get('execution_date') |
| confirmed = request.args.get('confirmed') == "true" |
| |
| dag = dagbag.get_dag(dag_id) |
| execution_date = dateutil.parser.parse(execution_date) |
| start_date = execution_date |
| end_date = execution_date |
| |
| return self._clear_dag_tis(dag, start_date, end_date, origin, |
| recursive=True, confirmed=confirmed) |
| |
| @expose('/blocked') |
| @login_required |
| def blocked(self): |
| session = settings.Session() |
| DR = models.DagRun |
| dags = ( |
| session.query(DR.dag_id, sqla.func.count(DR.id)) |
| .filter(DR.state == State.RUNNING) |
| .group_by(DR.dag_id) |
| .all() |
| ) |
| payload = [] |
| for dag_id, active_dag_runs in dags: |
| max_active_runs = 0 |
| if dag_id in dagbag.dags: |
| max_active_runs = dagbag.dags[dag_id].max_active_runs |
| payload.append({ |
| 'dag_id': dag_id, |
| 'active_dag_run': active_dag_runs, |
| 'max_active_runs': max_active_runs, |
| }) |
| return wwwutils.json_response(payload) |
| |
| @expose('/dagrun_success') |
| @login_required |
| @wwwutils.action_logging |
| @wwwutils.notify_owner |
| def dagrun_success(self): |
| dag_id = request.args.get('dag_id') |
| execution_date = request.args.get('execution_date') |
| confirmed = request.args.get('confirmed') == 'true' |
| origin = request.args.get('origin') |
| |
| if not execution_date: |
| flash('Invalid execution date', 'error') |
| return redirect(origin) |
| |
| execution_date = dateutil.parser.parse(execution_date) |
| dag = dagbag.get_dag(dag_id) |
| |
| if not dag: |
| flash('Cannot find DAG: {}'.format(dag_id), 'error') |
| return redirect(origin) |
| |
| new_dag_state = set_dag_run_state(dag, execution_date, state=State.SUCCESS, |
| commit=confirmed) |
| |
| if confirmed: |
| flash('Marked success on {} task instances'.format(len(new_dag_state))) |
| return redirect(origin) |
| |
| else: |
| details = '\n'.join([str(t) for t in new_dag_state]) |
| |
| response = self.render('airflow/confirm.html', |
| message=("Here's the list of task instances you are " |
| "about to mark as successful:"), |
| details=details) |
| |
| return response |
| |
| @expose('/success') |
| @login_required |
| @wwwutils.action_logging |
| @wwwutils.notify_owner |
| def success(self): |
| dag_id = request.args.get('dag_id') |
| task_id = request.args.get('task_id') |
| origin = request.args.get('origin') |
| dag = dagbag.get_dag(dag_id) |
| task = dag.get_task(task_id) |
| task.dag = dag |
| |
| execution_date = request.args.get('execution_date') |
| execution_date = dateutil.parser.parse(execution_date) |
| confirmed = request.args.get('confirmed') == "true" |
| upstream = request.args.get('upstream') == "true" |
| downstream = request.args.get('downstream') == "true" |
| future = request.args.get('future') == "true" |
| past = request.args.get('past') == "true" |
| |
| if not dag: |
| flash("Cannot find DAG: {}".format(dag_id)) |
| return redirect(origin) |
| |
| if not task: |
| flash("Cannot find task {} in DAG {}".format(task_id, dag.dag_id)) |
| return redirect(origin) |
| |
| from airflow.api.common.experimental.mark_tasks import set_state |
| |
| if confirmed: |
| altered = set_state(task=task, execution_date=execution_date, |
| upstream=upstream, downstream=downstream, |
| future=future, past=past, state=State.SUCCESS, |
| commit=True) |
| |
| flash("Marked success on {} task instances".format(len(altered))) |
| return redirect(origin) |
| |
| to_be_altered = set_state(task=task, execution_date=execution_date, |
| upstream=upstream, downstream=downstream, |
| future=future, past=past, state=State.SUCCESS, |
| commit=False) |
| |
| details = "\n".join([str(t) for t in to_be_altered]) |
| |
| response = self.render("airflow/confirm.html", |
| message=("Here's the list of task instances you are " |
| "about to mark as successful:"), |
| details=details) |
| |
| return response |
| |
| @expose('/tree') |
| @login_required |
| @wwwutils.gzipped |
| @wwwutils.action_logging |
| def tree(self): |
| dag_id = request.args.get('dag_id') |
| blur = conf.getboolean('webserver', 'demo_mode') |
| dag = dagbag.get_dag(dag_id) |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_downstream=False, |
| include_upstream=True) |
| |
| session = settings.Session() |
| |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs') |
| num_runs = int(num_runs) if num_runs else 25 |
| |
| if base_date: |
| base_date = dateutil.parser.parse(base_date) |
| else: |
| base_date = dag.latest_execution_date or datetime.utcnow() |
| |
| dates = dag.date_range(base_date, num=-abs(num_runs)) |
| min_date = dates[0] if dates else datetime(2000, 1, 1) |
| |
| DR = models.DagRun |
| dag_runs = ( |
| session.query(DR) |
| .filter( |
| DR.dag_id == dag.dag_id, |
| DR.execution_date <= base_date, |
| DR.execution_date >= min_date) |
| .all() |
| ) |
| dag_runs = { |
| dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs} |
| |
| dates = sorted(list(dag_runs.keys())) |
| max_date = max(dates) if dates else None |
| |
| tis = dag.get_task_instances( |
| session, start_date=min_date, end_date=base_date) |
| task_instances = {} |
| for ti in tis: |
| tid = alchemy_to_dict(ti) |
| dr = dag_runs.get(ti.execution_date) |
| tid['external_trigger'] = dr['external_trigger'] if dr else False |
| task_instances[(ti.task_id, ti.execution_date)] = tid |
| |
| expanded = [] |
| # The default recursion traces every path so that tree view has full |
| # expand/collapse functionality. After 5,000 nodes we stop and fall |
| # back on a quick DFS search for performance. See PR #320. |
| node_count = [0] |
| node_limit = 5000 / max(1, len(dag.roots)) |
| |
| def recurse_nodes(task, visited): |
| visited.add(task) |
| node_count[0] += 1 |
| |
| children = [ |
| recurse_nodes(t, visited) for t in task.upstream_list |
| if node_count[0] < node_limit or t not in visited] |
| |
| # D3 tree uses children vs _children to define what is |
| # expanded or not. The following block makes it such that |
| # repeated nodes are collapsed by default. |
| children_key = 'children' |
| if task.task_id not in expanded: |
| expanded.append(task.task_id) |
| elif children: |
| children_key = "_children" |
| |
| def set_duration(tid): |
| if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and |
| tid["start_date"] is not None): |
| d = datetime.utcnow() - dateutil.parser.parse(tid["start_date"]) |
| tid["duration"] = d.total_seconds() |
| return tid |
| |
| return { |
| 'name': task.task_id, |
| 'instances': [ |
| set_duration(task_instances.get((task.task_id, d))) or { |
| 'execution_date': d.isoformat(), |
| 'task_id': task.task_id |
| } |
| for d in dates], |
| children_key: children, |
| 'num_dep': len(task.upstream_list), |
| 'operator': task.task_type, |
| 'retries': task.retries, |
| 'owner': task.owner, |
| 'start_date': task.start_date, |
| 'end_date': task.end_date, |
| 'depends_on_past': task.depends_on_past, |
| 'ui_color': task.ui_color, |
| } |
| |
| data = { |
| 'name': '[DAG]', |
| 'children': [recurse_nodes(t, set()) for t in dag.roots], |
| 'instances': [ |
| dag_runs.get(d) or {'execution_date': d.isoformat()} |
| for d in dates], |
| } |
| |
| data = json.dumps(data, indent=4, default=json_ser) |
| session.commit() |
| session.close() |
| |
| form = DateTimeWithNumRunsForm(data={'base_date': max_date, |
| 'num_runs': num_runs}) |
| return self.render( |
| 'airflow/tree.html', |
| operators=sorted( |
| list(set([op.__class__ for op in dag.tasks])), |
| key=lambda x: x.__name__ |
| ), |
| root=root, |
| form=form, |
| dag=dag, data=data, blur=blur) |
| |
| @expose('/graph') |
| @login_required |
| @wwwutils.gzipped |
| @wwwutils.action_logging |
| def graph(self): |
| session = settings.Session() |
| dag_id = request.args.get('dag_id') |
| blur = conf.getboolean('webserver', 'demo_mode') |
| dag = dagbag.get_dag(dag_id) |
| if dag_id not in dagbag.dags: |
| flash('DAG "{0}" seems to be missing.'.format(dag_id), "error") |
| return redirect('/admin/') |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| arrange = request.args.get('arrange', dag.orientation) |
| |
| nodes = [] |
| edges = [] |
| for task in dag.tasks: |
| nodes.append({ |
| 'id': task.task_id, |
| 'value': { |
| 'label': task.task_id, |
| 'labelStyle': "fill:{0};".format(task.ui_fgcolor), |
| 'style': "fill:{0};".format(task.ui_color), |
| } |
| }) |
| |
| def get_upstream(task): |
| for t in task.upstream_list: |
| edge = { |
| 'u': t.task_id, |
| 'v': task.task_id, |
| } |
| if edge not in edges: |
| edges.append(edge) |
| get_upstream(t) |
| |
| for t in dag.roots: |
| get_upstream(t) |
| |
| dttm = request.args.get('execution_date') |
| if dttm: |
| dttm = dateutil.parser.parse(dttm) |
| else: |
| dttm = dag.latest_execution_date or datetime.utcnow().date() |
| |
| DR = models.DagRun |
| drs = ( |
| session.query(DR) |
| .filter_by(dag_id=dag_id) |
| .order_by(desc(DR.execution_date)).all() |
| ) |
| dr_choices = [] |
| dr_state = None |
| for dr in drs: |
| dr_choices.append((dr.execution_date.isoformat(), dr.run_id)) |
| if dttm == dr.execution_date: |
| dr_state = dr.state |
| |
| class GraphForm(Form): |
| execution_date = SelectField("DAG run", choices=dr_choices) |
| arrange = SelectField("Layout", choices=( |
| ('LR', "Left->Right"), |
| ('RL', "Right->Left"), |
| ('TB', "Top->Bottom"), |
| ('BT', "Bottom->Top"), |
| )) |
| |
| form = GraphForm( |
| data={'execution_date': dttm.isoformat(), 'arrange': arrange}) |
| |
| task_instances = { |
| ti.task_id: alchemy_to_dict(ti) |
| for ti in dag.get_task_instances(session, dttm, dttm)} |
| tasks = { |
| t.task_id: { |
| 'dag_id': t.dag_id, |
| 'task_type': t.task_type, |
| } |
| for t in dag.tasks} |
| if not tasks: |
| flash("No tasks found", "error") |
| session.commit() |
| session.close() |
| doc_md = markdown.markdown(dag.doc_md) if hasattr(dag, 'doc_md') and dag.doc_md else '' |
| |
| return self.render( |
| 'airflow/graph.html', |
| dag=dag, |
| form=form, |
| width=request.args.get('width', "100%"), |
| height=request.args.get('height', "800"), |
| execution_date=dttm.isoformat(), |
| state_token=state_token(dr_state), |
| doc_md=doc_md, |
| arrange=arrange, |
| operators=sorted( |
| list(set([op.__class__ for op in dag.tasks])), |
| key=lambda x: x.__name__ |
| ), |
| blur=blur, |
| root=root or '', |
| task_instances=json.dumps(task_instances, indent=2), |
| tasks=json.dumps(tasks, indent=2), |
| nodes=json.dumps(nodes, indent=2), |
| edges=json.dumps(edges, indent=2), ) |
| |
| @expose('/duration') |
| @login_required |
| @wwwutils.action_logging |
| def duration(self): |
| session = settings.Session() |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs') |
| num_runs = int(num_runs) if num_runs else 25 |
| |
| if base_date: |
| base_date = dateutil.parser.parse(base_date) |
| else: |
| base_date = dag.latest_execution_date or datetime.utcnow() |
| |
| dates = dag.date_range(base_date, num=-abs(num_runs)) |
| min_date = dates[0] if dates else datetime(2000, 1, 1) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| chart_height = get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", x_is_date=True, height=chart_height, width="1200") |
| cum_chart = nvd3.lineChart( |
| name="cumLineChart", x_is_date=True, height=chart_height, width="1200") |
| |
| y = defaultdict(list) |
| x = defaultdict(list) |
| cum_y = defaultdict(list) |
| |
| tis = dag.get_task_instances( |
| session, start_date=min_date, end_date=base_date) |
| TF = models.TaskFail |
| ti_fails = ( |
| session |
| .query(TF) |
| .filter( |
| TF.dag_id == dag.dag_id, |
| TF.execution_date >= min_date, |
| TF.execution_date <= base_date, |
| TF.task_id.in_([t.task_id for t in dag.tasks])) |
| .all() |
| ) |
| |
| fails_totals = defaultdict(int) |
| for tf in ti_fails: |
| dict_key = (tf.dag_id, tf.task_id, tf.execution_date) |
| fails_totals[dict_key] += tf.duration |
| |
| for ti in tis: |
| if ti.duration: |
| dttm = wwwutils.epoch(ti.execution_date) |
| x[ti.task_id].append(dttm) |
| y[ti.task_id].append(float(ti.duration)) |
| fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date) |
| fails_total = fails_totals[fails_dict_key] |
| cum_y[ti.task_id].append(float(ti.duration + fails_total)) |
| |
| # determine the most relevant time unit for the set of task instance |
| # durations for the DAG |
| y_unit = infer_time_unit([d for t in y.values() for d in t]) |
| cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t]) |
| # update the y Axis on both charts to have the correct time units |
| chart.create_y_axis('yAxis', format='.02f', custom_format=False, |
| label='Duration ({})'.format(y_unit)) |
| chart.axislist['yAxis']['axisLabelDistance'] = '40' |
| cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False, |
| label='Duration ({})'.format(cum_y_unit)) |
| cum_chart.axislist['yAxis']['axisLabelDistance'] = '40' |
| for task in dag.tasks: |
| if x[task.task_id]: |
| chart.add_serie(name=task.task_id, x=x[task.task_id], |
| y=scale_time_units(y[task.task_id], y_unit)) |
| cum_chart.add_serie(name=task.task_id, x=x[task.task_id], |
| y=scale_time_units(cum_y[task.task_id], |
| cum_y_unit)) |
| |
| dates = sorted(list({ti.execution_date for ti in tis})) |
| max_date = max([ti.execution_date for ti in tis]) if dates else None |
| |
| session.commit() |
| session.close() |
| |
| form = DateTimeWithNumRunsForm(data={'base_date': max_date, |
| 'num_runs': num_runs}) |
| chart.buildcontent() |
| cum_chart.buildcontent() |
| s_index = cum_chart.htmlcontent.rfind('});') |
| cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] + |
| "$(function() {$( document ).trigger('chartload') })" + |
| cum_chart.htmlcontent[s_index:]) |
| |
| return self.render( |
| 'airflow/duration_chart.html', |
| dag=dag, |
| demo_mode=conf.getboolean('webserver', 'demo_mode'), |
| root=root, |
| form=form, |
| chart=chart.htmlcontent, |
| cum_chart=cum_chart.htmlcontent |
| ) |
| |
| @expose('/tries') |
| @login_required |
| @wwwutils.action_logging |
| def tries(self): |
| session = settings.Session() |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs') |
| num_runs = int(num_runs) if num_runs else 25 |
| |
| if base_date: |
| base_date = dateutil.parser.parse(base_date) |
| else: |
| base_date = dag.latest_execution_date or datetime.utcnow() |
| |
| dates = dag.date_range(base_date, num=-abs(num_runs)) |
| min_date = dates[0] if dates else datetime(2000, 1, 1) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| chart_height = get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", x_is_date=True, y_axis_format='d', height=chart_height, |
| width="1200") |
| |
| for task in dag.tasks: |
| y = [] |
| x = [] |
| for ti in task.get_task_instances(session, start_date=min_date, |
| end_date=base_date): |
| dttm = wwwutils.epoch(ti.execution_date) |
| x.append(dttm) |
| y.append(ti.try_number) |
| if x: |
| chart.add_serie(name=task.task_id, x=x, y=y) |
| |
| tis = dag.get_task_instances( |
| session, start_date=min_date, end_date=base_date) |
| tries = sorted(list({ti.try_number for ti in tis})) |
| max_date = max([ti.execution_date for ti in tis]) if tries else None |
| |
| session.commit() |
| session.close() |
| |
| form = DateTimeWithNumRunsForm(data={'base_date': max_date, |
| 'num_runs': num_runs}) |
| |
| chart.buildcontent() |
| |
| return self.render( |
| 'airflow/chart.html', |
| dag=dag, |
| demo_mode=conf.getboolean('webserver', 'demo_mode'), |
| root=root, |
| form=form, |
| chart=chart.htmlcontent |
| ) |
| |
| @expose('/landing_times') |
| @login_required |
| @wwwutils.action_logging |
| def landing_times(self): |
| session = settings.Session() |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| base_date = request.args.get('base_date') |
| num_runs = request.args.get('num_runs') |
| num_runs = int(num_runs) if num_runs else 25 |
| |
| if base_date: |
| base_date = dateutil.parser.parse(base_date) |
| else: |
| base_date = dag.latest_execution_date or datetime.utcnow() |
| |
| dates = dag.date_range(base_date, num=-abs(num_runs)) |
| min_date = dates[0] if dates else datetime(2000, 1, 1) |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| chart_height = get_chart_height(dag) |
| chart = nvd3.lineChart( |
| name="lineChart", x_is_date=True, height=chart_height, width="1200") |
| y = {} |
| x = {} |
| for task in dag.tasks: |
| y[task.task_id] = [] |
| x[task.task_id] = [] |
| for ti in task.get_task_instances(session, start_date=min_date, |
| end_date=base_date): |
| ts = ti.execution_date |
| if dag.schedule_interval and dag.following_schedule(ts): |
| ts = dag.following_schedule(ts) |
| if ti.end_date: |
| dttm = wwwutils.epoch(ti.execution_date) |
| secs = (ti.end_date - ts).total_seconds() |
| x[ti.task_id].append(dttm) |
| y[ti.task_id].append(secs) |
| |
| # determine the most relevant time unit for the set of landing times |
| # for the DAG |
| y_unit = infer_time_unit([d for t in y.values() for d in t]) |
| # update the y Axis to have the correct time units |
| chart.create_y_axis('yAxis', format='.02f', custom_format=False, |
| label='Landing Time ({})'.format(y_unit)) |
| chart.axislist['yAxis']['axisLabelDistance'] = '40' |
| for task in dag.tasks: |
| if x[task.task_id]: |
| chart.add_serie(name=task.task_id, x=x[task.task_id], |
| y=scale_time_units(y[task.task_id], y_unit)) |
| |
| tis = dag.get_task_instances( |
| session, start_date=min_date, end_date=base_date) |
| dates = sorted(list({ti.execution_date for ti in tis})) |
| max_date = max([ti.execution_date for ti in tis]) if dates else None |
| |
| session.commit() |
| session.close() |
| |
| form = DateTimeWithNumRunsForm(data={'base_date': max_date, |
| 'num_runs': num_runs}) |
| chart.buildcontent() |
| return self.render( |
| 'airflow/chart.html', |
| dag=dag, |
| chart=chart.htmlcontent, |
| height=str(chart_height + 100) + "px", |
| demo_mode=conf.getboolean('webserver', 'demo_mode'), |
| root=root, |
| form=form, |
| ) |
| |
| @expose('/paused', methods=['POST']) |
| @login_required |
| @wwwutils.action_logging |
| def paused(self): |
| DagModel = models.DagModel |
| dag_id = request.args.get('dag_id') |
| session = settings.Session() |
| orm_dag = session.query( |
| DagModel).filter(DagModel.dag_id == dag_id).first() |
| if request.args.get('is_paused') == 'false': |
| orm_dag.is_paused = True |
| else: |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dagbag.get_dag(dag_id) |
| return "OK" |
| |
| @expose('/refresh') |
| @login_required |
| @wwwutils.action_logging |
| def refresh(self): |
| DagModel = models.DagModel |
| dag_id = request.args.get('dag_id') |
| session = settings.Session() |
| orm_dag = session.query( |
| DagModel).filter(DagModel.dag_id == dag_id).first() |
| |
| if orm_dag: |
| orm_dag.last_expired = datetime.utcnow() |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dagbag.get_dag(dag_id) |
| flash("DAG [{}] is now fresh as a daisy".format(dag_id)) |
| return redirect(request.referrer) |
| |
| @expose('/refresh_all') |
| @login_required |
| @wwwutils.action_logging |
| def refresh_all(self): |
| dagbag.collect_dags(only_if_updated=False) |
| flash("All DAGs are now up to date") |
| return redirect('/') |
| |
| @expose('/gantt') |
| @login_required |
| @wwwutils.action_logging |
| def gantt(self): |
| session = settings.Session() |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| demo_mode = conf.getboolean('webserver', 'demo_mode') |
| |
| root = request.args.get('root') |
| if root: |
| dag = dag.sub_dag( |
| task_regex=root, |
| include_upstream=True, |
| include_downstream=False) |
| |
| dttm = request.args.get('execution_date') |
| if dttm: |
| dttm = dateutil.parser.parse(dttm) |
| else: |
| dttm = dag.latest_execution_date or datetime.utcnow().date() |
| |
| form = DateTimeForm(data={'execution_date': dttm}) |
| |
| tis = [ |
| ti for ti in dag.get_task_instances(session, dttm, dttm) |
| if ti.start_date] |
| tis = sorted(tis, key=lambda ti: ti.start_date) |
| |
| tasks = [] |
| for ti in tis: |
| end_date = ti.end_date if ti.end_date else datetime.utcnow() |
| tasks.append({ |
| 'startDate': wwwutils.epoch(ti.start_date), |
| 'endDate': wwwutils.epoch(end_date), |
| 'isoStart': ti.start_date.isoformat()[:-4], |
| 'isoEnd': end_date.isoformat()[:-4], |
| 'taskName': ti.task_id, |
| 'duration': "{}".format(end_date - ti.start_date)[:-4], |
| 'status': ti.state, |
| 'executionDate': ti.execution_date.isoformat(), |
| }) |
| states = {ti.state: ti.state for ti in tis} |
| data = { |
| 'taskNames': [ti.task_id for ti in tis], |
| 'tasks': tasks, |
| 'taskStatus': states, |
| 'height': len(tis) * 25 + 25, |
| } |
| |
| session.commit() |
| session.close() |
| |
| return self.render( |
| 'airflow/gantt.html', |
| dag=dag, |
| execution_date=dttm.isoformat(), |
| form=form, |
| data=json.dumps(data, indent=2), |
| base_date='', |
| demo_mode=demo_mode, |
| root=root, |
| ) |
| |
| @expose('/object/task_instances') |
| @login_required |
| @wwwutils.action_logging |
| def task_instances(self): |
| session = settings.Session() |
| dag_id = request.args.get('dag_id') |
| dag = dagbag.get_dag(dag_id) |
| |
| dttm = request.args.get('execution_date') |
| if dttm: |
| dttm = dateutil.parser.parse(dttm) |
| else: |
| return ("Error: Invalid execution_date") |
| |
| task_instances = { |
| ti.task_id: alchemy_to_dict(ti) |
| for ti in dag.get_task_instances(session, dttm, dttm)} |
| |
| return json.dumps(task_instances) |
| |
| @expose('/variables/<form>', methods=["GET", "POST"]) |
| @login_required |
| @wwwutils.action_logging |
| def variables(self, form): |
| try: |
| if request.method == 'POST': |
| data = request.json |
| if data: |
| session = settings.Session() |
| var = models.Variable(key=form, val=json.dumps(data)) |
| session.add(var) |
| session.commit() |
| return "" |
| else: |
| return self.render( |
| 'airflow/variables/{}.html'.format(form) |
| ) |
| except: |
| # prevent XSS |
| form = escape(form) |
| return ("Error: form airflow/variables/{}.html " |
| "not found.").format(form), 404 |
| |
| @expose('/varimport', methods=["GET", "POST"]) |
| @login_required |
| @wwwutils.action_logging |
| def varimport(self): |
| try: |
| out = str(request.files['file'].read()) |
| d = json.loads(out) |
| except Exception: |
| flash("Missing file or syntax error.") |
| else: |
| for k, v in d.items(): |
| models.Variable.set(k, v, serialize_json=isinstance(v, dict)) |
| flash("{} variable(s) successfully updated.".format(len(d))) |
| return redirect('/admin/variable') |
| |
| |
| class HomeView(AdminIndexView): |
| @expose("/") |
| @login_required |
| def index(self): |
| session = Session() |
| DM = models.DagModel |
| |
| # restrict the dags shown if filter_by_owner and current user is not superuser |
| do_filter = FILTER_BY_OWNER and (not current_user.is_superuser()) |
| owner_mode = conf.get('webserver', 'OWNER_MODE').strip().lower() |
| |
| hide_paused_dags_by_default = conf.getboolean('webserver', |
| 'hide_paused_dags_by_default') |
| show_paused_arg = request.args.get('showPaused', 'None') |
| |
| def get_int_arg(value, default=0): |
| try: |
| return int(value) |
| except ValueError: |
| return default |
| |
| arg_current_page = request.args.get('page', '0') |
| arg_search_query = request.args.get('search', None) |
| |
| dags_per_page = PAGE_SIZE |
| current_page = get_int_arg(arg_current_page, default=0) |
| |
| if show_paused_arg.strip().lower() == 'false': |
| hide_paused = True |
| elif show_paused_arg.strip().lower() == 'true': |
| hide_paused = False |
| else: |
| hide_paused = hide_paused_dags_by_default |
| |
| # read orm_dags from the db |
| sql_query = session.query(DM) |
| |
| if do_filter and owner_mode == 'ldapgroup': |
| sql_query = sql_query.filter( |
| ~DM.is_subdag, |
| DM.is_active, |
| DM.owners.in_(current_user.ldap_groups) |
| ) |
| elif do_filter and owner_mode == 'user': |
| sql_query = sql_query.filter( |
| ~DM.is_subdag, DM.is_active, |
| DM.owners == current_user.user.username |
| ) |
| else: |
| sql_query = sql_query.filter( |
| ~DM.is_subdag, DM.is_active |
| ) |
| |
| # optionally filter out "paused" dags |
| if hide_paused: |
| sql_query = sql_query.filter(~DM.is_paused) |
| |
| orm_dags = {dag.dag_id: dag for dag |
| in sql_query |
| .all()} |
| |
| import_errors = session.query(models.ImportError).all() |
| for ie in import_errors: |
| flash( |
| "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie), |
| "error") |
| session.expunge_all() |
| session.commit() |
| session.close() |
| |
| # get a list of all non-subdag dags visible to everyone |
| # optionally filter out "paused" dags |
| if hide_paused: |
| unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if |
| not dag.parent_dag and not dag.is_paused] |
| |
| else: |
| unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if |
| not dag.parent_dag] |
| |
| # optionally filter to get only dags that the user should see |
| if do_filter and owner_mode == 'ldapgroup': |
| # only show dags owned by someone in @current_user.ldap_groups |
| webserver_dags = { |
| dag.dag_id: dag |
| for dag in unfiltered_webserver_dags |
| if dag.owner in current_user.ldap_groups |
| } |
| elif do_filter and owner_mode == 'user': |
| # only show dags owned by @current_user.user.username |
| webserver_dags = { |
| dag.dag_id: dag |
| for dag in unfiltered_webserver_dags |
| if dag.owner == current_user.user.username |
| } |
| else: |
| webserver_dags = { |
| dag.dag_id: dag |
| for dag in unfiltered_webserver_dags |
| } |
| |
| if arg_search_query: |
| lower_search_query = arg_search_query.lower() |
| # filter by dag_id |
| webserver_dags_filtered = { |
| dag_id: dag |
| for dag_id, dag in webserver_dags.items() |
| if (lower_search_query in dag_id.lower() or |
| lower_search_query in dag.owner.lower()) |
| } |
| |
| all_dag_ids = (set([dag.dag_id for dag in orm_dags.values() |
| if lower_search_query in dag.dag_id.lower() or |
| lower_search_query in dag.owners.lower()]) | |
| set(webserver_dags_filtered.keys())) |
| |
| sorted_dag_ids = sorted(all_dag_ids) |
| else: |
| webserver_dags_filtered = webserver_dags |
| sorted_dag_ids = sorted(set(orm_dags.keys()) | set(webserver_dags.keys())) |
| |
| start = current_page * dags_per_page |
| end = start + dags_per_page |
| |
| num_of_all_dags = len(sorted_dag_ids) |
| page_dag_ids = sorted_dag_ids[start:end] |
| num_of_pages = int(math.ceil(num_of_all_dags / float(dags_per_page))) |
| |
| auto_complete_data = set() |
| for dag in webserver_dags_filtered.values(): |
| auto_complete_data.add(dag.dag_id) |
| auto_complete_data.add(dag.owner) |
| for dag in orm_dags.values(): |
| auto_complete_data.add(dag.dag_id) |
| auto_complete_data.add(dag.owners) |
| |
| return self.render( |
| 'airflow/dags.html', |
| webserver_dags=webserver_dags_filtered, |
| orm_dags=orm_dags, |
| hide_paused=hide_paused, |
| current_page=current_page, |
| search_query=arg_search_query if arg_search_query else '', |
| page_size=dags_per_page, |
| num_of_pages=num_of_pages, |
| num_dag_from=start + 1, |
| num_dag_to=min(end, num_of_all_dags), |
| num_of_all_dags=num_of_all_dags, |
| paging=wwwutils.generate_pages(current_page, num_of_pages, |
| search=arg_search_query, |
| showPaused=not hide_paused), |
| dag_ids_in_page=page_dag_ids, |
| auto_complete_data=auto_complete_data) |
| |
| |
| class QueryView(wwwutils.DataProfilingMixin, BaseView): |
| @expose('/', methods=['POST', 'GET']) |
| @wwwutils.gzipped |
| def query(self): |
| session = settings.Session() |
| dbs = session.query(models.Connection).order_by( |
| models.Connection.conn_id).all() |
| session.expunge_all() |
| db_choices = list( |
| ((db.conn_id, db.conn_id) for db in dbs if db.get_hook())) |
| conn_id_str = request.form.get('conn_id') |
| csv = request.form.get('csv') == "true" |
| sql = request.form.get('sql') |
| |
| class QueryForm(Form): |
| conn_id = SelectField("Layout", choices=db_choices) |
| sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget()) |
| |
| data = { |
| 'conn_id': conn_id_str, |
| 'sql': sql, |
| } |
| results = None |
| has_data = False |
| error = False |
| if conn_id_str: |
| db = [db for db in dbs if db.conn_id == conn_id_str][0] |
| hook = db.get_hook() |
| try: |
| df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type)) |
| # df = hook.get_pandas_df(sql) |
| has_data = len(df) > 0 |
| df = df.fillna('') |
| results = df.to_html( |
| classes=[ |
| 'table', 'table-bordered', 'table-striped', 'no-wrap'], |
| index=False, |
| na_rep='', |
| ) if has_data else '' |
| except Exception as e: |
| flash(str(e), 'error') |
| error = True |
| |
| if has_data and len(df) == QUERY_LIMIT: |
| flash( |
| "Query output truncated at " + str(QUERY_LIMIT) + |
| " rows", 'info') |
| |
| if not has_data and error: |
| flash('No data', 'error') |
| |
| if csv: |
| return Response( |
| response=df.to_csv(index=False), |
| status=200, |
| mimetype="application/text") |
| |
| form = QueryForm(request.form, data=data) |
| session.commit() |
| session.close() |
| return self.render( |
| 'airflow/query.html', form=form, |
| title="Ad Hoc Query", |
| results=results or '', |
| has_data=has_data) |
| |
| |
| class AirflowModelView(ModelView): |
| list_template = 'airflow/model_list.html' |
| edit_template = 'airflow/model_edit.html' |
| create_template = 'airflow/model_create.html' |
| column_display_actions = True |
| page_size = PAGE_SIZE |
| |
| |
| class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView): |
| """ |
| Modifying the base ModelView class for non edit, browse only operations |
| """ |
| named_filter_urls = True |
| can_create = False |
| can_edit = False |
| can_delete = False |
| column_display_pk = True |
| |
| |
| class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView): |
| column_list = ('pool', 'slots', 'used_slots', 'queued_slots') |
| column_formatters = dict( |
| pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots) |
| named_filter_urls = True |
| form_args = { |
| 'pool': { |
| 'validators': [ |
| validators.DataRequired(), |
| ] |
| } |
| } |
| |
| |
| class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly): |
| verbose_name_plural = "SLA misses" |
| verbose_name = "SLA miss" |
| column_list = ( |
| 'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp') |
| column_formatters = dict( |
| task_id=task_instance_link, |
| execution_date=datetime_f, |
| timestamp=datetime_f, |
| dag_id=dag_link) |
| named_filter_urls = True |
| column_searchable_list = ('dag_id', 'task_id',) |
| column_filters = ( |
| 'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date') |
| form_widget_args = { |
| 'email_sent': {'disabled': True}, |
| 'timestamp': {'disabled': True}, |
| } |
| |
| |
| class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): |
| verbose_name = "chart" |
| verbose_name_plural = "charts" |
| form_columns = ( |
| 'label', |
| 'owner', |
| 'conn_id', |
| 'chart_type', |
| 'show_datatable', |
| 'x_is_date', |
| 'y_log_scale', |
| 'show_sql', |
| 'height', |
| 'sql_layout', |
| 'sql', |
| 'default_params', |
| ) |
| column_list = ( |
| 'label', |
| 'conn_id', |
| 'chart_type', |
| 'owner', |
| 'last_modified', |
| ) |
| column_sortable_list = ( |
| 'label', |
| 'conn_id', |
| 'chart_type', |
| ('owner', 'owner.username'), |
| 'last_modified', |
| ) |
| column_formatters = dict(label=label_link, last_modified=datetime_f) |
| column_default_sort = ('last_modified', True) |
| create_template = 'airflow/chart/create.html' |
| edit_template = 'airflow/chart/edit.html' |
| column_filters = ('label', 'owner.username', 'conn_id') |
| column_searchable_list = ('owner.username', 'label', 'sql') |
| column_descriptions = { |
| 'label': "Can include {{ templated_fields }} and {{ macros }}", |
| 'chart_type': "The type of chart to be displayed", |
| 'sql': "Can include {{ templated_fields }} and {{ macros }}.", |
| 'height': "Height of the chart, in pixels.", |
| 'conn_id': "Source database to run the query against", |
| 'x_is_date': ( |
| "Whether the X axis should be casted as a date field. Expect most " |
| "intelligible date formats to get casted properly." |
| ), |
| 'owner': ( |
| "The chart's owner, mostly used for reference and filtering in " |
| "the list view." |
| ), |
| 'show_datatable': |
| "Whether to display an interactive data table under the chart.", |
| 'default_params': ( |
| 'A dictionary of {"key": "values",} that define what the ' |
| 'templated fields (parameters) values should be by default. ' |
| 'To be valid, it needs to "eval" as a Python dict. ' |
| 'The key values will show up in the url\'s querystring ' |
| 'and can be altered there.' |
| ), |
| 'show_sql': "Whether to display the SQL statement as a collapsible " |
| "section in the chart page.", |
| 'y_log_scale': "Whether to use a log scale for the Y axis.", |
| 'sql_layout': ( |
| "Defines the layout of the SQL that the application should " |
| "expect. Depending on the tables you are sourcing from, it may " |
| "make more sense to pivot / unpivot the metrics." |
| ), |
| } |
| column_labels = { |
| 'sql': "SQL", |
| 'height': "Chart Height", |
| 'sql_layout': "SQL Layout", |
| 'show_sql': "Display the SQL Statement", |
| 'default_params': "Default Parameters", |
| } |
| form_choices = { |
| 'chart_type': [ |
| ('line', 'Line Chart'), |
| ('spline', 'Spline Chart'), |
| ('bar', 'Bar Chart'), |
| ('column', 'Column Chart'), |
| ('area', 'Overlapping Area Chart'), |
| ('stacked_area', 'Stacked Area Chart'), |
| ('percent_area', 'Percent Area Chart'), |
| ('datatable', 'No chart, data table only'), |
| ], |
| 'sql_layout': [ |
| ('series', 'SELECT series, x, y FROM ...'), |
| ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'), |
| ], |
| 'conn_id': [ |
| (c.conn_id, c.conn_id) |
| for c in ( |
| Session().query(models.Connection.conn_id) |
| .group_by(models.Connection.conn_id) |
| ) |
| ] |
| } |
| |
| def on_model_change(self, form, model, is_created=True): |
| if model.iteration_no is None: |
| model.iteration_no = 0 |
| else: |
| model.iteration_no += 1 |
| if not model.user_id and current_user and hasattr(current_user, 'id'): |
| model.user_id = current_user.id |
| model.last_modified = datetime.utcnow() |
| |
| |
| chart_mapping = ( |
| ('line', 'lineChart'), |
| ('spline', 'lineChart'), |
| ('bar', 'multiBarChart'), |
| ('column', 'multiBarChart'), |
| ('area', 'stackedAreaChart'), |
| ('stacked_area', 'stackedAreaChart'), |
| ('percent_area', 'stackedAreaChart'), |
| ('datatable', 'datatable'), |
| ) |
| chart_mapping = dict(chart_mapping) |
| |
| |
| class KnownEventView(wwwutils.DataProfilingMixin, AirflowModelView): |
| verbose_name = "known event" |
| verbose_name_plural = "known events" |
| form_columns = ( |
| 'label', |
| 'event_type', |
| 'start_date', |
| 'end_date', |
| 'reported_by', |
| 'description', |
| ) |
| form_args = { |
| 'label': { |
| 'validators': [ |
| validators.DataRequired(), |
| ], |
| }, |
| 'event_type': { |
| 'validators': [ |
| validators.DataRequired(), |
| ], |
| }, |
| 'start_date': { |
| 'validators': [ |
| validators.DataRequired(), |
| ], |
| }, |
| 'end_date': { |
| 'validators': [ |
| validators.DataRequired(), |
| GreaterEqualThan(fieldname='start_date'), |
| ], |
| }, |
| 'reported_by': { |
| 'validators': [ |
| validators.DataRequired(), |
| ], |
| } |
| } |
| column_list = ( |
| 'label', |
| 'event_type', |
| 'start_date', |
| 'end_date', |
| 'reported_by', |
| ) |
| column_default_sort = ("start_date", True) |
| column_sortable_list = ( |
| 'label', |
| ('event_type', 'event_type.know_event_type'), |
| 'start_date', |
| 'end_date', |
| ('reported_by', 'reported_by.username'), |
| ) |
| |
| |
| class KnownEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView): |
| pass |
| |
| |
| # NOTE: For debugging / troubleshooting |
| # mv = KnowEventTypeView( |
| # models.KnownEventType, |
| # Session, name="Known Event Types", category="Manage") |
| # admin.add_view(mv) |
| # class DagPickleView(SuperUserMixin, ModelView): |
| # pass |
| # mv = DagPickleView( |
| # models.DagPickle, |
| # Session, name="Pickles", category="Manage") |
| # admin.add_view(mv) |
| |
| |
| class VariableView(wwwutils.DataProfilingMixin, AirflowModelView): |
| verbose_name = "Variable" |
| verbose_name_plural = "Variables" |
| list_template = 'airflow/variable_list.html' |
| |
| def hidden_field_formatter(view, context, model, name): |
| if wwwutils.should_hide_value_for_key(model.key): |
| return Markup('*' * 8) |
| try: |
| return getattr(model, name) |
| except AirflowException: |
| return Markup('<span class="label label-danger">Invalid</span>') |
| |
| form_columns = ( |
| 'key', |
| 'val', |
| ) |
| column_list = ('key', 'val', 'is_encrypted',) |
| column_filters = ('key', 'val') |
| column_searchable_list = ('key', 'val') |
| column_default_sort = ('key', False) |
| form_widget_args = { |
| 'is_encrypted': {'disabled': True}, |
| 'val': { |
| 'rows': 20, |
| } |
| } |
| form_args = { |
| 'key': { |
| 'validators': { |
| validators.DataRequired(), |
| }, |
| }, |
| } |
| column_sortable_list = ( |
| 'key', |
| 'val', |
| 'is_encrypted', |
| ) |
| column_formatters = { |
| 'val': hidden_field_formatter, |
| } |
| |
| # Default flask-admin export functionality doesn't handle serialized json |
| @action('varexport', 'Export', None) |
| def action_varexport(self, ids): |
| V = models.Variable |
| session = settings.Session() |
| qry = session.query(V).filter(V.id.in_(ids)).all() |
| session.close() |
| |
| var_dict = {} |
| d = json.JSONDecoder() |
| for var in qry: |
| val = None |
| try: |
| val = d.decode(var.val) |
| except: |
| val = var.val |
| var_dict[var.key] = val |
| |
| response = make_response(json.dumps(var_dict, sort_keys=True, indent=4)) |
| response.headers["Content-Disposition"] = "attachment; filename=variables.json" |
| return response |
| |
| def on_form_prefill(self, form, id): |
| if wwwutils.should_hide_value_for_key(form.key.data): |
| form.val.data = '*' * 8 |
| |
| |
| class XComView(wwwutils.SuperUserMixin, AirflowModelView): |
| verbose_name = "XCom" |
| verbose_name_plural = "XComs" |
| |
| form_columns = ( |
| 'key', |
| 'value', |
| 'execution_date', |
| 'task_id', |
| 'dag_id', |
| ) |
| |
| form_extra_fields = { |
| 'value': StringField('Value'), |
| } |
| |
| column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id') |
| column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id') |
| |
| |
| class JobModelView(ModelViewOnly): |
| verbose_name_plural = "jobs" |
| verbose_name = "job" |
| column_display_actions = False |
| column_default_sort = ('start_date', True) |
| column_filters = ( |
| 'job_type', 'dag_id', 'state', |
| 'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat') |
| column_formatters = dict( |
| start_date=datetime_f, |
| end_date=datetime_f, |
| hostname=nobr_f, |
| state=state_f, |
| latest_heartbeat=datetime_f) |
| |
| |
| class DagRunModelView(ModelViewOnly): |
| verbose_name_plural = "DAG Runs" |
| can_edit = True |
| can_create = True |
| column_editable_list = ('state',) |
| verbose_name = "dag run" |
| column_default_sort = ('execution_date', True) |
| form_choices = { |
| 'state': [ |
| ('success', 'success'), |
| ('running', 'running'), |
| ('failed', 'failed'), |
| ], |
| } |
| form_args = dict( |
| dag_id=dict(validators=[validators.DataRequired()]) |
| ) |
| column_list = ( |
| 'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger') |
| column_filters = column_list |
| column_searchable_list = ('dag_id', 'state', 'run_id') |
| column_formatters = dict( |
| execution_date=datetime_f, |
| state=state_f, |
| start_date=datetime_f, |
| dag_id=dag_link) |
| |
| @action('new_delete', "Delete", "Are you sure you want to delete selected records?") |
| def action_new_delete(self, ids): |
| session = settings.Session() |
| deleted = set(session.query(models.DagRun) |
| .filter(models.DagRun.id.in_(ids)) |
| .all()) |
| session.query(models.DagRun) \ |
| .filter(models.DagRun.id.in_(ids)) \ |
| .delete(synchronize_session='fetch') |
| session.commit() |
| dirty_ids = [] |
| for row in deleted: |
| dirty_ids.append(row.dag_id) |
| models.DagStat.update(dirty_ids, dirty_only=False, session=session) |
| session.close() |
| |
| @action('set_running', "Set state to 'running'", None) |
| def action_set_running(self, ids): |
| self.set_dagrun_state(ids, State.RUNNING) |
| |
| @action('set_failed', "Set state to 'failed'", None) |
| def action_set_failed(self, ids): |
| self.set_dagrun_state(ids, State.FAILED) |
| |
| @action('set_success', "Set state to 'success'", None) |
| def action_set_success(self, ids): |
| self.set_dagrun_state(ids, State.SUCCESS) |
| |
| @provide_session |
| def set_dagrun_state(self, ids, target_state, session=None): |
| try: |
| DR = models.DagRun |
| count = 0 |
| dirty_ids = [] |
| for dr in session.query(DR).filter(DR.id.in_(ids)).all(): |
| dirty_ids.append(dr.dag_id) |
| count += 1 |
| dr.state = target_state |
| if target_state == State.RUNNING: |
| dr.start_date = datetime.utcnow() |
| else: |
| dr.end_date = datetime.utcnow() |
| session.commit() |
| models.DagStat.update(dirty_ids, session=session) |
| flash( |
| "{count} dag runs were set to '{target_state}'".format(**locals())) |
| except Exception as ex: |
| if not self.handle_view_exception(ex): |
| raise Exception("Ooops") |
| flash('Failed to set state', 'error') |
| |
| |
| class LogModelView(ModelViewOnly): |
| verbose_name_plural = "logs" |
| verbose_name = "log" |
| column_display_actions = False |
| column_default_sort = ('dttm', True) |
| column_filters = ('dag_id', 'task_id', 'execution_date') |
| column_formatters = dict( |
| dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link) |
| |
| |
| class TaskInstanceModelView(ModelViewOnly): |
| verbose_name_plural = "task instances" |
| verbose_name = "task instance" |
| column_filters = ( |
| 'state', 'dag_id', 'task_id', 'execution_date', 'hostname', |
| 'queue', 'pool', 'operator', 'start_date', 'end_date') |
| named_filter_urls = True |
| column_formatters = dict( |
| log_url=log_url_formatter, |
| task_id=task_instance_link, |
| hostname=nobr_f, |
| state=state_f, |
| execution_date=datetime_f, |
| start_date=datetime_f, |
| end_date=datetime_f, |
| queued_dttm=datetime_f, |
| dag_id=dag_link, duration=duration_f) |
| column_searchable_list = ('dag_id', 'task_id', 'state') |
| column_default_sort = ('job_id', True) |
| form_choices = { |
| 'state': [ |
| ('success', 'success'), |
| ('running', 'running'), |
| ('failed', 'failed'), |
| ], |
| } |
| column_list = ( |
| 'state', 'dag_id', 'task_id', 'execution_date', 'operator', |
| 'start_date', 'end_date', 'duration', 'job_id', 'hostname', |
| 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', |
| 'pool', 'log_url') |
| can_delete = True |
| page_size = PAGE_SIZE |
| |
| @action('set_running', "Set state to 'running'", None) |
| def action_set_running(self, ids): |
| self.set_task_instance_state(ids, State.RUNNING) |
| |
| @action('set_failed', "Set state to 'failed'", None) |
| def action_set_failed(self, ids): |
| self.set_task_instance_state(ids, State.FAILED) |
| |
| @action('set_success', "Set state to 'success'", None) |
| def action_set_success(self, ids): |
| self.set_task_instance_state(ids, State.SUCCESS) |
| |
| @action('set_retry', "Set state to 'up_for_retry'", None) |
| def action_set_retry(self, ids): |
| self.set_task_instance_state(ids, State.UP_FOR_RETRY) |
| |
| @action('delete', |
| lazy_gettext('Delete'), |
| lazy_gettext('Are you sure you want to delete selected records?')) |
| def action_delete(self, ids): |
| """ |
| As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete(). |
| |
| TODO: this method should be removed once the below bug is fixed on Flask-Admin side. |
| https://github.com/flask-admin/flask-admin/issues/1226 |
| """ |
| if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): |
| self.delete_task_instances(ids) |
| else: |
| super(TaskInstanceModelView, self).action_delete(ids) |
| |
| @provide_session |
| def set_task_instance_state(self, ids, target_state, session=None): |
| try: |
| TI = models.TaskInstance |
| count = len(ids) |
| for id in ids: |
| task_id, dag_id, execution_date = id.split(',') |
| execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') |
| ti = session.query(TI).filter(TI.task_id == task_id, |
| TI.dag_id == dag_id, |
| TI.execution_date == execution_date).one() |
| ti.state = target_state |
| session.commit() |
| flash( |
| "{count} task instances were set to '{target_state}'".format(**locals())) |
| except Exception as ex: |
| if not self.handle_view_exception(ex): |
| raise Exception("Ooops") |
| flash('Failed to set state', 'error') |
| |
| @provide_session |
| def delete_task_instances(self, ids, session=None): |
| try: |
| TI = models.TaskInstance |
| count = 0 |
| for id in ids: |
| task_id, dag_id, execution_date = id.split(',') |
| execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') |
| count += session.query(TI).filter(TI.task_id == task_id, |
| TI.dag_id == dag_id, |
| TI.execution_date == execution_date).delete() |
| session.commit() |
| flash("{count} task instances were deleted".format(**locals())) |
| except Exception as ex: |
| if not self.handle_view_exception(ex): |
| raise Exception("Ooops") |
| flash('Failed to delete', 'error') |
| |
| def get_one(self, id): |
| """ |
| As a workaround for AIRFLOW-252, this method overrides Flask-Admin's ModelView.get_one(). |
| |
| TODO: this method should be removed once the below bug is fixed on Flask-Admin side. |
| https://github.com/flask-admin/flask-admin/issues/1226 |
| """ |
| task_id, dag_id, execution_date = iterdecode(id) |
| execution_date = dateutil.parser.parse(execution_date) |
| return self.session.query(self.model).get((task_id, dag_id, execution_date)) |
| |
| |
| class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): |
| create_template = 'airflow/conn_create.html' |
| edit_template = 'airflow/conn_edit.html' |
| list_template = 'airflow/conn_list.html' |
| form_columns = ( |
| 'conn_id', |
| 'conn_type', |
| 'host', |
| 'schema', |
| 'login', |
| 'password', |
| 'port', |
| 'extra', |
| 'extra__jdbc__drv_path', |
| 'extra__jdbc__drv_clsname', |
| 'extra__google_cloud_platform__project', |
| 'extra__google_cloud_platform__key_path', |
| 'extra__google_cloud_platform__keyfile_dict', |
| 'extra__google_cloud_platform__scope', |
| ) |
| verbose_name = "Connection" |
| verbose_name_plural = "Connections" |
| column_default_sort = ('conn_id', False) |
| column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',) |
| form_overrides = dict(_password=PasswordField, _extra=TextAreaField) |
| form_widget_args = { |
| 'is_extra_encrypted': {'disabled': True}, |
| 'is_encrypted': {'disabled': True}, |
| } |
| # Used to customized the form, the forms elements get rendered |
| # and results are stored in the extra field as json. All of these |
| # need to be prefixed with extra__ and then the conn_type ___ as in |
| # extra__{conn_type}__name. You can also hide form elements and rename |
| # others from the connection_form.js file |
| form_extra_fields = { |
| 'extra__jdbc__drv_path': StringField('Driver Path'), |
| 'extra__jdbc__drv_clsname': StringField('Driver Class'), |
| 'extra__google_cloud_platform__project': StringField('Project Id'), |
| 'extra__google_cloud_platform__key_path': StringField('Keyfile Path'), |
| 'extra__google_cloud_platform__keyfile_dict': PasswordField('Keyfile JSON'), |
| 'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'), |
| |
| } |
| form_choices = { |
| 'conn_type': models.Connection._types |
| } |
| |
| def on_model_change(self, form, model, is_created): |
| formdata = form.data |
| if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']: |
| extra = { |
| key: formdata[key] |
| for key in self.form_extra_fields.keys() if key in formdata} |
| model.extra = json.dumps(extra) |
| |
| @classmethod |
| def alert_fernet_key(cls): |
| fk = None |
| try: |
| fk = conf.get('core', 'fernet_key') |
| except: |
| pass |
| return fk is None |
| |
| @classmethod |
| def is_secure(cls): |
| """ |
| Used to display a message in the Connection list view making it clear |
| that the passwords and `extra` field can't be encrypted. |
| """ |
| is_secure = False |
| try: |
| import cryptography |
| conf.get('core', 'fernet_key') |
| is_secure = True |
| except: |
| pass |
| return is_secure |
| |
| def on_form_prefill(self, form, id): |
| try: |
| d = json.loads(form.data.get('extra', '{}')) |
| except Exception: |
| d = {} |
| |
| for field in list(self.form_extra_fields.keys()): |
| value = d.get(field, '') |
| if value: |
| field = getattr(form, field) |
| field.data = value |
| |
| |
| class UserModelView(wwwutils.SuperUserMixin, AirflowModelView): |
| verbose_name = "User" |
| verbose_name_plural = "Users" |
| column_default_sort = 'username' |
| |
| |
| class VersionView(wwwutils.SuperUserMixin, BaseView): |
| @expose('/') |
| def version(self): |
| # Look at the version from setup.py |
| try: |
| airflow_version = pkg_resources.require("apache-airflow")[0].version |
| except Exception as e: |
| airflow_version = None |
| logging.error(e) |
| |
| # Get the Git repo and git hash |
| git_version = None |
| try: |
| with open(os.path.join(*[settings.AIRFLOW_HOME, 'airflow', 'git_version'])) as f: |
| git_version = f.readline() |
| except Exception as e: |
| logging.error(e) |
| |
| # Render information |
| title = "Version Info" |
| return self.render('airflow/version.html', |
| title=title, |
| airflow_version=airflow_version, |
| git_version=git_version) |
| |
| |
| class ConfigurationView(wwwutils.SuperUserMixin, BaseView): |
| @expose('/') |
| def conf(self): |
| raw = request.args.get('raw') == "true" |
| title = "Airflow Configuration" |
| subtitle = conf.AIRFLOW_CONFIG |
| if conf.getboolean("webserver", "expose_config"): |
| with open(conf.AIRFLOW_CONFIG, 'r') as f: |
| config = f.read() |
| table = [(section, key, value, source) |
| for section, parameters in conf.as_dict(True, True).items() |
| for key, (value, source) in parameters.items()] |
| |
| else: |
| config = ( |
| "# You Airflow administrator chose not to expose the " |
| "configuration, most likely for security reasons.") |
| table = None |
| if raw: |
| return Response( |
| response=config, |
| status=200, |
| mimetype="application/text") |
| else: |
| code_html = Markup(highlight( |
| config, |
| lexers.IniLexer(), # Lexer call |
| HtmlFormatter(noclasses=True)) |
| ) |
| return self.render( |
| 'airflow/config.html', |
| pre_subtitle=settings.HEADER + " v" + airflow.__version__, |
| code_html=code_html, title=title, subtitle=subtitle, |
| table=table) |
| |
| |
| class DagModelView(wwwutils.SuperUserMixin, ModelView): |
| column_list = ('dag_id', 'owners') |
| column_editable_list = ('is_paused',) |
| form_excluded_columns = ('is_subdag', 'is_active') |
| column_searchable_list = ('dag_id',) |
| column_filters = ( |
| 'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag', |
| 'last_scheduler_run', 'last_expired') |
| form_widget_args = { |
| 'last_scheduler_run': {'disabled': True}, |
| 'fileloc': {'disabled': True}, |
| 'is_paused': {'disabled': True}, |
| 'last_pickled': {'disabled': True}, |
| 'pickle_id': {'disabled': True}, |
| 'last_loaded': {'disabled': True}, |
| 'last_expired': {'disabled': True}, |
| 'pickle_size': {'disabled': True}, |
| 'scheduler_lock': {'disabled': True}, |
| 'owners': {'disabled': True}, |
| } |
| column_formatters = dict( |
| dag_id=dag_link, |
| ) |
| can_delete = False |
| can_create = False |
| page_size = PAGE_SIZE |
| list_template = 'airflow/list_dags.html' |
| named_filter_urls = True |
| |
| def get_query(self): |
| """ |
| Default filters for model |
| """ |
| return ( |
| super(DagModelView, self) |
| .get_query() |
| .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) |
| .filter(~models.DagModel.is_subdag) |
| ) |
| |
| def get_count_query(self): |
| """ |
| Default filters for model |
| """ |
| return ( |
| super(DagModelView, self) |
| .get_count_query() |
| .filter(models.DagModel.is_active) |
| .filter(~models.DagModel.is_subdag) |
| ) |