# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.api_connexion.schemas.dag_schema import (
    DAGCollection,
    DAGCollectionSchema,
    DAGDetailSchema,
    DAGSchema,
)
from airflow.models import DagModel, DagTag


def test_serialize_test_dag_schema(url_safe_serializer):
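    """DAGSchema should dump a DagModel into the expected API payload."""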
    dag_model = DagModel(
        dag_id="test_dag_id",
        root_dag_id="test_root_dag_id",
        is_paused=True,
        is_active=True,
        is_subdag=False,
        fileloc="/root/airflow/dags/my_dag.py",
        owners="airflow1,airflow2",
        description="The description",
        schedule_interval="5 4 * * *",
        tags=[DagTag(name="tag-1"), DagTag(name="tag-2")],
    )
    serialized_dag = DAGSchema().dump(dag_model)
    assert {
        "dag_id": "test_dag_id",
        "description": "The description",
        "fileloc": "/root/airflow/dags/my_dag.py",
        "file_token": url_safe_serializer.dumps("/root/airflow/dags/my_dag.py"),
        "is_paused": True,
        "is_active": True,
        "is_subdag": False,
        "owners": ["airflow1", "airflow2"],
        "root_dag_id": "test_root_dag_id",
        "schedule_interval": {"__type": "CronExpression", "value": "5 4 * * *"},
        "tags": [{"name": "tag-1"}, {"name": "tag-2"}],
        "next_dagrun": None,
        "has_task_concurrency_limits": True,
        "next_dagrun_data_interval_start": None,
        "next_dagrun_data_interval_end": None,
        "max_active_runs": 16,
        "next_dagrun_create_after": None,
        "last_expired": None,
        "max_active_tasks": 16,
        "last_pickled": None,
        "default_view": None,
        "last_parsed_time": None,
        "scheduler_lock": None,
        "timetable_description": None,
        "has_import_errors": None,
        "pickle_id": None,
    } == serialized_dag


def test_serialize_test_dag_collection_schema(url_safe_serializer):
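    """DAGCollectionSchema should dump every DAG in the collection plus the total count."""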
    dag_model_a = DagModel(dag_id="test_dag_id_a", fileloc="/tmp/a.py")
    dag_model_b = DagModel(dag_id="test_dag_id_b", fileloc="/tmp/a.py")
    schema = DAGCollectionSchema()
    instance = DAGCollection(dags=[dag_model_a, dag_model_b], total_entries=2)
    assert {
        "dags": [
            {
                "dag_id": "test_dag_id_a",
                "description": None,
                "fileloc": "/tmp/a.py",
                "file_token": url_safe_serializer.dumps("/tmp/a.py"),
                "is_paused": None,
                "is_subdag": None,
                "is_active": None,
                "owners": [],
                "root_dag_id": None,
                "schedule_interval": None,
                "tags": [],
                "next_dagrun": None,
                "has_task_concurrency_limits": True,
                "next_dagrun_data_interval_start": None,
                "next_dagrun_data_interval_end": None,
                "max_active_runs": 16,
                "next_dagrun_create_after": None,
                "last_expired": None,
                "max_active_tasks": 16,
                "last_pickled": None,
                "default_view": None,
                "last_parsed_time": None,
                "scheduler_lock": None,
                "timetable_description": None,
                "has_import_errors": None,
                "pickle_id": None,
            },
            {
                "dag_id": "test_dag_id_b",
                "description": None,
                "fileloc": "/tmp/a.py",
                "file_token": url_safe_serializer.dumps("/tmp/a.py"),
                "is_active": None,
                "is_paused": None,
                "is_subdag": None,
                "owners": [],
                "root_dag_id": None,
                "schedule_interval": None,
                "tags": [],
                "next_dagrun": None,
                "has_task_concurrency_limits": True,
                "next_dagrun_data_interval_start": None,
                "next_dagrun_data_interval_end": None,
                "max_active_runs": 16,
                "next_dagrun_create_after": None,
                "last_expired": None,
                "max_active_tasks": 16,
                "last_pickled": None,
                "default_view": None,
                "last_parsed_time": None,
                "scheduler_lock": None,
                "timetable_description": None,
                "has_import_errors": None,
                "pickle_id": None,
            },
        ],
        "total_entries": 2,
    } == schema.dump(instance)


def test_serialize_test_dag_detail_schema(url_safe_serializer):
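    """DAGDetailSchema should dump a DAG object, including detail-only fields."""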
    dag = DAG(
        dag_id="test_dag",
        start_date=datetime(2020, 6, 19),
        doc_md="docs",
        orientation="LR",
        default_view="duration",
        params={"foo": 1},
        tags=["example1", "example2"],
    )
    schema = DAGDetailSchema()
    expected = {
        "catchup": True,
        "concurrency": 16,
        "max_active_tasks": 16,
        "dag_id": "test_dag",
        "dag_run_timeout": None,
        "default_view": "duration",
        "description": None,
        "doc_md": "docs",
        "fileloc": __file__,
        "file_token": url_safe_serializer.dumps(__file__),
        "is_active": None,
        "is_paused": None,
        "is_subdag": False,
        "orientation": "LR",
        "owners": [],
        "params": {
            "foo": {
                "__class": "airflow.models.param.Param",
                "value": 1,
                "description": None,
                "schema": {},
            }
        },
        "schedule_interval": {"__type": "TimeDelta", "days": 1, "seconds": 0, "microseconds": 0},
        "start_date": "2020-06-19T00:00:00+00:00",
        "tags": [{"name": "example1"}, {"name": "example2"}],
        "timezone": "Timezone('UTC')",
        "max_active_runs": 16,
        "pickle_id": None,
        "end_date": None,
        "is_paused_upon_creation": None,
        "render_template_as_native_obj": False,
    }
    obj = schema.dump(dag)
    # last_parsed reflects when the DAG object was parsed, so copy it from
    # the dump rather than hard-coding a timestamp in the expected payload.
    expected.update({"last_parsed": obj["last_parsed"]})
    assert obj == expected