Merge pull request #975 from jtschoonhoven/issue-974

statuses column on /admin shows only active or most recent dag_runs
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 96ed9b1..ded3722 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -20,7 +20,8 @@
                 <th>DAG</th>
                 <th>Schedule</th>
                 <th>Owner</th>
-                <th style="padding-left: 5px;">Statuses
+                <th style="padding-left: 5px;">Recent Statuses
+                  <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Status of tasks from all active DAG runs or, if not currently active, from most recent run."></span>
                   <img id="loading" width="15" src="{{ url_for("static", filename="loading.gif") }}">
                 </th>
                 <th class="text-center">Links</th>
@@ -211,6 +212,7 @@
             d3.select("#loading").remove();
         }
         $("#pause_header").tooltip();
+        $("#statuses_info").tooltip();
 
         $("circle").tooltip({
           html: true,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 837899e..07f616c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,7 +16,7 @@
 import traceback
 
 import sqlalchemy as sqla
-from sqlalchemy import or_, desc
+from sqlalchemy import or_, desc, and_
 
 
 from flask import redirect, url_for, request, Markup, Response, current_app, render_template
@@ -548,13 +548,36 @@
             task_ids += dag.task_ids
             if not dag.is_subdag:
                 dag_ids.append(dag.dag_id)
+
         TI = models.TaskInstance
+        DagRun = models.DagRun
         session = Session()
+
+        LastDagRun = (
+            session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
+            .group_by(DagRun.dag_id)
+            .subquery('last_dag_run')
+        )
+
+        # Select all task_instances from active dag_runs.
+        # If no dag_run is active, return task instances from most recent dag_run.
         qry = (
             session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
-                .filter(TI.task_id.in_(task_ids))
-                .filter(TI.dag_id.in_(dag_ids))
-                .group_by(TI.dag_id, TI.state)
+            .outerjoin(DagRun, and_(
+                DagRun.dag_id == TI.dag_id,
+                DagRun.execution_date == TI.execution_date,
+                DagRun.state == State.RUNNING))
+            .outerjoin(LastDagRun, and_(
+                LastDagRun.c.dag_id == TI.dag_id,
+                LastDagRun.c.execution_date == TI.execution_date)
+            )
+            .filter(TI.task_id.in_(task_ids))
+            .filter(TI.dag_id.in_(dag_ids))
+            .filter(or_(
+                DagRun.dag_id != None,
+                LastDagRun.c.dag_id != None
+            ))
+            .group_by(TI.dag_id, TI.state)
         )
 
         data = {}