fix(providers/standard): add response_timeout to HITLOperator to prevent race with execution_timeout (#63475)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
index 96404a0..dc26aa1 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@@ -118,7 +118,7 @@
subject="Please choose option to proceed: ",
options=["option 7", "option 8", "option 9"],
defaults=["option 7"],
- execution_timeout=datetime.timedelta(seconds=1),
+ response_timeout=datetime.timedelta(seconds=1),
notifiers=[hitl_request_callback],
on_success_callback=hitl_success_callback,
on_failure_callback=hitl_failure_callback,
@@ -136,7 +136,7 @@
Timeout Option: {{ ti.xcom_pull(task_ids='wait_for_default_option')["chosen_options"] }}
""",
defaults="Reject",
- execution_timeout=datetime.timedelta(minutes=5),
+ response_timeout=datetime.timedelta(minutes=5),
notifiers=[hitl_request_callback],
on_success_callback=hitl_success_callback,
on_failure_callback=hitl_failure_callback,
diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index b009422..d3199a7 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -17,7 +17,9 @@
from __future__ import annotations
import logging
+import warnings
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS, AIRFLOW_V_3_1_PLUS
@@ -25,6 +27,7 @@
raise AirflowOptionalProviderFeatureException("Human in the loop functionality needs Airflow 3.1+.")
from collections.abc import Collection, Mapping, Sequence
+from datetime import timedelta
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, urlencode, urlparse, urlunparse
@@ -55,6 +58,9 @@
:param params: dictionary of parameter definitions that are in the format of Dag params such that
a Form Field can be rendered. Entered data is validated (schema, required fields) like for a Dag run
and added to XCom of the task result.
+ :param response_timeout: Maximum time to wait for a human response after deferring to the trigger.
+ This is separate from ``execution_timeout`` which controls the pre-defer execution phase.
+ If not set, no timeout is applied to the human response wait.
"""
template_fields: Collection[str] = ("subject", "body")
@@ -70,9 +76,26 @@
params: ParamsDict | dict[str, Any] | None = None,
notifiers: Sequence[BaseNotifier] | BaseNotifier | None = None,
assigned_users: HITLUser | list[HITLUser] | None = None,
+ response_timeout: timedelta | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
+
+ # Handle backward compatibility: if execution_timeout is set but response_timeout is not,
+ # migrate execution_timeout to response_timeout and clear it to prevent the BaseOperator
+ # timeout from racing the defer() call.
+ if self.execution_timeout and not response_timeout:
+ warnings.warn(
+ "Passing `execution_timeout` to HITLOperator to control the human response wait is "
+ "deprecated. Use `response_timeout` instead. `execution_timeout` will be cleared to "
+ "prevent it from killing the task before defer() is reached.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ response_timeout = self.execution_timeout
+ self.execution_timeout = None
+
+ self.response_timeout = response_timeout
self.subject = subject
self.body = body
@@ -160,8 +183,8 @@
assigned_users=self.assigned_users,
)
- if self.execution_timeout:
- timeout_datetime = utcnow() + self.execution_timeout
+ if self.response_timeout:
+ timeout_datetime = utcnow() + self.response_timeout
else:
timeout_datetime = None
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 6a71c3d..e1ebae5 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -32,6 +32,7 @@
from sqlalchemy import select
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import TaskInstance, Trigger
from airflow.models.hitl import HITLDetail
from airflow.providers.common.compat.sdk import AirflowException, DownstreamTasksSkipped, ParamValidationError
@@ -1028,13 +1029,13 @@
"serialized_params": None,
}
- def test_execute_enriches_summary_with_timeout(self) -> None:
- """execute() adds timeout_datetime; all other init keys remain."""
+ def test_execute_enriches_summary_with_response_timeout(self) -> None:
+ """execute() adds timeout_datetime using response_timeout; all other init keys remain."""
op = HITLOperator(
task_id="test",
subject="Review",
options=["OK"],
- execution_timeout=datetime.timedelta(minutes=10),
+ response_timeout=datetime.timedelta(minutes=10),
)
with (
@@ -1084,6 +1085,32 @@
"timeout_datetime": None,
}
+ def test_execution_timeout_deprecated_and_migrated(self) -> None:
+ """execution_timeout is migrated to response_timeout with a deprecation warning."""
+ with pytest.warns(AirflowProviderDeprecationWarning, match="Use `response_timeout` instead"):
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ execution_timeout=datetime.timedelta(minutes=10),
+ )
+
+ assert op.response_timeout == datetime.timedelta(minutes=10)
+ assert op.execution_timeout is None
+
+ def test_response_timeout_does_not_clear_execution_timeout(self) -> None:
+ """When response_timeout is set, execution_timeout is left untouched."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ response_timeout=datetime.timedelta(minutes=5),
+ execution_timeout=datetime.timedelta(minutes=30),
+ )
+
+ assert op.response_timeout == datetime.timedelta(minutes=5)
+ assert op.execution_timeout == datetime.timedelta(minutes=30)
+
def test_hitl_operator_execute_complete_enriches_summary(self) -> None:
"""execute_complete() adds response fields directly into hitl_summary."""
op = HITLOperator(
@@ -1257,7 +1284,7 @@
task_id="test",
subject="Release v2.0?",
body="Please approve the production deployment.",
- execution_timeout=datetime.timedelta(minutes=30),
+ response_timeout=datetime.timedelta(minutes=30),
)
# -- After __init__: only base + approval keys --