Fix broken SLA Mechanism (#14056)
closes https://github.com/apache/airflow/issues/14050
We were not de-serializing `BaseOperator.sla` properly, hence
we were returning float instead of `timedelta` object.
Example: 100.0 instead of timedelta(seconds=100)
And because we had a check in _manage_sla in `SchedulerJob` and `DagFileProcessor`,
we were skipping SLA.
SchedulerJob:
https://github.com/apache/airflow/blob/88bdcfa0df5bcb4c489486e05826544b428c8f43/airflow/jobs/scheduler_job.py#L1766-L1768
DagFileProcessor:
https://github.com/apache/airflow/blob/88bdcfa0df5bcb4c489486e05826544b428c8f43/airflow/jobs/scheduler_job.py#L395-L397
(cherry picked from commit 604a37eee50715db345c5a7afed085c9afe8530d)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3b213ab..f25ae02 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -393,6 +393,7 @@
We are assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
+ self.log.info("Running SLA Checks for %s", dag.dag_id)
if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
return
@@ -552,6 +553,7 @@
:param session: DB session.
"""
for request in callback_requests:
+ self.log.debug("Processing Callback Request: %s", request)
try:
if isinstance(request, TaskCallbackRequest):
self._execute_task_callbacks(dagbag, request)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index f59ce6a..38df10a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -451,7 +451,7 @@
v = set(v)
elif k == "subdag":
v = SerializedDAG.deserialize_dag(v)
- elif k in {"retry_delay", "execution_timeout"}:
+ elif k in {"retry_delay", "execution_timeout", "sla"}:
v = cls._deserialize_timedelta(v)
elif k in encoded_op["template_fields"]:
pass
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0d97452..3867833 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3525,6 +3525,9 @@
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
DummyOperator(task_id='task1', dag=dag, sla=timedelta(seconds=60))
+ # Used Serialized DAG as Serialized DAG is used in Scheduler
+ dag = SerializedDAG.from_json(SerializedDAG.to_json(dag))
+
with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = SchedulerJob(subdir=os.devnull)
mock_agent = mock.MagicMock()
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 1b7524a..2046e22 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -60,6 +60,7 @@
"depends_on_past": False,
"retries": 1,
"retry_delay": {"__type": "timedelta", "__var": 300.0},
+ "sla": {"__type": "timedelta", "__var": 100.0},
},
},
"start_date": 1564617600.0,
@@ -84,6 +85,7 @@
"owner": "airflow",
"retries": 1,
"retry_delay": 300.0,
+ "sla": 100.0,
"_downstream_task_ids": [],
"_inlets": [],
"_is_dummy": False,
@@ -111,6 +113,7 @@
"task_id": "custom_task",
"retries": 1,
"retry_delay": 300.0,
+ "sla": 100.0,
"_downstream_task_ids": [],
"_inlets": [],
"_is_dummy": False,
@@ -158,6 +161,7 @@
"retries": 1,
"retry_delay": timedelta(minutes=5),
"depends_on_past": False,
+ "sla": timedelta(seconds=100),
},
start_date=datetime(2019, 8, 1),
is_paused_upon_creation=False,