blob: 06ac5bbef85fd694e86533628fc5830a7827ec6f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import unittest.mock
from datetime import datetime
import pytest
from airflow import DAG
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models import DagBag, DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils.session import provide_session
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
@pytest.fixture()
def current_file_token(url_safe_serializer) -> str:
    """Return this module's path serialized by the ``url_safe_serializer`` fixture.

    The DAG-details tests in this module compare the result against the
    ``file_token`` field of the API response.
    """
    token = url_safe_serializer.dumps(__file__)
    return token
# Identifiers of the in-memory DAGs that the ``configured_app`` fixture places
# in the app's DagBag, plus the id of the single task each of them contains.
DAG_ID = "test_dag"  # DAG built with doc_md, params and a tag
TASK_ID = "op1"  # task_id given to the EmptyOperator in every DAG
DAG2_ID = "test_dag2"  # DAG built without doc_md
DAG3_ID = "test_dag3"  # DAG built without a DAG-level start_date
@pytest.fixture(scope="module")
def configured_app(minimal_app_for_api):
    """Yield the API test app with users, DAG permissions and an in-memory DagBag.

    Setup performed once per module:

    * user ``test`` (role ``Test``) with read/edit/delete on the DAG resource;
    * user ``test_no_permissions`` (role ``TestNoPermissions``), no permissions;
    * user ``test_granular_permissions`` (role ``TestGranularDag``), which is
      granted edit/read on ``TEST_DAG_1`` only, via ``sync_perm_for_dag``;
    * three DAGs (``DAG_ID``, ``DAG2_ID``, ``DAG3_ID``) stored in ``app.dag_bag``.

    The three users are deleted again after the ``yield``.
    """
    app = minimal_app_for_api

    create_user(
        app,  # type: ignore
        username="test",
        role_name="Test",
        permissions=[
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
        ],
    )
    create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
    create_user(app, username="test_granular_permissions", role_name="TestGranularDag")  # type: ignore

    # One call is enough: the original issued this exact call twice in a row.
    app.appbuilder.sm.sync_perm_for_dag(  # type: ignore
        "TEST_DAG_1",
        access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]},
    )

    with DAG(
        DAG_ID,
        start_date=datetime(2020, 6, 15),
        doc_md="details",
        params={"foo": 1},
        tags=["example"],
    ) as dag:
        EmptyOperator(task_id=TASK_ID)

    with DAG(DAG2_ID, start_date=datetime(2020, 6, 15)) as dag2:  # no doc_md
        EmptyOperator(task_id=TASK_ID)

    with DAG(DAG3_ID) as dag3:  # DAG start_date set to None
        EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12))

    # Build a DagBag from an empty source and inject the DAGs by hand.
    dag_bag = DagBag(os.devnull, include_examples=False)
    dag_bag.dags = {dag.dag_id: dag, dag2.dag_id: dag2, dag3.dag_id: dag3}
    app.dag_bag = dag_bag

    yield app

    delete_user(app, username="test")  # type: ignore
    delete_user(app, username="test_no_permissions")  # type: ignore
    delete_user(app, username="test_granular_permissions")  # type: ignore
class TestDagEndpoint:
    """Common setup, cleanup and DagModel factories for the DAG endpoint tests."""

    @staticmethod
    def clean_db():
        """Clear DAG runs, then DAG rows, then serialized DAGs from the database."""
        for wipe in (clear_db_runs, clear_db_dags, clear_db_serialized_dags):
            wipe()

    @pytest.fixture(autouse=True)
    def setup_attrs(self, configured_app) -> None:
        """Reset the database and attach the app, a test client and the DAG ids."""
        self.clean_db()
        self.app = configured_app
        self.client = self.app.test_client()  # type:ignore
        self.dag_id, self.dag2_id, self.dag3_id = DAG_ID, DAG2_ID, DAG3_ID

    def teardown_method(self) -> None:
        """Reset the database again once a test has finished."""
        self.clean_db()

    @provide_session
    def _create_dag_models(self, count, dag_id_prefix="TEST_DAG", is_paused=False, session=None):
        """Add ``count`` active DagModel rows named ``<dag_id_prefix>_1`` .. ``<dag_id_prefix>_<count>``.

        Each row gets ``/tmp/dag_<n>.py`` as fileloc, the cron schedule
        ``2 2 * * *`` and the requested ``is_paused`` value.
        """
        for offset in range(count):
            suffix = offset + 1
            session.add(
                DagModel(
                    dag_id=f"{dag_id_prefix}_{suffix}",
                    fileloc=f"/tmp/dag_{suffix}.py",
                    schedule_interval="2 2 * * *",
                    is_active=True,
                    is_paused=is_paused,
                )
            )

    @provide_session
    def _create_deactivated_dag(self, session=None):
        """Add a single inactive DagModel row with id ``TEST_DAG_DELETED_1``."""
        session.add(
            DagModel(
                dag_id="TEST_DAG_DELETED_1",
                fileloc="/tmp/dag_del_1.py",
                schedule_interval="2 2 * * *",
                is_active=False,
            )
        )
