Fix tests/decorators/test_python.py for database isolation tests (#41387)
* Pass serialized parameter for dag_maker
* Serialisation of object is on __exit__ moving out the dag definition out of dag_maker context
(cherry picked from commit 278f3c4b6eaf823a0c6eb538a6f61d21e0f955d8)
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index fb8c75b..9d2b9a1 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -1000,50 +1000,52 @@
def test_teardown_trigger_rule_selective_application(dag_maker, session):
- with dag_maker(session=session) as dag:
+ with dag_maker(session=session, serialized=True) as created_dag:
+ dag = created_dag
- @dag.task
- def my_work():
- return "abc"
+ @dag.task
+ def my_work():
+ return "abc"
- @setup
- @dag.task
- def my_setup():
- return "abc"
+ @setup
+ @dag.task
+ def my_setup():
+ return "abc"
- @teardown
- @dag.task
- def my_teardown():
- return "abc"
+ @teardown
+ @dag.task
+ def my_teardown():
+ return "abc"
- work_task = my_work()
- setup_task = my_setup()
- teardown_task = my_teardown()
+ work_task = my_work()
+ setup_task = my_setup()
+ teardown_task = my_teardown()
assert work_task.operator.trigger_rule == TriggerRule.ALL_SUCCESS
assert setup_task.operator.trigger_rule == TriggerRule.ALL_SUCCESS
assert teardown_task.operator.trigger_rule == TriggerRule.ALL_DONE_SETUP_SUCCESS
def test_teardown_trigger_rule_override_behavior(dag_maker, session):
- with dag_maker(session=session) as dag:
+ with dag_maker(session=session, serialized=True) as created_dag:
+ dag = created_dag
- @dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
- def my_work():
- return "abc"
+ @dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
+ def my_work():
+ return "abc"
- @setup
- @dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
- def my_setup():
- return "abc"
+ @setup
+ @dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
+ def my_setup():
+ return "abc"
- @teardown
- @dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
- def my_teardown():
- return "abc"
+ @teardown
+ @dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
+ def my_teardown():
+ return "abc"
- work_task = my_work()
- setup_task = my_setup()
- with pytest.raises(Exception, match="Trigger rule not configurable for teardown tasks."):
- my_teardown()
+ work_task = my_work()
+ setup_task = my_setup()
+ with pytest.raises(Exception, match="Trigger rule not configurable for teardown tasks."):
+ my_teardown()
assert work_task.operator.trigger_rule == TriggerRule.ONE_SUCCESS
assert setup_task.operator.trigger_rule == TriggerRule.ONE_SUCCESS