[AIRFLOW-5660] Attempt to find the task in DB from Kubernetes pod labels (#6340)

Try to find the task in DB before regressing to searching every task,
and explicitly warn about the performance regressions.

Co-Authored-By: Ash Berlin-Taylor <ash_github@firemirror.com>
(cherry picked from commit 66e2c22e1615c0999747d0c38355163e877872e7)
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index ecef411..52692da 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -643,6 +643,27 @@
             return None
 
         with create_session() as session:
+            task = (
+                session
+                .query(TaskInstance)
+                .filter_by(task_id=task_id, dag_id=dag_id, execution_date=ex_time)
+                .one_or_none()
+            )
+            if task:
+                self.log.info(
+                    'Found matching task %s-%s (%s) with current state of %s',
+                    task.dag_id, task.task_id, task.execution_date, task.state
+                )
+                return (dag_id, task_id, ex_time, try_num)
+            else:
+                self.log.warning(
+                    'task_id/dag_id are not safe to use as Kubernetes labels. This can cause '
+                    'severe performance regressions. Please see '
+                    '<https://kubernetes.io/docs/concepts/overview/working-with-objects'
+                    '/labels/#syntax-and-character-set>. '
+                    'Given dag_id: %s, task_id: %s', task_id, dag_id
+                )
+
             tasks = (
                 session
                 .query(TaskInstance)