[AIRFLOW-5660] Attempt to find the task in DB from Kubernetes pod labels (#6340)
Try to find the task in DB before regressing to searching every task,
and explicitly warn about the performance regressions.
Co-Authored-By: Ash Berlin-Taylor <ash_github@firemirror.com>
(cherry picked from commit 66e2c22e1615c0999747d0c38355163e877872e7)
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index ecef411..52692da 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -643,6 +643,27 @@
return None
with create_session() as session:
+ task = (
+ session
+ .query(TaskInstance)
+ .filter_by(task_id=task_id, dag_id=dag_id, execution_date=ex_time)
+ .one_or_none()
+ )
+ if task:
+ self.log.info(
+ 'Found matching task %s-%s (%s) with current state of %s',
+ task.dag_id, task.task_id, task.execution_date, task.state
+ )
+ return (dag_id, task_id, ex_time, try_num)
+ else:
+ self.log.warning(
+ 'task_id/dag_id are not safe to use as Kubernetes labels. This can cause '
+ 'severe performance regressions. Please see '
+ '<https://kubernetes.io/docs/concepts/overview/working-with-objects'
+ '/labels/#syntax-and-character-set>. '
+ 'Given dag_id: %s, task_id: %s', task_id, dag_id
+ )
+
tasks = (
session
.query(TaskInstance)