blob: 3ff7c4b82b9d246a2bc8b1638a2be0096b01325b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import unittest.mock
from datetime import datetime
import pytest
from airflow import DAG
from airflow.models import DagBag
from airflow.models.expandinput import EXPAND_INPUT_EMPTY
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
@pytest.fixture(scope="module")
def configured_app(minimal_app_for_api):
app = minimal_app_for_api
create_user(
app, # type: ignore
username="test",
role_name="Test",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
],
)
create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore
yield app
delete_user(app, username="test") # type: ignore
delete_user(app, username="test_no_permissions") # type: ignore
class TestTaskEndpoint:
dag_id = "test_dag"
mapped_dag_id = "test_mapped_task"
task_id = "op1"
task_id2 = "op2"
task_id3 = "op3"
mapped_task_id = "mapped_task"
task1_start_date = datetime(2020, 6, 15)
task2_start_date = datetime(2020, 6, 16)
@pytest.fixture(scope="class")
def setup_dag(self, configured_app):
with DAG(self.dag_id, start_date=self.task1_start_date, doc_md="details") as dag:
task1 = EmptyOperator(task_id=self.task_id, params={"foo": "bar"})
task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date)
with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag:
EmptyOperator(task_id=self.task_id3)
# Use the private _expand() method to avoid the empty kwargs check.
# We don't care about how the operator runs here, only its presence.
EmptyOperator.partial(task_id=self.mapped_task_id)._expand(EXPAND_INPUT_EMPTY, strict=False)
task1 >> task2
dag_bag = DagBag(os.devnull, include_examples=False)
dag_bag.dags = {dag.dag_id: dag, mapped_dag.dag_id: mapped_dag}
configured_app.dag_bag = dag_bag # type:ignore
@staticmethod
def clean_db():
clear_db_runs()
clear_db_dags()
clear_db_serialized_dags()
@pytest.fixture(autouse=True)
def setup_attrs(self, configured_app, setup_dag) -> None:
self.clean_db()
self.app = configured_app
self.client = self.app.test_client() # type:ignore
def teardown_method(self) -> None:
self.clean_db()
class TestGetTask(TestTaskEndpoint):
def test_should_respond_200(self):
expected = {
"class_ref": {
"class_name": "EmptyOperator",
"module_path": "airflow.operators.empty",
},
"depends_on_past": False,
"downstream_task_ids": [self.task_id2],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"value": "bar",
"description": None,
"schema": {},
}
},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"start_date": "2020-06-15T00:00:00+00:00",
"task_id": "op1",
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
"is_mapped": False,
}
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 200
assert response.json == expected
def test_mapped_task(self):
expected = {
"class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"},
"depends_on_past": False,
"downstream_task_ids": [],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"is_mapped": True,
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
"retry_exponential_backoff": False,
"start_date": "2020-06-15T00:00:00+00:00",
"task_id": "mapped_task",
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
}
response = self.client.get(
f"/api/v1/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200
assert response.json == expected
def test_should_respond_200_serialized(self):
# Get the dag out of the dagbag before we patch it to an empty one
SerializedDagModel.write_dag(self.app.dag_bag.get_dag(self.dag_id))
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
patcher = unittest.mock.patch.object(self.app, "dag_bag", dag_bag)
patcher.start()
expected = {
"class_ref": {
"class_name": "EmptyOperator",
"module_path": "airflow.operators.empty",
},
"depends_on_past": False,
"downstream_task_ids": [self.task_id2],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"value": "bar",
"description": None,
"schema": {},
}
},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"start_date": "2020-06-15T00:00:00+00:00",
"task_id": "op1",
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
"is_mapped": False,
}
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 200
assert response.json == expected
patcher.stop()
def test_should_respond_404(self):
task_id = "xxxx_not_existing"
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks/{task_id}", environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 404
def test_should_raises_401_unauthenticated(self):
response = self.client.get(f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}")
assert_401(response)
def test_should_raise_403_forbidden(self):
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks", environ_overrides={"REMOTE_USER": "test_no_permissions"}
)
assert response.status_code == 403
class TestGetTasks(TestTaskEndpoint):
def test_should_respond_200(self):
expected = {
"tasks": [
{
"class_ref": {
"class_name": "EmptyOperator",
"module_path": "airflow.operators.empty",
},
"depends_on_past": False,
"downstream_task_ids": [self.task_id2],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"value": "bar",
"description": None,
"schema": {},
}
},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"start_date": "2020-06-15T00:00:00+00:00",
"task_id": "op1",
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
"is_mapped": False,
},
{
"class_ref": {
"class_name": "EmptyOperator",
"module_path": "airflow.operators.empty",
},
"depends_on_past": False,
"downstream_task_ids": [],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"start_date": "2020-06-16T00:00:00+00:00",
"task_id": self.task_id2,
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
"is_mapped": False,
},
],
"total_entries": 2,
}
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks", environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 200
assert response.json == expected
def test_get_tasks_mapped(self):
expected = {
"tasks": [
{
"class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"},
"depends_on_past": False,
"downstream_task_ids": [],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"is_mapped": True,
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
"retry_exponential_backoff": False,
"start_date": "2020-06-15T00:00:00+00:00",
"task_id": "mapped_task",
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
},
{
"class_ref": {
"class_name": "EmptyOperator",
"module_path": "airflow.operators.empty",
},
"depends_on_past": False,
"downstream_task_ids": [],
"end_date": None,
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
"priority_weight": 1.0,
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"start_date": "2020-06-15T00:00:00+00:00",
"task_id": self.task_id3,
"template_fields": [],
"trigger_rule": "all_success",
"ui_color": "#e8f7e4",
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
"is_mapped": False,
},
],
"total_entries": 2,
}
response = self.client.get(
f"/api/v1/dags/{self.mapped_dag_id}/tasks", environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 200
assert response.json == expected
def test_should_respond_200_ascending_order_by_start_date(self):
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks?order_by=start_date",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200
assert self.task1_start_date < self.task2_start_date
assert response.json["tasks"][0]["task_id"] == self.task_id
assert response.json["tasks"][1]["task_id"] == self.task_id2
def test_should_respond_200_descending_order_by_start_date(self):
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks?order_by=-start_date",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200
# - means is descending
assert self.task1_start_date < self.task2_start_date
assert response.json["tasks"][0]["task_id"] == self.task_id2
assert response.json["tasks"][1]["task_id"] == self.task_id
def test_should_raise_400_for_invalid_order_by_name(self):
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks?order_by=invalid_task_colume_name",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json["detail"] == "'EmptyOperator' object has no attribute 'invalid_task_colume_name'"
def test_should_respond_404(self):
dag_id = "xxxx_not_existing"
response = self.client.get(f"/api/v1/dags/{dag_id}/tasks", environ_overrides={"REMOTE_USER": "test"})
assert response.status_code == 404
def test_should_raises_401_unauthenticated(self):
response = self.client.get(f"/api/v1/dags/{self.dag_id}/tasks")
assert_401(response)