blob: 9cffec7a8951c3174558e629bf8254fc7ba158e2 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from unittest import mock
import pytest
from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.serialization.dag_dependency import DagDependency
from airflow.utils import dot_renderer, timezone
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
from tests_common.test_utils.compat import BashOperator
from tests_common.test_utils.db import clear_db_dags
try:
from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
from airflow.operators.python import PythonOperator # type: ignore[no-redef,attr-defined]
# Single timestamp captured once at import time; the tests in this module reuse it as the
# operators' start_date and as the logical_date of the DAG run they create.
START_DATE = timezone.utcnow()
# Module-level pytest marker: tags every test in this file with ``db_test``.
pytestmark = pytest.mark.db_test
class TestDotRenderer:
def setup_class(self):
    """Call ``clear_db_dags()`` once before the tests of this class run (pytest class-level hook)."""
    # NOTE(review): presumably removes DAG rows left in the test database by earlier
    # modules; the helper is imported from tests_common.test_utils.db -- confirm there.
    clear_db_dags()
def teardown_method(self):
    """Call ``clear_db_dags()`` after every test method (pytest per-method teardown hook)."""
    # NOTE(review): presumably keeps DAG rows created by one test from leaking into the
    # next; the helper is imported from tests_common.test_utils.db -- confirm there.
    clear_db_dags()
def test_should_render_dag_dependencies(self):
    """Render a two-hop DAG dependency chain and check every edge is present.

    Two ``Sensor`` dependencies (``dag_one -> dag_two`` via ``task_1`` and
    ``dag_two -> dag_three`` via ``task_2``) are registered under the
    ``"dag_one"`` key; the rendered DOT source must link each DAG to its
    dependency id and each dependency id to its target DAG.
    """
    first_hop = DagDependency(
        source="dag_one", target="dag_two", dependency_type="Sensor", dependency_id="task_1"
    )
    second_hop = DagDependency(
        source="dag_two", target="dag_three", dependency_type="Sensor", dependency_id="task_2"
    )
    dependencies_by_dag = {"dag_one": [first_hop, second_hop]}

    rendered = dot_renderer.render_dag_dependencies(dependencies_by_dag).source

    expected_edges = (
        "dag_one -> task_1",
        "task_1 -> dag_two",
        "dag_two -> task_2",
        "task_2 -> dag_three",
    )
    for edge in expected_edges:
        assert edge in rendered
def test_should_render_dag(self):
    """Render a three-task DAG without task instances and check title, nodes, edges and fills.

    The DAG fans out from one BashOperator (``first``) to a second BashOperator
    (``second``) and a PythonOperator (``third``).  No task instances are passed
    to the renderer, so the fill colours checked at the end are per operator
    type rather than per task state.
    """
    with DAG(dag_id="DAG_ID", schedule=None) as dag:
        task_1 = BashOperator(start_date=START_DATE, task_id="first", bash_command="echo 1")
        task_2 = BashOperator(start_date=START_DATE, task_id="second", bash_command="echo 1")
        task_3 = PythonOperator(start_date=START_DATE, task_id="third", python_callable=mock.MagicMock())
        task_1 >> task_2
        task_1 >> task_3

    dot = dot_renderer.render_dag(dag)
    source = dot.source

    # Should render DAG title
    assert "label=DAG_ID" in source
    # Every task id must appear as a node ...
    assert "first" in source
    assert "second" in source
    assert "third" in source
    # ... together with both fan-out edges.
    assert "first -> second" in source
    assert "first -> third" in source
    # One check per operator type used above: "#f0ede4" is the fill the BashOperator
    # nodes render with, "#ffefeb" is PythonOperator's default ui_color.  Asserting the
    # Bash colour twice would leave the PythonOperator node's fill unverified.
    assert 'fillcolor="#f0ede4"' in source
    assert 'fillcolor="#ffefeb"' in source
