Merge pull request #975 from jtschoonhoven/issue-974
Statuses column on /admin now shows only active or most recent DAG runs
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 96ed9b1..ded3722 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -20,7 +20,8 @@
<th>DAG</th>
<th>Schedule</th>
<th>Owner</th>
- <th style="padding-left: 5px;">Statuses
+ <th style="padding-left: 5px;">Recent Statuses
+ <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Status of tasks from all active DAG runs or, if not currently active, from most recent run."></span>
<img id="loading" width="15" src="{{ url_for("static", filename="loading.gif") }}">
</th>
<th class="text-center">Links</th>
@@ -211,6 +212,7 @@
d3.select("#loading").remove();
}
$("#pause_header").tooltip();
+ $("#statuses_info").tooltip();
$("circle").tooltip({
html: true,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 837899e..07f616c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,7 +16,7 @@
import traceback
import sqlalchemy as sqla
-from sqlalchemy import or_, desc
+from sqlalchemy import or_, desc, and_
from flask import redirect, url_for, request, Markup, Response, current_app, render_template
@@ -548,13 +548,36 @@
task_ids += dag.task_ids
if not dag.is_subdag:
dag_ids.append(dag.dag_id)
+
TI = models.TaskInstance
+ DagRun = models.DagRun
session = Session()
+
+ LastDagRun = (
+ session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
+ .group_by(DagRun.dag_id)
+ .subquery('last_dag_run')
+ )
+
+ # Select task_instances that belong to a running dag_run, plus those from
+ # each DAG's most recent dag_run (included whether or not it is running).
qry = (
session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
- .filter(TI.task_id.in_(task_ids))
- .filter(TI.dag_id.in_(dag_ids))
- .group_by(TI.dag_id, TI.state)
+ .outerjoin(DagRun, and_(
+ DagRun.dag_id == TI.dag_id,
+ DagRun.execution_date == TI.execution_date,
+ DagRun.state == State.RUNNING))
+ .outerjoin(LastDagRun, and_(
+ LastDagRun.c.dag_id == TI.dag_id,
+ LastDagRun.c.execution_date == TI.execution_date)
+ )
+ .filter(TI.task_id.in_(task_ids))
+ .filter(TI.dag_id.in_(dag_ids))
+ .filter(or_(
+ DagRun.dag_id != None,
+ LastDagRun.c.dag_id != None
+ ))
+ .group_by(TI.dag_id, TI.state)
)
data = {}