fix(providers/standard): add response_timeout to HITLOperator to prevent race with execution_timeout (#63475)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
index 96404a0..dc26aa1 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@@ -118,7 +118,7 @@
         subject="Please choose option to proceed: ",
         options=["option 7", "option 8", "option 9"],
         defaults=["option 7"],
-        execution_timeout=datetime.timedelta(seconds=1),
+        response_timeout=datetime.timedelta(seconds=1),
         notifiers=[hitl_request_callback],
         on_success_callback=hitl_success_callback,
         on_failure_callback=hitl_failure_callback,
@@ -136,7 +136,7 @@
         Timeout Option: {{ ti.xcom_pull(task_ids='wait_for_default_option')["chosen_options"] }}
         """,
         defaults="Reject",
-        execution_timeout=datetime.timedelta(minutes=5),
+        response_timeout=datetime.timedelta(minutes=5),
         notifiers=[hitl_request_callback],
         on_success_callback=hitl_success_callback,
         on_failure_callback=hitl_failure_callback,
diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index b009422..d3199a7 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -17,7 +17,9 @@
 from __future__ import annotations
 
 import logging
+import warnings
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
 from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS, AIRFLOW_V_3_1_PLUS
 
@@ -25,6 +27,7 @@
     raise AirflowOptionalProviderFeatureException("Human in the loop functionality needs Airflow 3.1+.")
 
 from collections.abc import Collection, Mapping, Sequence
+from datetime import timedelta
 from typing import TYPE_CHECKING, Any
 from urllib.parse import ParseResult, urlencode, urlparse, urlunparse
 
@@ -55,6 +58,9 @@
     :param params: dictionary of parameter definitions that are in the format of Dag params such that
         a Form Field can be rendered. Entered data is validated (schema, required fields) like for a Dag run
         and added to XCom of the task result.
+    :param response_timeout: Maximum time to wait for a human response after deferring to the trigger.
+        This is separate from ``execution_timeout`` which controls the pre-defer execution phase.
+        If not set, no timeout is applied to the human response wait.
     """
 
     template_fields: Collection[str] = ("subject", "body")
@@ -70,9 +76,26 @@
         params: ParamsDict | dict[str, Any] | None = None,
         notifiers: Sequence[BaseNotifier] | BaseNotifier | None = None,
         assigned_users: HITLUser | list[HITLUser] | None = None,
+        response_timeout: timedelta | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+
+        # Handle backward compatibility: if execution_timeout is set but response_timeout is not,
+        # migrate execution_timeout to response_timeout and clear it to prevent the BaseOperator
+        # timeout from racing the defer() call.
+        if self.execution_timeout and not response_timeout:
+            warnings.warn(
+                "Passing `execution_timeout` to HITLOperator to control the human response wait is "
+                "deprecated. Use `response_timeout` instead. `execution_timeout` will be cleared to "
+                "prevent it from killing the task before defer() is reached.",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+            response_timeout = self.execution_timeout
+            self.execution_timeout = None
+
+        self.response_timeout = response_timeout
         self.subject = subject
         self.body = body
 
@@ -160,8 +183,8 @@
             assigned_users=self.assigned_users,
         )
 
-        if self.execution_timeout:
-            timeout_datetime = utcnow() + self.execution_timeout
+        if self.response_timeout:
+            timeout_datetime = utcnow() + self.response_timeout
         else:
             timeout_datetime = None
 
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 6a71c3d..e1ebae5 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -32,6 +32,7 @@
 
 from sqlalchemy import select
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import TaskInstance, Trigger
 from airflow.models.hitl import HITLDetail
 from airflow.providers.common.compat.sdk import AirflowException, DownstreamTasksSkipped, ParamValidationError
@@ -1028,13 +1029,13 @@
             "serialized_params": None,
         }
 
-    def test_execute_enriches_summary_with_timeout(self) -> None:
-        """execute() adds timeout_datetime; all other init keys remain."""
+    def test_execute_enriches_summary_with_response_timeout(self) -> None:
+        """execute() adds timeout_datetime using response_timeout; all other init keys remain."""
         op = HITLOperator(
             task_id="test",
             subject="Review",
             options=["OK"],
-            execution_timeout=datetime.timedelta(minutes=10),
+            response_timeout=datetime.timedelta(minutes=10),
         )
 
         with (
@@ -1084,6 +1085,32 @@
             "timeout_datetime": None,
         }
 
+    def test_execution_timeout_deprecated_and_migrated(self) -> None:
+        """execution_timeout is migrated to response_timeout with a deprecation warning."""
+        with pytest.warns(AirflowProviderDeprecationWarning, match="Use `response_timeout` instead"):
+            op = HITLOperator(
+                task_id="test",
+                subject="Review",
+                options=["OK"],
+                execution_timeout=datetime.timedelta(minutes=10),
+            )
+
+        assert op.response_timeout == datetime.timedelta(minutes=10)
+        assert op.execution_timeout is None
+
+    def test_response_timeout_does_not_clear_execution_timeout(self) -> None:
+        """When response_timeout is set, execution_timeout is left untouched."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["OK"],
+            response_timeout=datetime.timedelta(minutes=5),
+            execution_timeout=datetime.timedelta(minutes=30),
+        )
+
+        assert op.response_timeout == datetime.timedelta(minutes=5)
+        assert op.execution_timeout == datetime.timedelta(minutes=30)
+
     def test_hitl_operator_execute_complete_enriches_summary(self) -> None:
         """execute_complete() adds response fields directly into hitl_summary."""
         op = HITLOperator(
@@ -1257,7 +1284,7 @@
             task_id="test",
             subject="Release v2.0?",
             body="Please approve the production deployment.",
-            execution_timeout=datetime.timedelta(minutes=30),
+            response_timeout=datetime.timedelta(minutes=30),
         )
 
         # -- After __init__: only base + approval keys --