def test_should_render_dag_with_task_instances(self, session, dag_maker):
    """Check that node colours follow task-instance state when ``tis`` is supplied.

    A DAG run is created for the three-task DAG, each task instance is put
    into a different state (scheduled / success / running), and the rendered
    node declarations are compared against the colours expected for those
    states.
    """
    with dag_maker(dag_id="DAG_ID", session=session) as dag:
        first = BashOperator(start_date=START_DATE, task_id="first", bash_command="echo 1")
        second = BashOperator(start_date=START_DATE, task_id="second", bash_command="echo 1")
        third = PythonOperator(start_date=START_DATE, task_id="third", python_callable=mock.MagicMock())
        first >> second
        first >> third

    dag_run = dag_maker.create_dagrun(logical_date=START_DATE)
    ti_by_task_id = {ti.task_id: ti for ti in dag_run.task_instances}

    # Give every task instance a distinct state so each node gets its own colour.
    desired_states = {
        "first": State.SCHEDULED,
        "second": State.SUCCESS,
        "third": State.RUNNING,
    }
    for task_id, state in desired_states.items():
        ti_by_task_id[task_id].state = state

    rendered = dot_renderer.render_dag(dag, tis=ti_by_task_id.values()).source

    # Should render DAG title
    assert "label=DAG_ID" in rendered

    expected_nodes = (
        'first [color=black fillcolor=tan label=first shape=rectangle style="filled,rounded"]',
        'second [color=white fillcolor=green label=second shape=rectangle style="filled,rounded"]',
        'third [color=black fillcolor=lime label=third shape=rectangle style="filled,rounded"]',
    )
    for node in expected_nodes:
        assert node in rendered
def test_should_render_dag_with_mapped_operator(self, session, dag_maker):
    """Render a DAG whose only task is a mapped BashOperator.

    The task is declared with ``task_display_name="First Task"``; the node
    must keep the task id ``first`` as its identifier while using the display
    name as its label.
    """
    with dag_maker(dag_id="DAG_ID", session=session) as dag:
        partial_bash = BashOperator.partial(task_id="first", task_display_name="First Task")
        partial_bash.expand(bash_command=["echo hello", "echo world"])

    rendered = dot_renderer.render_dag(dag).source
    expected_node = (
        'first [color="#000000" fillcolor="#f0ede4" label="First Task" '
        'shape=rectangle style="filled,rounded"]'
    )

    # Should render DAG title
    assert "label=DAG_ID" in rendered
    assert expected_node in rendered