class TestGetDag(TestDagEndpoint):
    """Tests for ``GET /api/v1/dags/{dag_id}``."""

    @staticmethod
    def _expected_dag(**overrides):
        """Return the payload expected for ``TEST_DAG_1``, with ``overrides`` applied.

        The hard-coded ``file_token`` is the token of ``/tmp/dag_1.py``; the
        tests that compare it run with the webserver secret key ``mysecret``.
        """
        payload = {
            "dag_id": "TEST_DAG_1",
            "description": None,
            "fileloc": "/tmp/dag_1.py",
            "file_token": "Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk",
            "is_paused": False,
            "is_active": True,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }
        payload.update(overrides)
        return payload

    @conf_vars({("webserver", "secret_key"): "mysecret"})
    def test_should_respond_200(self):
        """An existing DAG is returned with all of its model fields."""
        self._create_dag_models(1)
        response = self.client.get("/api/v1/dags/TEST_DAG_1", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == self._expected_dag()

    @conf_vars({("webserver", "secret_key"): "mysecret"})
    def test_should_respond_200_with_schedule_interval_none(self, session):
        """A DAG stored without a schedule is returned with ``schedule_interval`` null."""
        dag_model = DagModel(
            dag_id="TEST_DAG_1",
            fileloc="/tmp/dag_1.py",
            schedule_interval=None,
            is_paused=False,
        )
        session.add(dag_model)
        session.commit()
        response = self.client.get("/api/v1/dags/TEST_DAG_1", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        # The row above is created without is_active, hence is_active=False here.
        assert response.json == self._expected_dag(is_active=False, schedule_interval=None)

    def test_should_respond_200_with_granular_dag_access(self):
        """A user with per-DAG access to TEST_DAG_1 can read it."""
        self._create_dag_models(1)
        response = self.client.get(
            "/api/v1/dags/TEST_DAG_1", environ_overrides={"REMOTE_USER": "test_granular_permissions"}
        )
        assert response.status_code == 200

    def test_should_respond_404(self):
        """An unknown dag_id yields 404."""
        response = self.client.get("/api/v1/dags/INVALID_DAG", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 404

    def test_should_raises_401_unauthenticated(self):
        """A request without a user yields 401."""
        self._create_dag_models(1)
        response = self.client.get("/api/v1/dags/TEST_DAG_1")
        assert_401(response)

    def test_should_raise_403_forbidden(self):
        """A user without any permission is refused on the single-DAG endpoint."""
        # Target the endpoint this class covers; the original requested the
        # ``/details`` endpoint here, leaving this one without a 403 check.
        self._create_dag_models(1)
        response = self.client.get(
            "/api/v1/dags/TEST_DAG_1", environ_overrides={"REMOTE_USER": "test_no_permissions"}
        )
        assert response.status_code == 403

    def test_should_respond_403_with_granular_access_for_different_dag(self):
        """Per-DAG access to TEST_DAG_1 does not allow reading TEST_DAG_2."""
        self._create_dag_models(3)
        response = self.client.get(
            "/api/v1/dags/TEST_DAG_2", environ_overrides={"REMOTE_USER": "test_granular_permissions"}
        )
        assert response.status_code == 403
class TestGetDagDetails(TestDagEndpoint):
    """Tests for ``GET /api/v1/dags/{dag_id}/details``."""

    @staticmethod
    def _expected_details(dag_id, file_token, last_parsed, **overrides):
        """Return the details payload expected for one of this module's DAGs.

        The defaults describe a DAG without doc_md, params or tags and with
        the 2020-06-15 start date; pass ``overrides`` for fields that differ.
        ``last_parsed`` is taken from the response by the callers, so it is
        echoed back rather than checked against a fixed value.
        """
        payload = {
            "catchup": True,
            "concurrency": 16,
            "max_active_tasks": 16,
            "dag_id": dag_id,
            "dag_run_timeout": None,
            "default_view": "grid",
            "description": None,
            "doc_md": None,
            "fileloc": __file__,
            "file_token": file_token,
            "is_paused": None,
            "is_active": None,
            "is_subdag": False,
            "orientation": "LR",
            "owners": ["airflow"],
            "params": {},
            "schedule_interval": {
                "__type": "TimeDelta",
                "days": 1,
                "microseconds": 0,
                "seconds": 0,
            },
            "start_date": "2020-06-15T00:00:00+00:00",
            "tags": [],
            "timezone": "Timezone('UTC')",
            "max_active_runs": 16,
            "pickle_id": None,
            "end_date": None,
            "is_paused_upon_creation": None,
            "last_parsed": last_parsed,
            "render_template_as_native_obj": False,
        }
        payload.update(overrides)
        return payload

    @classmethod
    def _expected_test_dag_details(cls, file_token, last_parsed):
        """Return the details payload expected for ``test_dag`` (doc_md, one param, one tag)."""
        return cls._expected_details(
            "test_dag",
            file_token,
            last_parsed,
            doc_md="details",
            params={
                "foo": {
                    "__class": "airflow.models.param.Param",
                    "value": 1,
                    "description": None,
                    "schema": {},
                }
            },
            tags=[{"name": "example"}],
        )

    def test_should_respond_200(self, current_file_token):
        """Details of the fully configured DAG are returned."""
        response = self.client.get(
            f"/api/v1/dags/{self.dag_id}/details", environ_overrides={"REMOTE_USER": "test"}
        )
        assert response.status_code == 200
        last_parsed = response.json["last_parsed"]
        assert response.json == self._expected_test_dag_details(current_file_token, last_parsed)

    def test_should_response_200_with_doc_md_none(self, current_file_token):
        """A DAG without doc_md is returned with ``doc_md`` null."""
        response = self.client.get(
            f"/api/v1/dags/{self.dag2_id}/details", environ_overrides={"REMOTE_USER": "test"}
        )
        assert response.status_code == 200
        last_parsed = response.json["last_parsed"]
        assert response.json == self._expected_details("test_dag2", current_file_token, last_parsed)

    def test_should_response_200_for_null_start_date(self, current_file_token):
        """A DAG without a DAG-level start_date is returned with ``start_date`` null."""
        response = self.client.get(
            f"/api/v1/dags/{self.dag3_id}/details", environ_overrides={"REMOTE_USER": "test"}
        )
        assert response.status_code == 200
        last_parsed = response.json["last_parsed"]
        assert response.json == self._expected_details(
            "test_dag3", current_file_token, last_parsed, start_date=None
        )

    def test_should_respond_200_serialized(self, current_file_token):
        """Details are the same whether the DAG comes from the database or the in-memory bag."""
        details_url = f"/api/v1/dags/{self.dag_id}/details"

        # Get the dag out of the dagbag before we patch it to an empty one
        SerializedDagModel.write_dag(self.app.dag_bag.get_dag(self.dag_id))

        # Create empty app with empty dagbag to check if DAG is read from db
        dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)

        # The context manager restores ``app.dag_bag`` even when an assertion
        # fails; a leaked patch would affect every later test because the app
        # fixture is module-scoped.
        with unittest.mock.patch.object(self.app, "dag_bag", dag_bag):
            response = self.client.get(details_url, environ_overrides={"REMOTE_USER": "test"})
            assert response.status_code == 200
            assert response.json == self._expected_test_dag_details(
                current_file_token, response.json["last_parsed"]
            )

        # Same request again, now served with the original dag bag restored.
        response = self.client.get(details_url, environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == self._expected_test_dag_details(
            current_file_token, response.json["last_parsed"]
        )

    def test_should_raises_401_unauthenticated(self):
        """A request without a user yields 401."""
        response = self.client.get(f"/api/v1/dags/{self.dag_id}/details")
        assert_401(response)

    def test_should_raise_404_when_dag_is_not_found(self):
        """An unknown dag_id yields a 404 problem document."""
        response = self.client.get(
            "/api/v1/dags/non_existing_dag_id/details", environ_overrides={"REMOTE_USER": "test"}
        )
        assert response.status_code == 404
        assert response.json == {
            "detail": "The DAG with dag_id: non_existing_dag_id was not found",
            "status": 404,
            "title": "DAG not found",
            "type": EXCEPTIONS_LINK_MAP[404],
        }
class TestGetDags(TestDagEndpoint):
    """Tests for the ``GET /api/v1/dags`` list endpoint and its query filters."""

    @staticmethod
    def _expected_dag(url_safe_serializer, dag_id, fileloc, *, is_paused=False, is_active=True):
        """Return the list entry expected for a DagModel row created by the base-class factories.

        ``file_token`` is derived from ``fileloc`` with the given serializer.
        """
        return {
            "dag_id": dag_id,
            "description": None,
            "fileloc": fileloc,
            "file_token": url_safe_serializer.dumps(fileloc),
            "is_paused": is_paused,
            "is_active": is_active,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {
                "__type": "CronExpression",
                "value": "2 2 * * *",
            },
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    @provide_session
    def test_should_respond_200(self, session, url_safe_serializer):
        """Without filters only the two active DAGs are listed, not the deactivated one."""
        self._create_dag_models(2)
        self._create_deactivated_dag()

        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
        assert len(dags_query.all()) == 3

        response = self.client.get("api/v1/dags", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == {
            "dags": [
                self._expected_dag(url_safe_serializer, "TEST_DAG_1", "/tmp/dag_1.py"),
                self._expected_dag(url_safe_serializer, "TEST_DAG_2", "/tmp/dag_2.py"),
            ],
            "total_entries": 2,
        }

    def test_only_active_true_returns_active_dags(self, url_safe_serializer):
        """``only_active=True`` hides the deactivated DAG."""
        self._create_dag_models(1)
        self._create_deactivated_dag()
        response = self.client.get("api/v1/dags?only_active=True", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == {
            "dags": [self._expected_dag(url_safe_serializer, "TEST_DAG_1", "/tmp/dag_1.py")],
            "total_entries": 1,
        }

    def test_only_active_false_returns_all_dags(self, url_safe_serializer):
        """``only_active=False`` lists the deactivated DAG as well."""
        self._create_dag_models(1)
        self._create_deactivated_dag()
        response = self.client.get("api/v1/dags?only_active=False", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == {
            "dags": [
                self._expected_dag(url_safe_serializer, "TEST_DAG_1", "/tmp/dag_1.py"),
                self._expected_dag(
                    url_safe_serializer, "TEST_DAG_DELETED_1", "/tmp/dag_del_1.py", is_active=False
                ),
            ],
            "total_entries": 2,
        }

    @pytest.mark.parametrize(
        "url, expected_dag_ids",
        [
            ("api/v1/dags?tags=t1", ["TEST_DAG_1", "TEST_DAG_3"]),
            ("api/v1/dags?tags=t2", ["TEST_DAG_2", "TEST_DAG_3"]),
            ("api/v1/dags?tags=t1,t2", ["TEST_DAG_1", "TEST_DAG_2", "TEST_DAG_3"]),
            ("api/v1/dags", ["TEST_DAG_1", "TEST_DAG_2", "TEST_DAG_3", "TEST_DAG_4"]),
        ],
    )
    def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
        """The ``tags`` parameter selects DAGs carrying any of the given tags."""
        # test filter by tags
        tagged_dags = [
            DAG(dag_id="TEST_DAG_1", tags=["t1"]),
            DAG(dag_id="TEST_DAG_2", tags=["t2"]),
            DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"]),
            DAG(dag_id="TEST_DAG_4"),
        ]
        for dag in tagged_dags:
            dag.sync_to_db()

        response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        dag_ids = [dag["dag_id"] for dag in response.json["dags"]]
        assert dag_ids == expected_dag_ids

    @pytest.mark.parametrize(
        "url, expected_dag_ids",
        [
            ("api/v1/dags?dag_id_pattern=DAG_1", {"TEST_DAG_1", "SAMPLE_DAG_1"}),
            ("api/v1/dags?dag_id_pattern=SAMPLE_DAG", {"SAMPLE_DAG_1", "SAMPLE_DAG_2"}),
            (
                "api/v1/dags?dag_id_pattern=_DAG_",
                {"TEST_DAG_1", "TEST_DAG_2", "SAMPLE_DAG_1", "SAMPLE_DAG_2"},
            ),
        ],
    )
    def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
        """The ``dag_id_pattern`` parameter selects DAGs whose id contains the pattern."""
        # test filter by dag_id pattern (compared as a set, so order is not checked)
        for dag_id in ("TEST_DAG_1", "TEST_DAG_2", "SAMPLE_DAG_1", "SAMPLE_DAG_2"):
            DAG(dag_id=dag_id).sync_to_db()

        response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        dag_ids = {dag["dag_id"] for dag in response.json["dags"]}
        assert dag_ids == expected_dag_ids

    def test_should_respond_200_with_granular_dag_access(self):
        """A user with per-DAG access to TEST_DAG_1 sees only that DAG in the list."""
        self._create_dag_models(3)
        response = self.client.get(
            "/api/v1/dags", environ_overrides={"REMOTE_USER": "test_granular_permissions"}
        )
        assert response.status_code == 200
        assert len(response.json["dags"]) == 1
        assert response.json["dags"][0]["dag_id"] == "TEST_DAG_1"

    @pytest.mark.parametrize(
        "url, expected_dag_ids",
        [
            ("api/v1/dags?limit=1", ["TEST_DAG_1"]),
            ("api/v1/dags?limit=2", ["TEST_DAG_1", "TEST_DAG_10"]),
            (
                "api/v1/dags?offset=5",
                ["TEST_DAG_5", "TEST_DAG_6", "TEST_DAG_7", "TEST_DAG_8", "TEST_DAG_9"],
            ),
            (
                "api/v1/dags?offset=0",
                [
                    "TEST_DAG_1",
                    "TEST_DAG_10",
                    "TEST_DAG_2",
                    "TEST_DAG_3",
                    "TEST_DAG_4",
                    "TEST_DAG_5",
                    "TEST_DAG_6",
                    "TEST_DAG_7",
                    "TEST_DAG_8",
                    "TEST_DAG_9",
                ],
            ),
            ("api/v1/dags?limit=1&offset=5", ["TEST_DAG_5"]),
            ("api/v1/dags?limit=1&offset=1", ["TEST_DAG_10"]),
            ("api/v1/dags?limit=2&offset=2", ["TEST_DAG_2", "TEST_DAG_3"]),
        ],
    )
    def test_should_respond_200_and_handle_pagination(self, url, expected_dag_ids):
        """``limit`` and ``offset`` page through the list; ``total_entries`` stays at 10."""
        self._create_dag_models(10)

        response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})

        assert response.status_code == 200
        dag_ids = [dag["dag_id"] for dag in response.json["dags"]]
        assert dag_ids == expected_dag_ids
        assert response.json["total_entries"] == 10

    def test_should_respond_200_default_limit(self):
        """With 101 DAGs and no ``limit``, 100 are returned and ``total_entries`` is 101."""
        self._create_dag_models(101)

        response = self.client.get("api/v1/dags", environ_overrides={"REMOTE_USER": "test"})

        assert response.status_code == 200
        assert len(response.json["dags"]) == 100
        assert response.json["total_entries"] == 101

    def test_should_raises_401_unauthenticated(self):
        """A request without a user yields 401."""
        response = self.client.get("api/v1/dags")
        assert_401(response)

    def test_should_respond_403_unauthorized(self):
        """A user without any permission is refused."""
        self._create_dag_models(1)
        response = self.client.get("api/v1/dags", environ_overrides={"REMOTE_USER": "test_no_permissions"})
        assert response.status_code == 403

    def test_paused_true_returns_paused_dags(self, url_safe_serializer):
        """``paused=True`` lists only the paused DAG."""
        self._create_dag_models(1, dag_id_prefix="TEST_DAG_PAUSED", is_paused=True)
        self._create_dag_models(1, dag_id_prefix="TEST_DAG_UNPAUSED", is_paused=False)
        response = self.client.get("api/v1/dags?paused=True", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == {
            "dags": [
                self._expected_dag(
                    url_safe_serializer, "TEST_DAG_PAUSED_1", "/tmp/dag_1.py", is_paused=True
                )
            ],
            "total_entries": 1,
        }

    def test_paused_false_returns_unpaused_dags(self, url_safe_serializer):
        """``paused=False`` lists only the unpaused DAG."""
        self._create_dag_models(1, dag_id_prefix="TEST_DAG_PAUSED", is_paused=True)
        self._create_dag_models(1, dag_id_prefix="TEST_DAG_UNPAUSED", is_paused=False)
        response = self.client.get("api/v1/dags?paused=False", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == {
            "dags": [self._expected_dag(url_safe_serializer, "TEST_DAG_UNPAUSED_1", "/tmp/dag_1.py")],
            "total_entries": 1,
        }

    def test_paused_none_returns_all_dags(self, url_safe_serializer):
        """Without the ``paused`` parameter both paused and unpaused DAGs are listed."""
        self._create_dag_models(1, dag_id_prefix="TEST_DAG_PAUSED", is_paused=True)
        self._create_dag_models(1, dag_id_prefix="TEST_DAG_UNPAUSED", is_paused=False)
        response = self.client.get("api/v1/dags", environ_overrides={"REMOTE_USER": "test"})
        assert response.status_code == 200
        assert response.json == {
            "dags": [
                self._expected_dag(
                    url_safe_serializer, "TEST_DAG_PAUSED_1", "/tmp/dag_1.py", is_paused=True
                ),
                self._expected_dag(url_safe_serializer, "TEST_DAG_UNPAUSED_1", "/tmp/dag_1.py"),
            ],
            "total_entries": 2,
        }
class TestPatchDag(TestDagEndpoint):
    """Tests for ``PATCH /api/v1/dags/{dag_id}``."""

    @staticmethod
    def _expected_unpaused_dag(file_token):
        """Return the payload expected after un-pausing the row made by ``_create_dag_model``."""
        return {
            "dag_id": "TEST_DAG_1",
            "description": None,
            "fileloc": "/tmp/dag_1.py",
            "file_token": file_token,
            "is_paused": False,
            "is_active": False,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {
                "__type": "CronExpression",
                "value": "2 2 * * *",
            },
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    @provide_session
    def _create_dag_model(self, session=None):
        """Add a paused ``TEST_DAG_1`` DagModel row and return the model object."""
        dag_model = DagModel(
            dag_id="TEST_DAG_1", fileloc="/tmp/dag_1.py", schedule_interval="2 2 * * *", is_paused=True
        )
        session.add(dag_model)
        return dag_model

    def test_should_respond_200_on_patch_is_paused(self, url_safe_serializer):
        """Patching ``is_paused`` returns the updated DAG."""
        file_token = url_safe_serializer.dumps("/tmp/dag_1.py")
        dag_model = self._create_dag_model()
        response = self.client.patch(
            f"/api/v1/dags/{dag_model.dag_id}",
            json={
                "is_paused": False,
            },
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 200
        assert response.json == self._expected_unpaused_dag(file_token)

    def test_should_respond_200_on_patch_with_granular_dag_access(self):
        """A user with per-DAG edit access to TEST_DAG_1 can patch it."""
        self._create_dag_models(1)
        response = self.client.patch(
            "/api/v1/dags/TEST_DAG_1",
            json={
                "is_paused": False,
            },
            environ_overrides={"REMOTE_USER": "test_granular_permissions"},
        )
        assert response.status_code == 200

    def test_should_respond_400_on_invalid_request(self):
        """A body containing the read-only ``schedule_interval`` property is rejected."""
        patch_body = {
            "is_paused": True,
            "schedule_interval": {
                "__type": "CronExpression",
                "value": "1 1 * * *",
            },
        }
        dag_model = self._create_dag_model()
        # NOTE(review): no REMOTE_USER is sent here, yet 400 (not 401) is
        # expected - presumably body validation runs before authentication;
        # confirm before changing.
        response = self.client.patch(f"/api/v1/dags/{dag_model.dag_id}", json=patch_body)
        assert response.status_code == 400
        assert response.json == {
            "detail": "Property is read-only - 'schedule_interval'",
            "status": 400,
            "title": "Bad Request",
            "type": EXCEPTIONS_LINK_MAP[400],
        }

    def test_should_respond_404(self):
        """Patching an unknown dag_id yields 404."""
        # Issue a PATCH with a valid body; the original sent a GET, which left
        # the PATCH not-found path untested.
        response = self.client.patch(
            "/api/v1/dags/INVALID_DAG",
            json={
                "is_paused": False,
            },
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 404

    def test_should_raises_401_unauthenticated(self):
        """A request without a user yields 401."""
        dag_model = self._create_dag_model()
        response = self.client.patch(
            f"/api/v1/dags/{dag_model.dag_id}",
            json={
                "is_paused": False,
            },
        )
        assert_401(response)

    def test_should_respond_200_with_update_mask(self, url_safe_serializer):
        """``update_mask=is_paused`` is accepted and the DAG is updated."""
        file_token = url_safe_serializer.dumps("/tmp/dag_1.py")
        dag_model = self._create_dag_model()
        payload = {
            "is_paused": False,
        }
        response = self.client.patch(
            f"/api/v1/dags/{dag_model.dag_id}?update_mask=is_paused",
            json=payload,
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 200
        assert response.json == self._expected_unpaused_dag(file_token)

    @pytest.mark.parametrize(
        "payload, update_mask, error_message",
        [
            (
                {
                    "is_paused": True,
                },
                "update_mask=description",
                "Only `is_paused` field can be updated through the REST API",
            ),
            (
                {
                    "is_paused": True,
                },
                "update_mask=schedule_interval, description",
                "Only `is_paused` field can be updated through the REST API",
            ),
        ],
    )
    def test_should_respond_400_for_invalid_fields_in_update_mask(self, payload, update_mask, error_message):
        """An ``update_mask`` naming anything other than ``is_paused`` is rejected."""
        dag_model = self._create_dag_model()

        response = self.client.patch(
            f"/api/v1/dags/{dag_model.dag_id}?{update_mask}",
            json=payload,
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 400
        assert response.json["detail"] == error_message

    def test_should_respond_403_unauthorized(self):
        """A user without any permission is refused."""
        dag_model = self._create_dag_model()
        response = self.client.patch(
            f"/api/v1/dags/{dag_model.dag_id}",
            json={
                "is_paused": False,
            },
            environ_overrides={"REMOTE_USER": "test_no_permissions"},
        )
        assert response.status_code == 403
class TestPatchDags(TestDagEndpoint):
@provide_session
def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serializer):
    """Bulk-patching ``is_paused`` with pattern ``~`` returns the two active DAGs un-paused."""

    def expected_entry(num):
        # Payload expected for the active row TEST_DAG_<num>.
        path = f"/tmp/dag_{num}.py"
        return {
            "dag_id": f"TEST_DAG_{num}",
            "description": None,
            "fileloc": path,
            "file_token": url_safe_serializer.dumps(path),
            "is_paused": False,
            "is_active": True,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {
                "__type": "CronExpression",
                "value": "2 2 * * *",
            },
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    self._create_dag_models(2)
    self._create_deactivated_dag()

    # Two active rows plus the deactivated one.
    stored_dags = session.query(DagModel).filter(~DagModel.is_subdag).all()
    assert len(stored_dags) == 3

    response = self.client.patch(
        "/api/v1/dags?dag_id_pattern=~",
        json={
            "is_paused": False,
        },
        environ_overrides={"REMOTE_USER": "test"},
    )

    assert response.status_code == 200
    expected = {
        "dags": [expected_entry(1), expected_entry(2)],
        "total_entries": 2,
    }
    assert expected == response.json
def test_only_active_true_returns_active_dags(self, url_safe_serializer):
    """With ``only_active=True`` the bulk PATCH response omits the deactivated DAG."""
    token = url_safe_serializer.dumps("/tmp/dag_1.py")
    self._create_dag_models(1)
    self._create_deactivated_dag()

    patch_body = {"is_paused": False}
    response = self.client.patch(
        "/api/v1/dags?only_active=True&dag_id_pattern=~",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )
    assert response.status_code == 200

    # Only TEST_DAG_1 is expected back; the deactivated DAG must not be listed.
    active_dag = {
        "dag_id": "TEST_DAG_1",
        "description": None,
        "fileloc": "/tmp/dag_1.py",
        "file_token": token,
        "is_paused": False,
        "is_active": True,
        "is_subdag": False,
        "owners": [],
        "root_dag_id": None,
        "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
        "tags": [],
        "next_dagrun": None,
        "has_task_concurrency_limits": True,
        "next_dagrun_data_interval_start": None,
        "next_dagrun_data_interval_end": None,
        "max_active_runs": 16,
        "next_dagrun_create_after": None,
        "last_expired": None,
        "max_active_tasks": 16,
        "last_pickled": None,
        "default_view": None,
        "last_parsed_time": None,
        "scheduler_lock": None,
        "timetable_description": None,
        "has_import_errors": False,
        "pickle_id": None,
    }
    expected_payload = {"dags": [active_dag], "total_entries": 1}
    assert expected_payload == response.json
def test_only_active_false_returns_all_dags(self, url_safe_serializer):
    """With ``only_active=False`` the bulk PATCH response also lists the deactivated DAG."""

    def expected_dag(dag_id, fileloc, token, is_active):
        # Expected serialized entry; only identity, token and activity differ per DAG.
        return {
            "dag_id": dag_id,
            "description": None,
            "fileloc": fileloc,
            "file_token": token,
            "is_paused": False,
            "is_active": is_active,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    active_token = url_safe_serializer.dumps("/tmp/dag_1.py")
    self._create_dag_models(1)
    self._create_deactivated_dag()

    patch_body = {"is_paused": False}
    response = self.client.patch(
        "/api/v1/dags?only_active=False&dag_id_pattern=~",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )
    deleted_token = url_safe_serializer.dumps("/tmp/dag_del_1.py")

    assert response.status_code == 200
    expected_payload = {
        "dags": [
            expected_dag("TEST_DAG_1", "/tmp/dag_1.py", active_token, True),
            expected_dag("TEST_DAG_DELETED_1", "/tmp/dag_del_1.py", deleted_token, False),
        ],
        "total_entries": 2,
    }
    assert expected_payload == response.json
@pytest.mark.parametrize(
    "url, expected_dag_ids",
    [
        (
            "api/v1/dags?tags=t1&dag_id_pattern=~",
            ["TEST_DAG_1", "TEST_DAG_3"],
        ),
        (
            "api/v1/dags?tags=t2&dag_id_pattern=~",
            ["TEST_DAG_2", "TEST_DAG_3"],
        ),
        (
            "api/v1/dags?tags=t1,t2&dag_id_pattern=~",
            ["TEST_DAG_1", "TEST_DAG_2", "TEST_DAG_3"],
        ),
        (
            "api/v1/dags?dag_id_pattern=~",
            ["TEST_DAG_1", "TEST_DAG_2", "TEST_DAG_3", "TEST_DAG_4"],
        ),
    ],
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
    """The ``tags`` query parameter restricts which DAGs the bulk PATCH returns.

    Four DAGs are synced to the database: one tagged ``t1``, one tagged
    ``t2``, one carrying both tags and one without tags. Each case asserts
    the exact, ordered list of DAG ids in the response.
    """
    dags = [
        DAG(dag_id="TEST_DAG_1", tags=["t1"]),
        DAG(dag_id="TEST_DAG_2", tags=["t2"]),
        DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"]),
        DAG(dag_id="TEST_DAG_4"),
    ]
    for dag in dags:
        dag.sync_to_db()

    patch_body = {"is_paused": False}
    response = self.client.patch(
        url,
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )
    assert response.status_code == 200

    returned_ids = [entry["dag_id"] for entry in response.json["dags"]]
    assert expected_dag_ids == returned_ids
@pytest.mark.parametrize(
    "url, expected_dag_ids",
    [
        (
            "api/v1/dags?dag_id_pattern=DAG_1",
            {"TEST_DAG_1", "SAMPLE_DAG_1"},
        ),
        (
            "api/v1/dags?dag_id_pattern=SAMPLE_DAG",
            {"SAMPLE_DAG_1", "SAMPLE_DAG_2"},
        ),
        (
            "api/v1/dags?dag_id_pattern=_DAG_",
            {"TEST_DAG_1", "TEST_DAG_2", "SAMPLE_DAG_1", "SAMPLE_DAG_2"},
        ),
    ],
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
    """The ``dag_id_pattern`` query parameter selects DAGs whose id contains the pattern.

    Two ``TEST_DAG_*`` and two ``SAMPLE_DAG_*`` DAGs are synced to the
    database; the returned ids are compared as a set, so ordering is not
    checked here.
    """
    dag_names = ("TEST_DAG_1", "TEST_DAG_2", "SAMPLE_DAG_1", "SAMPLE_DAG_2")
    # Build every DAG first, then persist them in the same order.
    dags = [DAG(dag_id=name) for name in dag_names]
    for dag in dags:
        dag.sync_to_db()

    patch_body = {"is_paused": False}
    response = self.client.patch(
        url,
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )
    assert response.status_code == 200

    returned_ids = {entry["dag_id"] for entry in response.json["dags"]}
    assert expected_dag_ids == returned_ids
def test_should_respond_200_with_granular_dag_access(self):
    """The ``test_granular_permissions`` user only gets ``TEST_DAG_1`` back.

    Three DAG models exist, but the response for this user is expected to
    contain a single entry.
    """
    self._create_dag_models(3)

    patch_body = {"is_paused": False}
    response = self.client.patch(
        "api/v1/dags?dag_id_pattern=~",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test_granular_permissions"},
    )
    assert response.status_code == 200

    returned_dags = response.json["dags"]
    assert len(returned_dags) == 1
    assert returned_dags[0]["dag_id"] == "TEST_DAG_1"
@pytest.mark.parametrize(
    "url, expected_dag_ids",
    [
        (
            "api/v1/dags?limit=1&dag_id_pattern=~",
            ["TEST_DAG_1"],
        ),
        (
            "api/v1/dags?limit=2&dag_id_pattern=~",
            ["TEST_DAG_1", "TEST_DAG_10"],
        ),
        (
            "api/v1/dags?offset=5&dag_id_pattern=~",
            ["TEST_DAG_5", "TEST_DAG_6", "TEST_DAG_7", "TEST_DAG_8", "TEST_DAG_9"],
        ),
        (
            "api/v1/dags?offset=0&dag_id_pattern=~",
            [
                "TEST_DAG_1",
                "TEST_DAG_10",
                "TEST_DAG_2",
                "TEST_DAG_3",
                "TEST_DAG_4",
                "TEST_DAG_5",
                "TEST_DAG_6",
                "TEST_DAG_7",
                "TEST_DAG_8",
                "TEST_DAG_9",
            ],
        ),
        (
            "api/v1/dags?limit=1&offset=5&dag_id_pattern=~",
            ["TEST_DAG_5"],
        ),
        (
            "api/v1/dags?limit=1&offset=1&dag_id_pattern=~",
            ["TEST_DAG_10"],
        ),
        (
            "api/v1/dags?limit=2&offset=2&dag_id_pattern=~",
            ["TEST_DAG_2", "TEST_DAG_3"],
        ),
    ],
)
def test_should_respond_200_and_handle_pagination(self, url, expected_dag_ids):
    """``limit`` and ``offset`` page through the bulk PATCH result.

    Ten DAG models are created. The expected ids follow string ordering
    (``TEST_DAG_10`` comes right after ``TEST_DAG_1``), and
    ``total_entries`` is expected to stay at 10 regardless of the page.
    """
    self._create_dag_models(10)

    patch_body = {"is_paused": False}
    response = self.client.patch(
        url,
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )
    assert response.status_code == 200

    returned_ids = [entry["dag_id"] for entry in response.json["dags"]]
    assert expected_dag_ids == returned_ids
    assert 10 == response.json["total_entries"]
def test_should_respond_200_default_limit(self):
    """Without an explicit ``limit`` the response holds 100 of the 101 DAGs.

    ``total_entries`` is still expected to report all 101 DAGs.
    """
    self._create_dag_models(101)

    patch_body = {"is_paused": False}
    response = self.client.patch(
        "api/v1/dags?dag_id_pattern=~",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )
    assert response.status_code == 200

    returned_dags = response.json["dags"]
    assert 100 == len(returned_dags)
    assert 101 == response.json["total_entries"]
def test_should_raises_401_unauthenticated(self):
    """A bulk PATCH sent without ``REMOTE_USER`` must be rejected by ``assert_401``'s checks."""
    patch_body = {"is_paused": False}
    # No environ_overrides here: the request is deliberately sent without a user.
    response = self.client.patch("api/v1/dags?dag_id_pattern=~", json=patch_body)
    assert_401(response)
def test_should_respond_403_unauthorized(self):
    """The ``test_no_permissions`` user gets a 403 from the bulk PATCH endpoint."""
    self._create_dag_models(1)

    patch_body = {"is_paused": False}
    response = self.client.patch(
        "api/v1/dags?dag_id_pattern=~",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test_no_permissions"},
    )

    assert response.status_code == 403
def test_should_respond_200_and_pause_dags(self, url_safe_serializer):
    """Bulk PATCH with ``is_paused=True`` and pattern ``~`` reports both DAGs as paused."""

    def paused_dag(index, token):
        # Expected serialized entry for TEST_DAG_<index> after it has been paused.
        return {
            "dag_id": f"TEST_DAG_{index}",
            "description": None,
            "fileloc": f"/tmp/dag_{index}.py",
            "file_token": token,
            "is_paused": True,
            "is_active": True,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    token_1 = url_safe_serializer.dumps("/tmp/dag_1.py")
    token_2 = url_safe_serializer.dumps("/tmp/dag_2.py")
    self._create_dag_models(2)

    patch_body = {"is_paused": True}
    response = self.client.patch(
        "/api/v1/dags?dag_id_pattern=~",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )

    assert response.status_code == 200
    expected_payload = {
        "dags": [paused_dag(1, token_1), paused_dag(2, token_2)],
        "total_entries": 2,
    }
    assert expected_payload == response.json
@provide_session
def test_should_respond_200_and_pause_dag_pattern(self, session, url_safe_serializer):
    """Pausing with ``dag_id_pattern=TEST_DAG_1`` only touches the matching DAGs.

    Of the ten DAG models created, the pattern matches ``TEST_DAG_1`` and
    ``TEST_DAG_10``. The response must list exactly those two as paused, and
    the database must afterwards hold 8 unpaused and 2 paused rows.
    """

    def paused_dag(index, token):
        # Expected serialized entry for TEST_DAG_<index> after it has been paused.
        return {
            "dag_id": f"TEST_DAG_{index}",
            "description": None,
            "fileloc": f"/tmp/dag_{index}.py",
            "file_token": token,
            "is_paused": True,
            "is_active": True,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    token_1 = url_safe_serializer.dumps("/tmp/dag_1.py")
    self._create_dag_models(10)
    token_10 = url_safe_serializer.dumps("/tmp/dag_10.py")

    patch_body = {"is_paused": True}
    response = self.client.patch(
        "/api/v1/dags?dag_id_pattern=TEST_DAG_1",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )

    assert response.status_code == 200
    expected_payload = {
        "dags": [paused_dag(1, token_1), paused_dag(10, token_10)],
        "total_entries": 2,
    }
    assert expected_payload == response.json

    # Verify the persisted state as well, not just the response body.
    unpaused_rows = session.query(DagModel).filter(~DagModel.is_paused).all()
    assert len(unpaused_rows) == 8
    paused_rows = session.query(DagModel).filter(DagModel.is_paused).all()
    assert len(paused_rows) == 2
@provide_session
def test_should_respond_200_and_reverse_ordering(self, session, url_safe_serializer):
    """``order_by=-dag_id`` returns the DAGs in descending id order.

    Note that this test issues a GET request (not a PATCH): ``TEST_DAG_2``
    is expected before ``TEST_DAG_1`` in the listing.
    """

    def expected_dag(index, token):
        # Expected serialized entry for the untouched (unpaused) TEST_DAG_<index>.
        return {
            "dag_id": f"TEST_DAG_{index}",
            "description": None,
            "fileloc": f"/tmp/dag_{index}.py",
            "file_token": token,
            "is_paused": False,
            "is_active": True,
            "is_subdag": False,
            "owners": [],
            "root_dag_id": None,
            "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
            "tags": [],
            "next_dagrun": None,
            "has_task_concurrency_limits": True,
            "next_dagrun_data_interval_start": None,
            "next_dagrun_data_interval_end": None,
            "max_active_runs": 16,
            "next_dagrun_create_after": None,
            "last_expired": None,
            "max_active_tasks": 16,
            "last_pickled": None,
            "default_view": None,
            "last_parsed_time": None,
            "scheduler_lock": None,
            "timetable_description": None,
            "has_import_errors": False,
            "pickle_id": None,
        }

    token_1 = url_safe_serializer.dumps("/tmp/dag_1.py")
    self._create_dag_models(2)
    token_2 = url_safe_serializer.dumps("/tmp/dag_2.py")

    response = self.client.get(
        "/api/v1/dags?order_by=-dag_id",
        environ_overrides={"REMOTE_USER": "test"},
    )

    assert response.status_code == 200
    # Descending order: the second DAG comes first.
    expected_payload = {
        "dags": [expected_dag(2, token_2), expected_dag(1, token_1)],
        "total_entries": 2,
    }
    assert expected_payload == response.json
def test_should_respons_400_dag_id_pattern_missing(self):
    """A bulk PATCH without the ``dag_id_pattern`` query parameter is answered with 400."""
    self._create_dag_models(1)

    patch_body = {"is_paused": False}
    # The URL intentionally carries only ``only_active``; no dag_id_pattern.
    response = self.client.patch(
        "/api/v1/dags?only_active=True",
        json=patch_body,
        environ_overrides={"REMOTE_USER": "test"},
    )

    assert response.status_code == 400