Don't auto-add to context just by virtue of arrowing (#33102)

* Don't auto-add to context just by virtue of arrowing

* no add tasks

* Add back removed import

* Fixup tests

* fixup! Fixup tests

---------

Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index 0dd013a..f52749c 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -24,7 +24,6 @@
 
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.serialization.enums import DagAttributeTypes
-from airflow.utils.setup_teardown import SetupTeardownContext
 from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
@@ -97,15 +96,11 @@
     def __lshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
         """Implements Task << Task."""
         self.set_upstream(other)
-        self.set_setup_teardown_ctx_dependencies(other)
-        self.set_taskgroup_ctx_dependencies(other)
         return other
 
     def __rshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
         """Implements Task >> Task."""
         self.set_downstream(other)
-        self.set_setup_teardown_ctx_dependencies(other)
-        self.set_taskgroup_ctx_dependencies(other)
         return other
 
     def __rrshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
@@ -136,22 +131,6 @@
             for o in obj:
                 yield from cls._iter_references(o)
 
-    def set_setup_teardown_ctx_dependencies(self, other: DependencyMixin | Sequence[DependencyMixin]):
-        if not SetupTeardownContext.active:
-            return
-        for op, _ in self._iter_references([self, other]):
-            SetupTeardownContext.update_context_map(op)
-
-    def set_taskgroup_ctx_dependencies(self, other: DependencyMixin | Sequence[DependencyMixin]):
-        from airflow.utils.task_group import TaskGroupContext
-
-        if not TaskGroupContext.active:
-            return
-        task_group = TaskGroupContext.get_current_task_group(None)
-        for op, _ in self._iter_references([self, other]):
-            if task_group:
-                op.add_to_taskgroup(task_group)
-
 
 class TaskMixin(DependencyMixin):
     """Mixin to provide task-related things.
diff --git a/tests/decorators/test_setup_teardown.py b/tests/decorators/test_setup_teardown.py
index bcfb81b..567ac5c 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -1130,59 +1130,6 @@
             "mytask",
         }
 
-    def test_tasks_decorators_called_outside_context_manager_can_link_up(self, dag_maker):
-        @setup
-        def setuptask():
-            print("setup")
-
-        @task()
-        def mytask():
-            print("mytask")
-
-        @task()
-        def mytask2():
-            print("mytask 2")
-
-        @teardown
-        def teardowntask():
-            print("teardown")
-
-        with dag_maker() as dag:
-            task1 = mytask()
-            task2 = mytask2()
-            with setuptask() >> teardowntask():
-                task1 >> task2
-
-        assert len(dag.task_group.children) == 4
-        assert not dag.task_group.children["setuptask"].upstream_task_ids
-        assert dag.task_group.children["setuptask"].downstream_task_ids == {"mytask", "teardowntask"}
-        assert dag.task_group.children["mytask"].upstream_task_ids == {"setuptask"}
-        assert dag.task_group.children["mytask"].downstream_task_ids == {"mytask2"}
-        assert dag.task_group.children["mytask2"].upstream_task_ids == {"mytask"}
-        assert dag.task_group.children["mytask2"].downstream_task_ids == {"teardowntask"}
-        assert dag.task_group.children["teardowntask"].upstream_task_ids == {"mytask2", "setuptask"}
-        assert not dag.task_group.children["teardowntask"].downstream_task_ids
-
-    def test_classic_tasks_called_outside_context_manager_can_link_up(self, dag_maker):
-
-        with dag_maker() as dag:
-            setuptask = BashOperator(task_id="setuptask", bash_command="echo 1").as_setup()
-            teardowntask = BashOperator(task_id="teardowntask", bash_command="echo 1").as_teardown()
-            mytask = BashOperator(task_id="mytask", bash_command="echo 1")
-            mytask2 = BashOperator(task_id="mytask2", bash_command="echo 1")
-            with setuptask >> teardowntask:
-                mytask >> mytask2
-
-        assert len(dag.task_group.children) == 4
-        assert not dag.task_group.children["setuptask"].upstream_task_ids
-        assert dag.task_group.children["setuptask"].downstream_task_ids == {"mytask", "teardowntask"}
-        assert dag.task_group.children["mytask"].upstream_task_ids == {"setuptask"}
-        assert dag.task_group.children["mytask"].downstream_task_ids == {"mytask2"}
-        assert dag.task_group.children["mytask2"].upstream_task_ids == {"mytask"}
-        assert dag.task_group.children["mytask2"].downstream_task_ids == {"teardowntask"}
-        assert dag.task_group.children["teardowntask"].upstream_task_ids == {"mytask2", "setuptask"}
-        assert not dag.task_group.children["teardowntask"].downstream_task_ids
-
     def test_tasks_decorators_called_outside_context_manager_can_link_up_with_scope(self, dag_maker):
         @setup
         def setuptask():
diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index c1795f2..95aefd0 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -209,100 +209,3 @@
             ValueError, match="Cannot mark task 'my_ok_task__2' as setup; task is already a teardown."
         ):
             m.operator.is_setup = True
-
-
-def test_set_setup_teardown_ctx_dependencies_using_decorated_tasks(dag_maker):
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
-        with setuptask >> teardowntask as scope:
-            scope.add_task(t1)
-
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t1.operator.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> t2
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t2.operator.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        t3 = make_task("t3", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> [t2, t3]
-
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t2.operator.downstream_task_ids == {"teardowntask"}
-    assert t3.operator.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        t3 = make_task("t3", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", teardown_=True)
-        with setuptask >> teardowntask:
-            [t1, t2] >> t3
-
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t2.operator.upstream_task_ids == {"setuptask"}
-    assert t3.operator.downstream_task_ids == {"teardowntask"}
-
-
-def test_set_setup_teardown_ctx_dependencies_using_classic_tasks(dag_maker):
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
-        with setuptask >> teardowntask as scope:
-            scope.add_task(t1)
-
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t1.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        t2 = make_task("t2", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> t2
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t2.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        t2 = make_task("t2", type_="classic")
-        t3 = make_task("t3", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> [t2, t3]
-
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t2.downstream_task_ids == {"teardowntask"}
-    assert t3.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        t2 = make_task("t2", type_="classic")
-        t3 = make_task("t3", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", teardown_=True)
-        with setuptask >> teardowntask:
-            [t1, t2] >> t3
-
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t2.upstream_task_ids == {"setuptask"}
-    assert t3.downstream_task_ids == {"teardowntask"}
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 4475e71..c021d98 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -23,7 +23,7 @@
 import pytest
 
 from airflow.decorators import dag, task as task_decorator, task_group as task_group_decorator
-from airflow.exceptions import AirflowException, TaskAlreadyInTaskGroup
+from airflow.exceptions import TaskAlreadyInTaskGroup
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.xcom_arg import XComArg
@@ -1479,72 +1479,3 @@
         tg1 >> w2
     assert t1.downstream_task_ids == set()
     assert w1.downstream_task_ids == {"tg1.t1", "w2"}
-
-
-def test_tasks_defined_outside_taskgrooup(dag_maker):
-    # Test that classic tasks defined outside a task group are added to the root task group
-    # when the relationships are defined inside the task group
-    with dag_maker() as dag:
-        t1 = make_task("t1")
-        t2 = make_task("t2")
-        t3 = make_task("t3")
-        with TaskGroup(group_id="tg1"):
-            t1 >> t2 >> t3
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == {"t2"}
-    assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == {"t1"}
-    assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids == {"t3"}
-    assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == {"t2"}
-    assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids == set()
-
-    # Test that decorated tasks defined outside a task group are added to the root task group
-    # when relationships are defined inside the task group
-    with dag_maker() as dag:
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        t3 = make_task("t3", type_="decorated")
-        with TaskGroup(group_id="tg1"):
-            t1 >> t2 >> t3
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == {"t2"}
-    assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == {"t1"}
-    assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids == {"t3"}
-    assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == {"t2"}
-    assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids == set()
-
-    # Test adding single decorated task defined outside a task group to a task group
-    with dag_maker() as dag:
-        t1 = make_task("t1", type_="decorated")
-        with TaskGroup(group_id="tg1") as tg1:
-            tg1.add_task(t1)
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == set()
-
-    # Test adding single classic task defined outside a task group to a task group
-    with dag_maker() as dag:
-        t1 = make_task("t1")
-        with TaskGroup(group_id="tg1") as tg1:
-            tg1.add_task(t1)
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == set()
-
-    with pytest.raises(
-        AirflowException,
-        match="Using this method on a task group that's not a context manager is not supported.",
-    ):
-        with dag_maker():
-            t1 = make_task("t1")
-            tg1 = TaskGroup(group_id="tg1")
-            tg1.add_task(t1)