def test_render_task_group(self):
    """Render a DAG with task groups and compare the complete DOT source.

    The DAG is ``start >> section_1 >> section_2 >> end`` where ``section_2``
    contains a nested group ``inner_section_2``.  Unlike the other tests in
    this class, the whole rendered text is compared, not just fragments.
    """
    with DAG(dag_id="example_task_group", schedule=None, start_date=START_DATE) as dag:
        start = EmptyOperator(task_id="start")
        # The names task_1/task_2/task_3 are reused in each group; the expected node ids
        # below ("section_1.task_1", "section_2.task_1", ...) carry the group prefix.
        with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
            task_1 = EmptyOperator(task_id="task_1")
            task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
            task_3 = EmptyOperator(task_id="task_3")
            task_1 >> [task_2, task_3]
        with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
            task_1 = EmptyOperator(task_id="task_1")
            with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2"):
                task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
                task_3 = EmptyOperator(task_id="task_3")
                task_4 = EmptyOperator(task_id="task_4")
                [task_2, task_3] >> task_4
        end = EmptyOperator(task_id="end")
        # Group-level dependencies: in the expected output these are drawn through the
        # groups' upstream_join_id / downstream_join_id circle nodes.
        start >> section_1 >> section_2 >> end
    dot = dot_renderer.render_dag(dag)
    # Expected text layout: graph attributes, then node and subgraph declarations,
    # then all edges.
    assert dot.source.strip() == "\n".join(
        [
            "digraph example_task_group {",
            "\tgraph [label=example_task_group labelloc=t rankdir=LR]",
            '\tend [color="#000000" fillcolor="#e8f7e4" label=end shape=rectangle '
            'style="filled,rounded"]',
            "\tsubgraph cluster_section_1 {",
            '\t\tcolor="#000000" fillcolor="#6495ed7f" label=section_1 shape=rectangle style=filled',
            '\t\t"section_1.upstream_join_id" [color="#000000" fillcolor=CornflowerBlue height=0.2 '
            'label="" shape=circle style="filled,rounded" width=0.2]',
            '\t\t"section_1.downstream_join_id" [color="#000000" fillcolor=CornflowerBlue height=0.2 '
            'label="" shape=circle style="filled,rounded" width=0.2]',
            '\t\t"section_1.task_1" [color="#000000" fillcolor="#e8f7e4" label=task_1 shape=rectangle '
            'style="filled,rounded"]',
            '\t\t"section_1.task_2" [color="#000000" fillcolor="#f0ede4" label=task_2 shape=rectangle '
            'style="filled,rounded"]',
            '\t\t"section_1.task_3" [color="#000000" fillcolor="#e8f7e4" label=task_3 shape=rectangle '
            'style="filled,rounded"]',
            "\t}",
            "\tsubgraph cluster_section_2 {",
            '\t\tcolor="#000000" fillcolor="#6495ed7f" label=section_2 shape=rectangle style=filled',
            '\t\t"section_2.upstream_join_id" [color="#000000" fillcolor=CornflowerBlue height=0.2 '
            'label="" shape=circle style="filled,rounded" width=0.2]',
            '\t\t"section_2.downstream_join_id" [color="#000000" fillcolor=CornflowerBlue height=0.2 '
            'label="" shape=circle style="filled,rounded" width=0.2]',
            # The nested group is emitted as a subgraph inside its parent's subgraph.
            '\t\tsubgraph "cluster_section_2.inner_section_2" {',
            '\t\t\tcolor="#000000" fillcolor="#6495ed7f" label=inner_section_2 shape=rectangle '
            "style=filled",
            '\t\t\t"section_2.inner_section_2.task_2" [color="#000000" fillcolor="#f0ede4" label=task_2 '
            'shape=rectangle style="filled,rounded"]',
            '\t\t\t"section_2.inner_section_2.task_3" [color="#000000" fillcolor="#e8f7e4" label=task_3 '
            'shape=rectangle style="filled,rounded"]',
            '\t\t\t"section_2.inner_section_2.task_4" [color="#000000" fillcolor="#e8f7e4" label=task_4 '
            'shape=rectangle style="filled,rounded"]',
            "\t\t}",
            '\t\t"section_2.task_1" [color="#000000" fillcolor="#e8f7e4" label=task_1 shape=rectangle '
            'style="filled,rounded"]',
            "\t}",
            '\tstart [color="#000000" fillcolor="#e8f7e4" label=start shape=rectangle '
            'style="filled,rounded"]',
            # Edges follow all node/subgraph declarations.
            '\t"section_1.downstream_join_id" -> "section_2.upstream_join_id"',
            '\t"section_1.task_1" -> "section_1.task_2"',
            '\t"section_1.task_1" -> "section_1.task_3"',
            '\t"section_1.task_2" -> "section_1.downstream_join_id"',
            '\t"section_1.task_3" -> "section_1.downstream_join_id"',
            '\t"section_1.upstream_join_id" -> "section_1.task_1"',
            '\t"section_2.downstream_join_id" -> end',
            '\t"section_2.inner_section_2.task_2" -> "section_2.inner_section_2.task_4"',
            '\t"section_2.inner_section_2.task_3" -> "section_2.inner_section_2.task_4"',
            '\t"section_2.inner_section_2.task_4" -> "section_2.downstream_join_id"',
            '\t"section_2.task_1" -> "section_2.downstream_join_id"',
            '\t"section_2.upstream_join_id" -> "section_2.inner_section_2.task_2"',
            '\t"section_2.upstream_join_id" -> "section_2.inner_section_2.task_3"',
            '\t"section_2.upstream_join_id" -> "section_2.task_1"',
            '\tstart -> "section_1.upstream_join_id"',
            "}",
        ]
    )