[AIRFLOW-970] Load latest_runs on homepage async
The latest_runs column on the homepage loads
synchronously with an n+1
query. Homepage loads will be significantly faster
if this happens
asynchronously and as a batch.
Closes #2144 from saguziel/aguziel-latest-run-
async
(cherry picked from commit 0f7ddbbedb05f2f11500250db4989edcb27bc164)
diff --git a/airflow/models.py b/airflow/models.py
index 1ceb821..646f74b 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4249,6 +4249,29 @@
return False
+ @classmethod
+ @provide_session
+ def get_latest_runs(cls, session):
+ """Returns the latest running DagRun for each DAG. """
+ subquery = (
+ session
+ .query(
+ cls.dag_id,
+ func.max(cls.execution_date).label('execution_date'))
+ .filter(cls.state == State.RUNNING)
+ .group_by(cls.dag_id)
+ .subquery()
+ )
+ dagruns = (
+ session
+ .query(cls)
+ .join(subquery,
+ and_(cls.dag_id == subquery.c.dag_id,
+ cls.execution_date == subquery.c.execution_date))
+ .all()
+ )
+ return dagruns
+
class Pool(Base):
__tablename__ = "slot_pool"
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 56b9d79..63355c7 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -20,7 +20,8 @@
from airflow.www.app import csrf
from flask import (
- g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file
+ g, Markup, Blueprint, redirect, jsonify, abort,
+ request, current_app, send_file, url_for
)
from datetime import datetime
@@ -110,3 +111,23 @@
task = dag.get_task(task_id)
fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')}
return jsonify(fields)
+
+
+@api_experimental.route('/latest_runs', methods=['GET'])
+@requires_authentication
+def latest_dag_runs():
+ """Returns the latest running DagRun for each DAG formatted for the UI. """
+ from airflow.models import DagRun
+ dagruns = DagRun.get_latest_runs()
+ payload = []
+ for dagrun in dagruns:
+ if dagrun.execution_date:
+ payload.append({
+ 'dag_id': dagrun.dag_id,
+ 'execution_date': dagrun.execution_date.strftime("%Y-%m-%d %H:%M"),
+ 'start_date': ((dagrun.start_date or '') and
+ dagrun.start_date.strftime("%Y-%m-%d %H:%M")),
+ 'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id,
+ execution_date=dagrun.execution_date)
+ })
+ return jsonify(payload)
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 7c59dea..c0dbc62 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -105,19 +105,7 @@
</td>
<!-- Column 7: Last Run -->
- <td class="text-nowrap">
- {% if dag %}
- {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
- {% if last_run and last_run.start_date %}
- <a href="{{ url_for('airflow.graph', dag_id=last_run.dag_id, execution_date=last_run.execution_date ) }}">
- {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
- </a> <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Start Date: {{last_run.start_date.strftime('%Y-%m-%d %H:%M')}}"></span>
- {% else %}
- <!--No DAG Runs-->
- {% endif %}
- {% else %}
- <!--No DAG Runs-->
- {% endif %}
+ <td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
</td>
<!-- Column 8: Dag Runs -->
@@ -237,6 +225,21 @@
}
});
});
+ $.getJSON("{{ url_for('api_experimental.latest_dag_runs') }}", function(data) {
+ $.each(data, function() {
+ var link = $("<a>", {
+ href: this.dag_run_url,
+ text: this.execution_date
+ });
+ var info_icon = $('<span>', {
+ "aria-hidden": "true",
+ id: "statuses_info",
+ title: "Start Date: " + this.start_date,
+ "class": "glyphicon glyphicon-info-sign"
+ });
+ $('.latest_dag_run.' + this.dag_id).append(link).append(info_icon);
+ });
+ });
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
diff --git a/tests/dags/test_latest_runs.py b/tests/dags/test_latest_runs.py
new file mode 100644
index 0000000..dd04c0e
--- /dev/null
+++ b/tests/dags/test_latest_runs.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+for i in range(1, 2):
+ dag = DAG(dag_id='test_latest_runs_{}'.format(i))
+ task = DummyOperator(
+ task_id='dummy_task',
+ dag=dag,
+ owner='airflow',
+ start_date=datetime(2016, 2, 1))
diff --git a/tests/models.py b/tests/models.py
index da36d56..392de03 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -468,7 +468,7 @@
dag = DAG(
dag_id='test_latest_runs_1',
start_date=DEFAULT_DATE)
- dag_1_run_1 = self.create_dag_run(dag,
+ dag_1_run_1 = self.create_dag_run(dag,
execution_date=datetime.datetime(2015, 1, 1))
dag_1_run_2 = self.create_dag_run(dag,
execution_date=datetime.datetime(2015, 1, 2))