blob: db8282f943f7b0499db3a8ca87a9b465883f24a7 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for SerializedDagModel."""
import unittest
from airflow import DAG, example_dags as example_dags_module
from airflow.models import DagBag
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel as SDM
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.session import create_session
from tests.test_utils.asserts import assert_queries_count
# TODO: Move this helper to a shared test-utilities module.
def make_example_dags(module):
    """Collect the DAGs found in the directory of a package, for use in tests.

    :param module: a package; the first entry of its ``__path__`` is used as
        the DAG folder handed to ``DagBag``.
    :return: the ``dags`` mapping of the resulting ``DagBag``.
    """
    dag_folder = module.__path__[0]
    bag = DagBag(dag_folder)
    return bag.dags
def clear_db_serialized_dags():
    """Delete every ``SerializedDagModel`` row inside a single session."""
    with create_session() as db_session:
        all_rows = db_session.query(SDM)
        all_rows.delete()
class SerializedDagModelTest(unittest.TestCase):
    """Unit tests for SerializedDagModel."""

    def setUp(self):
        # Start every test with no SerializedDagModel rows.
        clear_db_serialized_dags()

    def tearDown(self):
        # Do not leak SerializedDagModel rows into other tests.
        clear_db_serialized_dags()

    def test_dag_fileloc_hash(self):
        """Verifies the correctness of hashing file path."""
        assert DagCode.dag_fileloc_hash('/airflow/dags/test_dag.py') == 33826252060516589

    def _write_example_dags(self):
        """Write every example DAG via ``SDM.write_dag``.

        :return: the example-DAG mapping returned by ``make_example_dags``.
        """
        example_dags = make_example_dags(example_dags_module)
        for dag in example_dags.values():
            SDM.write_dag(dag)
        return example_dags

    def _write_example_dags_without_subdags(self):
        """Write all example DAGs, then return the non-SubDAG ones as a list.

        SubDags are filtered out because they are not stored in DB in a separate
        row and are directly added in the JSON blob of the main DAG.
        """
        example_dags_list = list(self._write_example_dags().values())
        return [dag for dag in example_dags_list if not dag.is_subdag]

    def test_write_dag(self):
        """DAGs can be written into database."""
        example_dags = self._write_example_dags()

        with create_session() as session:
            for dag in example_dags.values():
                assert SDM.has_dag(dag.dag_id)
                result = session.query(SDM.fileloc, SDM.data).filter(SDM.dag_id == dag.dag_id).one()

                assert result.fileloc == dag.full_filepath
                # Verifies JSON schema.
                SerializedDAG.validate_schema(result.data)

    def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
        """Serialized DAG is re-written only if the DAG itself has changed."""
        example_dags = make_example_dags(example_dags_module)
        # Index directly so a missing example DAG fails here with a KeyError
        # instead of passing ``None`` into write_dag.
        example_bash_op_dag = example_dags["example_bash_operator"]
        dag_updated = SDM.write_dag(dag=example_bash_op_dag)
        assert dag_updated is True

        with create_session() as session:
            s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)

            # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
            # column is not updated
            dag_updated = SDM.write_dag(dag=example_bash_op_dag)
            s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)

            assert s_dag_1.dag_hash == s_dag.dag_hash
            assert s_dag.last_updated == s_dag_1.last_updated
            assert dag_updated is False

            # Update DAG
            example_bash_op_dag.tags += ["new_tag"]
            assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"}

            dag_updated = SDM.write_dag(dag=example_bash_op_dag)
            s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)

            assert s_dag.last_updated != s_dag_2.last_updated
            assert s_dag.dag_hash != s_dag_2.dag_hash
            assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"]
            assert dag_updated is True

    def test_read_dags(self):
        """DAGs can be read from database."""
        example_dags = self._write_example_dags()
        serialized_dags = SDM.read_all_dags()
        assert len(example_dags) == len(serialized_dags)
        for dag_id, dag in example_dags.items():
            serialized_dag = serialized_dags[dag_id]

            assert serialized_dag.dag_id == dag.dag_id
            assert set(serialized_dag.task_dict) == set(dag.task_dict)

    def test_remove_dags_by_id(self):
        """A single DAG can be removed from the database by its dag_id."""
        filtered_example_dags_list = self._write_example_dags_without_subdags()

        # Tests removing by dag_id.
        dag_removed_by_id = filtered_example_dags_list[0]
        # Guard against a vacuous pass: the DAG must exist before removal.
        assert SDM.has_dag(dag_removed_by_id.dag_id)

        SDM.remove_dag(dag_removed_by_id.dag_id)

        assert not SDM.has_dag(dag_removed_by_id.dag_id)
        # Removing one dag_id must leave every other DAG in place.
        for dag in filtered_example_dags_list[1:]:
            assert SDM.has_dag(dag.dag_id)

    def test_remove_dags_by_filepath(self):
        """DAGs whose file is left out of the alive-file list are removed from the database."""
        filtered_example_dags_list = self._write_example_dags_without_subdags()

        # Tests removing by file path.
        dag_removed_by_file = filtered_example_dags_list[0]
        removed_file = dag_removed_by_file.full_filepath
        # Guard against a vacuous pass: the DAG must exist before removal.
        assert SDM.has_dag(dag_removed_by_file.dag_id)

        # Deduplicate files that define several DAGs (set comprehension), then
        # drop the target DAG's file so only that file is treated as deleted.
        example_dag_files = list({dag.full_filepath for dag in filtered_example_dags_list})
        example_dag_files.remove(removed_file)
        SDM.remove_deleted_dags(example_dag_files)

        assert not SDM.has_dag(dag_removed_by_file.dag_id)
        # DAGs defined in files that are still in the alive list must be kept.
        for dag in filtered_example_dags_list:
            if dag.full_filepath != removed_file:
                assert SDM.has_dag(dag.dag_id)

    def test_bulk_sync_to_db(self):
        """Bulk-syncing several DAGs issues a pinned number of DB queries."""
        dags = [
            DAG("dag_1"),
            DAG("dag_2"),
            DAG("dag_3"),
        ]
        # Pins the query count for syncing these three DAGs; update the number
        # deliberately if bulk_sync_to_db's query pattern changes.
        with assert_queries_count(10):
            SDM.bulk_sync_to_db(dags)