# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from datetime import datetime

from itsdangerous import URLSafeSerializer

from airflow import DAG
from airflow.api_connexion.schemas.dag_schema import (
    DAGCollection,
    DAGCollectionSchema,
    DAGDetailSchema,
    DAGSchema,
)
from airflow.configuration import conf
from airflow.models import DagModel, DagTag

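# The expected "file_token" values below are built with the same URL-safe
# serializer (signed with the webserver secret key) that the schema applies to
# fileloc, so the assertions hold regardless of the configured SECRET_KEY.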
SERIALIZER = URLSafeSerializer(conf.get('webserver', 'SECRET_KEY'))


class TestDagSchema(unittest.TestCase):
    def test_serialize(self):
        # A fully populated DagModel: owners is stored as a single
        # comma-separated string and schedule_interval as a plain cron string.
        dag_model = DagModel(
            dag_id="test_dag_id",
            root_dag_id="test_root_dag_id",
            is_paused=True,
            is_active=True,
            is_subdag=False,
            fileloc="/root/airflow/dags/my_dag.py",
            owners="airflow1,airflow2",
            description="The description",
            schedule_interval="5 4 * * *",
            tags=[DagTag(name="tag-1"), DagTag(name="tag-2")],
        )
        serialized_dag = DAGSchema().dump(dag_model)
        # In the serialized payload, owners becomes a list and the cron string
        # is wrapped in a typed CronExpression object.
        assert {
            "dag_id": "test_dag_id",
            "description": "The description",
            "fileloc": "/root/airflow/dags/my_dag.py",
            "file_token": SERIALIZER.dumps("/root/airflow/dags/my_dag.py"),
            "is_paused": True,
            "is_active": True,
            "is_subdag": False,
            "owners": ["airflow1", "airflow2"],
            "root_dag_id": "test_root_dag_id",
            "schedule_interval": {"__type": "CronExpression", "value": "5 4 * * *"},
            "tags": [{"name": "tag-1"}, {"name": "tag-2"}],
        } == serialized_dag


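# DAGCollectionSchema pairs the serialized DAG list with a total_entries count,
# mirroring the paginated payload returned by the list-DAGs endpoint.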
class TestDAGCollectionSchema(unittest.TestCase):
    def test_serialize(self):
        dag_model_a = DagModel(dag_id="test_dag_id_a", fileloc="/tmp/a.py")
        dag_model_b = DagModel(dag_id="test_dag_id_b", fileloc="/tmp/a.py")
        schema = DAGCollectionSchema()
        instance = DAGCollection(dags=[dag_model_a, dag_model_b], total_entries=2)
        assert {
            "dags": [
                {
                    "dag_id": "test_dag_id_a",
                    "description": None,
                    "fileloc": "/tmp/a.py",
                    "file_token": SERIALIZER.dumps("/tmp/a.py"),
                    "is_paused": None,
                    "is_subdag": None,
                    "is_active": None,
                    "owners": [],
                    "root_dag_id": None,
                    "schedule_interval": None,
                    "tags": [],
                },
                {
                    "dag_id": "test_dag_id_b",
                    "description": None,
                    "fileloc": "/tmp/a.py",
                    "file_token": SERIALIZER.dumps("/tmp/a.py"),
                    "is_active": None,
                    "is_paused": None,
                    "is_subdag": None,
                    "owners": [],
                    "root_dag_id": None,
                    "schedule_interval": None,
                    "tags": [],
                },
            ],
            "total_entries": 2,
        } == schema.dump(instance)


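# Pytest-style class (no unittest.TestCase base): plain asserts are collected by
# pytest directly. The DAG is defined in this module, so its fileloc resolves to
# this test file's __file__.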
class TestDAGDetailSchema:
    def test_serialize(self):
        dag = DAG(
            dag_id="test_dag",
            start_date=datetime(2020, 6, 19),
            doc_md="docs",
            orientation="LR",
            default_view="duration",
            params={"foo": 1},
            tags=['example1', 'example2'],
        )
        schema = DAGDetailSchema()
        expected = {
            'catchup': True,
            'concurrency': 16,
            'dag_id': 'test_dag',
            'dag_run_timeout': None,
            'default_view': 'duration',
            'description': None,
            'doc_md': 'docs',
            'fileloc': __file__,
            "file_token": SERIALIZER.dumps(__file__),
            "is_active": None,
            'is_paused': None,
            'is_subdag': False,
            'orientation': 'LR',
            'owners': [],
            'params': {'foo': 1},
            'schedule_interval': {'__type': 'TimeDelta', 'days': 1, 'seconds': 0, 'microseconds': 0},
            'start_date': '2020-06-19T00:00:00+00:00',
            'tags': [{'name': "example1"}, {'name': "example2"}],
            'timezone': "Timezone('UTC')",
        }
        assert schema.dump(dag) == expected
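

# Optional convenience entry point so the unittest-based cases above can also be
# run directly with the Python interpreter; pytest users can ignore it. Note that
# TestDAGDetailSchema is pytest-style and is not discovered by unittest.
if __name__ == '__main__':
    unittest.main()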