Don't auto-add to context just by virtue of arrowing (#33102)
* Don't auto-add to context just by virtue of arrowing
* no add tasks
* Add back removed import
* Fixup tests
* fixup! Fixup tests
---------
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index 0dd013a..f52749c 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -24,7 +24,6 @@
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.serialization.enums import DagAttributeTypes
-from airflow.utils.setup_teardown import SetupTeardownContext
from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
@@ -97,15 +96,11 @@
def __lshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
"""Implements Task << Task."""
self.set_upstream(other)
- self.set_setup_teardown_ctx_dependencies(other)
- self.set_taskgroup_ctx_dependencies(other)
return other
def __rshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
"""Implements Task >> Task."""
self.set_downstream(other)
- self.set_setup_teardown_ctx_dependencies(other)
- self.set_taskgroup_ctx_dependencies(other)
return other
def __rrshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
@@ -136,22 +131,6 @@
for o in obj:
yield from cls._iter_references(o)
- def set_setup_teardown_ctx_dependencies(self, other: DependencyMixin | Sequence[DependencyMixin]):
- if not SetupTeardownContext.active:
- return
- for op, _ in self._iter_references([self, other]):
- SetupTeardownContext.update_context_map(op)
-
- def set_taskgroup_ctx_dependencies(self, other: DependencyMixin | Sequence[DependencyMixin]):
- from airflow.utils.task_group import TaskGroupContext
-
- if not TaskGroupContext.active:
- return
- task_group = TaskGroupContext.get_current_task_group(None)
- for op, _ in self._iter_references([self, other]):
- if task_group:
- op.add_to_taskgroup(task_group)
-
class TaskMixin(DependencyMixin):
"""Mixin to provide task-related things.
diff --git a/tests/decorators/test_setup_teardown.py b/tests/decorators/test_setup_teardown.py
index bcfb81b..567ac5c 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -1130,59 +1130,6 @@
"mytask",
}
- def test_tasks_decorators_called_outside_context_manager_can_link_up(self, dag_maker):
- @setup
- def setuptask():
- print("setup")
-
- @task()
- def mytask():
- print("mytask")
-
- @task()
- def mytask2():
- print("mytask 2")
-
- @teardown
- def teardowntask():
- print("teardown")
-
- with dag_maker() as dag:
- task1 = mytask()
- task2 = mytask2()
- with setuptask() >> teardowntask():
- task1 >> task2
-
- assert len(dag.task_group.children) == 4
- assert not dag.task_group.children["setuptask"].upstream_task_ids
- assert dag.task_group.children["setuptask"].downstream_task_ids == {"mytask", "teardowntask"}
- assert dag.task_group.children["mytask"].upstream_task_ids == {"setuptask"}
- assert dag.task_group.children["mytask"].downstream_task_ids == {"mytask2"}
- assert dag.task_group.children["mytask2"].upstream_task_ids == {"mytask"}
- assert dag.task_group.children["mytask2"].downstream_task_ids == {"teardowntask"}
- assert dag.task_group.children["teardowntask"].upstream_task_ids == {"mytask2", "setuptask"}
- assert not dag.task_group.children["teardowntask"].downstream_task_ids
-
- def test_classic_tasks_called_outside_context_manager_can_link_up(self, dag_maker):
-
- with dag_maker() as dag:
- setuptask = BashOperator(task_id="setuptask", bash_command="echo 1").as_setup()
- teardowntask = BashOperator(task_id="teardowntask", bash_command="echo 1").as_teardown()
- mytask = BashOperator(task_id="mytask", bash_command="echo 1")
- mytask2 = BashOperator(task_id="mytask2", bash_command="echo 1")
- with setuptask >> teardowntask:
- mytask >> mytask2
-
- assert len(dag.task_group.children) == 4
- assert not dag.task_group.children["setuptask"].upstream_task_ids
- assert dag.task_group.children["setuptask"].downstream_task_ids == {"mytask", "teardowntask"}
- assert dag.task_group.children["mytask"].upstream_task_ids == {"setuptask"}
- assert dag.task_group.children["mytask"].downstream_task_ids == {"mytask2"}
- assert dag.task_group.children["mytask2"].upstream_task_ids == {"mytask"}
- assert dag.task_group.children["mytask2"].downstream_task_ids == {"teardowntask"}
- assert dag.task_group.children["teardowntask"].upstream_task_ids == {"mytask2", "setuptask"}
- assert not dag.task_group.children["teardowntask"].downstream_task_ids
-
def test_tasks_decorators_called_outside_context_manager_can_link_up_with_scope(self, dag_maker):
@setup
def setuptask():
diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index c1795f2..95aefd0 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -209,100 +209,3 @@
ValueError, match="Cannot mark task 'my_ok_task__2' as setup; task is already a teardown."
):
m.operator.is_setup = True
-
-
-def test_set_setup_teardown_ctx_dependencies_using_decorated_tasks(dag_maker):
-
- with dag_maker():
- t1 = make_task("t1", type_="decorated")
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
- with setuptask >> teardowntask as scope:
- scope.add_task(t1)
-
- assert t1.operator.upstream_task_ids == {"setuptask"}
- assert t1.operator.downstream_task_ids == {"teardowntask"}
-
- with dag_maker():
- t1 = make_task("t1", type_="decorated")
- t2 = make_task("t2", type_="decorated")
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
- with setuptask >> teardowntask:
- t1 >> t2
- assert t1.operator.upstream_task_ids == {"setuptask"}
- assert t2.operator.downstream_task_ids == {"teardowntask"}
-
- with dag_maker():
- t1 = make_task("t1", type_="decorated")
- t2 = make_task("t2", type_="decorated")
- t3 = make_task("t3", type_="decorated")
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
- with setuptask >> teardowntask:
- t1 >> [t2, t3]
-
- assert t1.operator.upstream_task_ids == {"setuptask"}
- assert t2.operator.downstream_task_ids == {"teardowntask"}
- assert t3.operator.downstream_task_ids == {"teardowntask"}
-
- with dag_maker():
- t1 = make_task("t1", type_="decorated")
- t2 = make_task("t2", type_="decorated")
- t3 = make_task("t3", type_="decorated")
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
- with setuptask >> teardowntask:
- [t1, t2] >> t3
-
- assert t1.operator.upstream_task_ids == {"setuptask"}
- assert t2.operator.upstream_task_ids == {"setuptask"}
- assert t3.operator.downstream_task_ids == {"teardowntask"}
-
-
-def test_set_setup_teardown_ctx_dependencies_using_classic_tasks(dag_maker):
- with dag_maker():
- t1 = make_task("t1", type_="classic")
- setuptask = make_task("setuptask", type_="classic", setup_=True)
- teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
- with setuptask >> teardowntask as scope:
- scope.add_task(t1)
-
- assert t1.upstream_task_ids == {"setuptask"}
- assert t1.downstream_task_ids == {"teardowntask"}
-
- with dag_maker():
- t1 = make_task("t1", type_="classic")
- t2 = make_task("t2", type_="classic")
- setuptask = make_task("setuptask", type_="classic", setup_=True)
- teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
- with setuptask >> teardowntask:
- t1 >> t2
- assert t1.upstream_task_ids == {"setuptask"}
- assert t2.downstream_task_ids == {"teardowntask"}
-
- with dag_maker():
- t1 = make_task("t1", type_="classic")
- t2 = make_task("t2", type_="classic")
- t3 = make_task("t3", type_="classic")
- setuptask = make_task("setuptask", type_="classic", setup_=True)
- teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
- with setuptask >> teardowntask:
- t1 >> [t2, t3]
-
- assert t1.upstream_task_ids == {"setuptask"}
- assert t2.downstream_task_ids == {"teardowntask"}
- assert t3.downstream_task_ids == {"teardowntask"}
-
- with dag_maker():
- t1 = make_task("t1", type_="classic")
- t2 = make_task("t2", type_="classic")
- t3 = make_task("t3", type_="classic")
- setuptask = make_task("setuptask", type_="classic", setup_=True)
- teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
- with setuptask >> teardowntask:
- [t1, t2] >> t3
-
- assert t1.upstream_task_ids == {"setuptask"}
- assert t2.upstream_task_ids == {"setuptask"}
- assert t3.downstream_task_ids == {"teardowntask"}
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 4475e71..c021d98 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -23,7 +23,7 @@
import pytest
from airflow.decorators import dag, task as task_decorator, task_group as task_group_decorator
-from airflow.exceptions import AirflowException, TaskAlreadyInTaskGroup
+from airflow.exceptions import TaskAlreadyInTaskGroup
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
@@ -1479,72 +1479,3 @@
tg1 >> w2
assert t1.downstream_task_ids == set()
assert w1.downstream_task_ids == {"tg1.t1", "w2"}
-
-
-def test_tasks_defined_outside_taskgrooup(dag_maker):
- # Test that classic tasks defined outside a task group are added to the root task group
- # when the relationships are defined inside the task group
- with dag_maker() as dag:
- t1 = make_task("t1")
- t2 = make_task("t2")
- t3 = make_task("t3")
- with TaskGroup(group_id="tg1"):
- t1 >> t2 >> t3
- dag.validate()
- assert dag.task_group.children.keys() == {"tg1"}
- assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"}
- assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
- assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == {"t2"}
- assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == {"t1"}
- assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids == {"t3"}
- assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == {"t2"}
- assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids == set()
-
- # Test that decorated tasks defined outside a task group are added to the root task group
- # when relationships are defined inside the task group
- with dag_maker() as dag:
- t1 = make_task("t1", type_="decorated")
- t2 = make_task("t2", type_="decorated")
- t3 = make_task("t3", type_="decorated")
- with TaskGroup(group_id="tg1"):
- t1 >> t2 >> t3
- dag.validate()
- assert dag.task_group.children.keys() == {"tg1"}
- assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"}
- assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
- assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == {"t2"}
- assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == {"t1"}
- assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids == {"t3"}
- assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == {"t2"}
- assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids == set()
-
- # Test adding single decorated task defined outside a task group to a task group
- with dag_maker() as dag:
- t1 = make_task("t1", type_="decorated")
- with TaskGroup(group_id="tg1") as tg1:
- tg1.add_task(t1)
- dag.validate()
- assert dag.task_group.children.keys() == {"tg1"}
- assert dag.task_group.children["tg1"].children.keys() == {"t1"}
- assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
- assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == set()
-
- # Test adding single classic task defined outside a task group to a task group
- with dag_maker() as dag:
- t1 = make_task("t1")
- with TaskGroup(group_id="tg1") as tg1:
- tg1.add_task(t1)
- dag.validate()
- assert dag.task_group.children.keys() == {"tg1"}
- assert dag.task_group.children["tg1"].children.keys() == {"t1"}
- assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
- assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == set()
-
- with pytest.raises(
- AirflowException,
- match="Using this method on a task group that's not a context manager is not supported.",
- ):
- with dag_maker():
- t1 = make_task("t1")
- tg1 = TaskGroup(group_id="tg1")
- tg1.add_task(t1)