| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import html |
| import json |
| import unittest.mock |
| import urllib.parse |
| from getpass import getuser |
| |
| import pendulum |
| import pytest |
| import time_machine |
| |
| from airflow import settings |
| from airflow.models.dag import DAG, DagModel |
| from airflow.models.dagbag import DagBag |
| from airflow.models.dagcode import DagCode |
| from airflow.models.taskinstance import TaskInstance |
| from airflow.models.taskreschedule import TaskReschedule |
| from airflow.models.xcom import XCom |
| from airflow.operators.bash import BashOperator |
| from airflow.operators.empty import EmptyOperator |
| from airflow.providers.celery.executors.celery_executor import CeleryExecutor |
| from airflow.security import permissions |
| from airflow.utils import timezone |
| from airflow.utils.log.logging_mixin import ExternalLoggingMixin |
| from airflow.utils.session import create_session |
| from airflow.utils.state import DagRunState, State |
| from airflow.utils.types import DagRunType |
| from airflow.www.views import TaskInstanceModelView |
| from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user |
| from tests.test_utils.config import conf_vars |
| from tests.test_utils.db import clear_db_runs, clear_db_xcom |
| from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login |
| |
| pytestmark = pytest.mark.db_test |
| |
| DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) |
| STR_DEFAULT_DATE = urllib.parse.quote(DEFAULT_DATE.strftime("%Y-%m-%dT%H:%M:%S.%f%z")) |
| |
| DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE)) |
| |
| DEFAULT_DAGRUN = "TEST_DAGRUN" |
| |
| |
| @pytest.fixture(scope="module", autouse=True) |
| def reset_dagruns(): |
| """Clean up stray garbage from other tests.""" |
| clear_db_runs() |
| |
| |
| @pytest.fixture(autouse=True) |
| def init_dagruns(app, reset_dagruns): |
| with time_machine.travel(DEFAULT_DATE, tick=False): |
| app.dag_bag.get_dag("example_bash_operator").create_dagrun( |
| run_id=DEFAULT_DAGRUN, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| XCom.set( |
| key="return_value", |
| value="{'x':1}", |
| task_id="runme_0", |
| dag_id="example_bash_operator", |
| execution_date=DEFAULT_DATE, |
| ) |
| app.dag_bag.get_dag("example_subdag_operator").create_dagrun( |
| run_id=DEFAULT_DAGRUN, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| app.dag_bag.get_dag("example_xcom").create_dagrun( |
| run_id=DEFAULT_DAGRUN, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| app.dag_bag.get_dag("latest_only").create_dagrun( |
| run_id=DEFAULT_DAGRUN, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| app.dag_bag.get_dag("example_task_group").create_dagrun( |
| run_id=DEFAULT_DAGRUN, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| yield |
| clear_db_runs() |
| clear_db_xcom() |
| |
| |
| @pytest.fixture(scope="module") |
| def client_ti_without_dag_edit(app): |
| create_user( |
| app, |
| username="all_ti_permissions_except_dag_edit", |
| role_name="all_ti_permissions_except_dag_edit", |
| permissions=[ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), |
| ], |
| ) |
| |
| yield client_with_login( |
| app, |
| username="all_ti_permissions_except_dag_edit", |
| password="all_ti_permissions_except_dag_edit", |
| ) |
| |
| delete_user(app, username="all_ti_permissions_except_dag_edit") # type: ignore |
| delete_roles(app) |
| |
| |
| @pytest.mark.parametrize( |
| "url, contents", |
| [ |
| pytest.param( |
| "/", |
| [ |
| "/delete?dag_id=example_bash_operator", |
| "return confirmDeleteDag(this, 'example_bash_operator')", |
| ], |
| id="delete-dag-button-normal", |
| ), |
| pytest.param( |
| f"task?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["Grid"], |
| id="task-grid-button", |
| ), |
| pytest.param( |
| f"task?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["Task Instance Details"], |
| id="task", |
| ), |
| pytest.param( |
| f"log?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["Grid"], |
| id="log-grid-button", |
| ), |
| pytest.param( |
| f"xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["XCom"], |
| id="xcom", |
| ), |
| pytest.param( |
| f"xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["Grid"], |
| id="xcom-grid-button", |
| ), |
| pytest.param("xcom/list", ["List XComs"], id="xcom-list"), |
| pytest.param( |
| f"rendered-templates?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["Rendered Template"], |
| id="rendered-templates", |
| ), |
| pytest.param( |
| f"rendered-templates?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}", |
| ["Grid"], |
| id="rendered-templates-grid-button", |
| ), |
| pytest.param( |
| "object/graph_data?dag_id=example_bash_operator", |
| ["runme_1"], |
| id="graph-data", |
| ), |
| pytest.param( |
| "object/graph_data?dag_id=example_subdag_operator.section-1", |
| ["section-1-task-1"], |
| id="graph-data-subdag", |
| ), |
| pytest.param( |
| "object/grid_data?dag_id=example_bash_operator", |
| ["runme_1"], |
| id="grid-data", |
| ), |
| pytest.param( |
| "object/grid_data?dag_id=example_subdag_operator.section-1", |
| ["section-1-task-1"], |
| id="grid-data-subdag", |
| ), |
| pytest.param( |
| "duration?days=30&dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="duration-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/duration?days=30", |
| ["example_bash_operator"], |
| id="duration", |
| ), |
| pytest.param( |
| "duration?days=30&dag_id=missing_dag", |
| ["seems to be missing"], |
| id="duration-missing-url-param", |
| ), |
| pytest.param( |
| "dags/missing_dag/duration?days=30", |
| ["seems to be missing"], |
| id="duration-missing", |
| ), |
| pytest.param( |
| "tries?days=30&dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="tries-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/tries?days=30", |
| ["example_bash_operator"], |
| id="tries", |
| ), |
| pytest.param( |
| "landing_times?days=30&dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="landing-times-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/landing-times?days=30", |
| ["example_bash_operator"], |
| id="landing-times", |
| ), |
| pytest.param( |
| "gantt?dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="gantt-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/gantt", |
| ["example_bash_operator"], |
| id="gantt", |
| ), |
| pytest.param( |
| "dag-dependencies", |
| ["child_task1", "test_trigger_dagrun"], |
| id="dag-dependencies", |
| ), |
| # Test that Graph, Tree, Calendar & Dag Details View uses the DagBag |
| # already created in views.py |
| pytest.param( |
| "graph?dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="existing-dagbag-graph-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/graph", |
| ["example_bash_operator"], |
| id="existing-dagbag-graph", |
| ), |
| pytest.param( |
| "tree?dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="existing-dagbag-tree-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/grid", |
| ["example_bash_operator"], |
| id="existing-dagbag-grid", |
| ), |
| pytest.param( |
| "calendar?dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="existing-dagbag-calendar-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/calendar", |
| ["example_bash_operator"], |
| id="existing-dagbag-calendar", |
| ), |
| pytest.param( |
| "dags/latest_only/calendar", |
| ["latest_only"], |
| id="existing-dagbag-non-cron-schedule-calendar", |
| ), |
| pytest.param( |
| "dag_details?dag_id=example_bash_operator", |
| ["example_bash_operator"], |
| id="existing-dagbag-dag-details-url-param", |
| ), |
| pytest.param( |
| "dags/example_bash_operator/details", |
| ["example_bash_operator"], |
| id="existing-dagbag-dag-details", |
| ), |
| pytest.param( |
| f"confirm?task_id=runme_0&dag_id=example_bash_operator&state=success" |
| f"&dag_run_id={DEFAULT_DAGRUN}", |
| ["Wait a minute"], |
| id="confirm-success", |
| ), |
| pytest.param( |
| f"confirm?task_id=runme_0&dag_id=example_bash_operator&state=failed&dag_run_id={DEFAULT_DAGRUN}", |
| ["Wait a minute"], |
| id="confirm-failed", |
| ), |
| pytest.param( |
| f"confirm?task_id=runme_0&dag_id=invalid_dag&state=failed&dag_run_id={DEFAULT_DAGRUN}", |
| ["DAG invalid_dag not found"], |
| id="confirm-failed", |
| ), |
| pytest.param( |
| f"confirm?task_id=invalid_task&dag_id=example_bash_operator&state=failed" |
| f"&dag_run_id={DEFAULT_DAGRUN}", |
| ["Task invalid_task not found"], |
| id="confirm-failed", |
| ), |
| pytest.param( |
| f"confirm?task_id=runme_0&dag_id=example_bash_operator&state=invalid" |
| f"&dag_run_id={DEFAULT_DAGRUN}", |
| ["Invalid state invalid, must be either 'success' or 'failed'"], |
| id="confirm-invalid", |
| ), |
| ], |
| ) |
| def test_views_get(admin_client, url, contents): |
| resp = admin_client.get(url, follow_redirects=True) |
| for content in contents: |
| check_content_in_response(content, resp) |
| |
| |
| def test_xcom_return_value_is_not_bytes(admin_client): |
| url = f"xcom?dag_id=example_bash_operator&task_id=runme_0&execution_date={DEFAULT_VAL}&map_index=-1" |
| resp = admin_client.get(url, follow_redirects=True) |
| # check that {"x":1} is in the response |
| content = "{'x':1}" |
| check_content_in_response(content, resp) |
| # check that b'{"x":1}' is not in the response |
| content = "b'"{\\'x\\':1}"'" |
| check_content_not_in_response(content, resp) |
| |
| |
| def test_rendered_task_view(admin_client): |
| url = f"task?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}" |
| resp = admin_client.get(url, follow_redirects=True) |
| resp_html = resp.data.decode("utf-8") |
| assert resp.status_code == 200 |
| assert "<td>_try_number</td>" not in resp_html |
| assert "<td>try_number</td>" in resp_html |
| |
| |
| def test_rendered_k8s(admin_client): |
| url = f"rendered-k8s?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}" |
| with unittest.mock.patch.object(settings, "IS_K8S_OR_K8SCELERY_EXECUTOR", True): |
| resp = admin_client.get(url, follow_redirects=True) |
| check_content_in_response("K8s Pod Spec", resp) |
| |
| |
| @conf_vars({("core", "executor"): "LocalExecutor"}) |
| def test_rendered_k8s_without_k8s(admin_client): |
| url = f"rendered-k8s?task_id=runme_0&dag_id=example_bash_operator&execution_date={DEFAULT_VAL}" |
| resp = admin_client.get(url, follow_redirects=True) |
| assert 404 == resp.status_code |
| |
| |
| def test_tree_trigger_origin_tree_view(app, admin_client): |
| app.dag_bag.get_dag("test_tree_view").create_dagrun( |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| |
| url = "tree?dag_id=test_tree_view" |
| resp = admin_client.get(url, follow_redirects=True) |
| params = {"origin": "/dags/test_tree_view/grid"} |
| href = f"/dags/test_tree_view/trigger?{html.escape(urllib.parse.urlencode(params))}" |
| check_content_in_response(href, resp) |
| |
| |
| def test_graph_trigger_origin_grid_view(app, admin_client): |
| app.dag_bag.get_dag("test_tree_view").create_dagrun( |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| |
| url = "/dags/test_tree_view/graph" |
| resp = admin_client.get(url, follow_redirects=True) |
| params = {"origin": "/dags/test_tree_view/grid?tab=graph"} |
| href = f"/dags/test_tree_view/trigger?{html.escape(urllib.parse.urlencode(params))}" |
| check_content_in_response(href, resp) |
| |
| |
| def test_gantt_trigger_origin_grid_view(app, admin_client): |
| app.dag_bag.get_dag("test_tree_view").create_dagrun( |
| run_type=DagRunType.SCHEDULED, |
| execution_date=DEFAULT_DATE, |
| data_interval=(DEFAULT_DATE, DEFAULT_DATE), |
| start_date=timezone.utcnow(), |
| state=State.RUNNING, |
| ) |
| |
| url = "/dags/test_tree_view/gantt" |
| resp = admin_client.get(url, follow_redirects=True) |
| params = {"origin": "/dags/test_tree_view/grid?tab=gantt"} |
| href = f"/dags/test_tree_view/trigger?{html.escape(urllib.parse.urlencode(params))}" |
| check_content_in_response(href, resp) |
| |
| |
| def test_graph_view_without_dag_permission(app, one_dag_perm_user_client): |
| url = "/dags/example_bash_operator/graph" |
| resp = one_dag_perm_user_client.get(url, follow_redirects=True) |
| assert resp.status_code == 200 |
| assert ( |
| resp.request.url |
| == "http://localhost/dags/example_bash_operator/grid?tab=graph&dag_run_id=TEST_DAGRUN" |
| ) |
| check_content_in_response("example_bash_operator", resp) |
| |
| url = "/dags/example_xcom/graph" |
| resp = one_dag_perm_user_client.get(url, follow_redirects=True) |
| assert resp.status_code == 200 |
| assert resp.request.url == "http://localhost/home" |
| check_content_in_response("Access is Denied", resp) |
| |
| |
| def test_last_dagruns(admin_client): |
| resp = admin_client.post("last_dagruns", follow_redirects=True) |
| check_content_in_response("example_bash_operator", resp) |
| |
| |
| def test_last_dagruns_success_when_selecting_dags(admin_client): |
| resp = admin_client.post( |
| "last_dagruns", data={"dag_ids": ["example_subdag_operator"]}, follow_redirects=True |
| ) |
| assert resp.status_code == 200 |
| stats = json.loads(resp.data.decode("utf-8")) |
| assert "example_bash_operator" not in stats |
| assert "example_subdag_operator" in stats |
| |
| # Multiple |
| resp = admin_client.post( |
| "last_dagruns", |
| data={"dag_ids": ["example_subdag_operator", "example_bash_operator"]}, |
| follow_redirects=True, |
| ) |
| stats = json.loads(resp.data.decode("utf-8")) |
| assert "example_bash_operator" in stats |
| assert "example_subdag_operator" in stats |
| check_content_not_in_response("example_xcom", resp) |
| |
| |
| def test_code(admin_client): |
| url = "code?dag_id=example_bash_operator" |
| resp = admin_client.get(url, follow_redirects=True) |
| check_content_not_in_response("Failed to load DAG file Code", resp) |
| check_content_in_response("example_bash_operator", resp) |
| |
| |
| def test_code_from_db(admin_client): |
| dag = DagBag(include_examples=True).get_dag("example_bash_operator") |
| DagCode(dag.fileloc, DagCode._get_code_from_file(dag.fileloc)).sync_to_db() |
| url = "code?dag_id=example_bash_operator" |
| resp = admin_client.get(url, follow_redirects=True) |
| check_content_not_in_response("Failed to load DAG file Code", resp) |
| check_content_in_response("example_bash_operator", resp) |
| |
| |
| def test_code_from_db_all_example_dags(admin_client): |
| dagbag = DagBag(include_examples=True) |
| for dag in dagbag.dags.values(): |
| DagCode(dag.fileloc, DagCode._get_code_from_file(dag.fileloc)).sync_to_db() |
| url = "code?dag_id=example_bash_operator" |
| resp = admin_client.get(url, follow_redirects=True) |
| check_content_not_in_response("Failed to load DAG file Code", resp) |
| check_content_in_response("example_bash_operator", resp) |
| |
| |
| @pytest.mark.parametrize( |
| "url, data, content", |
| [ |
| ("paused?dag_id=example_bash_operator&is_paused=false", None, "OK"), |
| ( |
| "failed", |
| dict( |
| task_id="run_this_last", |
| dag_id="example_bash_operator", |
| dag_run_id=DEFAULT_DAGRUN, |
| upstream="false", |
| downstream="false", |
| future="false", |
| past="false", |
| origin="/graph?dag_id=example_bash_operator", |
| ), |
| "Marked failed on 1 task instances", |
| ), |
| ( |
| "success", |
| dict( |
| task_id="run_this_last", |
| dag_id="example_bash_operator", |
| dag_run_id=DEFAULT_DAGRUN, |
| upstream="false", |
| downstream="false", |
| future="false", |
| past="false", |
| origin="/graph?dag_id=example_bash_operator", |
| ), |
| "Marked success on 1 task instances", |
| ), |
| ( |
| "clear", |
| dict( |
| task_id="runme_1", |
| dag_id="example_bash_operator", |
| execution_date=DEFAULT_DATE, |
| upstream="false", |
| downstream="false", |
| future="false", |
| past="false", |
| only_failed="false", |
| ), |
| "example_bash_operator", |
| ), |
| ( |
| "clear", |
| dict( |
| group_id="section_1", |
| dag_id="example_task_group", |
| execution_date=DEFAULT_DATE, |
| upstream="false", |
| downstream="false", |
| future="false", |
| past="false", |
| only_failed="false", |
| ), |
| "example_task_group", |
| ), |
| ], |
| ids=[ |
| "paused", |
| "failed-flash-hint", |
| "success-flash-hint", |
| "clear", |
| "clear-task-group", |
| ], |
| ) |
| def test_views_post(admin_client, url, data, content): |
| resp = admin_client.post(url, data=data, follow_redirects=True) |
| check_content_in_response(content, resp) |
| |
| |
| @pytest.mark.parametrize("url", ["failed", "success"]) |
| def test_dag_never_run(admin_client, url): |
| dag_id = "example_bash_operator" |
| form = dict( |
| task_id="run_this_last", |
| dag_id=dag_id, |
| execution_date=DEFAULT_DATE, |
| upstream="false", |
| downstream="false", |
| future="false", |
| past="false", |
| origin="/graph?dag_id=example_bash_operator", |
| ) |
| clear_db_runs() |
| resp = admin_client.post(url, data=form, follow_redirects=True) |
| check_content_in_response(f"Cannot mark tasks as {url}, seem that DAG {dag_id} has never run", resp) |
| |
| |
| class _ForceHeartbeatCeleryExecutor(CeleryExecutor): |
| def heartbeat(self): |
| return True |
| |
| |
| @pytest.fixture |
| def new_id_example_bash_operator(): |
| dag_id = "example_bash_operator" |
| test_dag_id = "non_existent_dag" |
| with create_session() as session: |
| dag_query = session.query(DagModel).filter(DagModel.dag_id == dag_id) |
| dag_query.first().tags = [] # To avoid "FOREIGN KEY constraint" error) |
| with create_session() as session: |
| dag_query.update({"dag_id": test_dag_id}) |
| yield test_dag_id |
| with create_session() as session: |
| session.query(DagModel).filter(DagModel.dag_id == test_dag_id).update({"dag_id": dag_id}) |
| |
| |
| def test_delete_dag_button_for_dag_on_scheduler_only(admin_client, new_id_example_bash_operator): |
| # The delete-dag URL should be generated correctly |
| test_dag_id = new_id_example_bash_operator |
| resp = admin_client.get("/", follow_redirects=True) |
| check_content_in_response(f"/delete?dag_id={test_dag_id}", resp) |
| check_content_in_response(f"return confirmDeleteDag(this, '{test_dag_id}')", resp) |
| |
| |
| @pytest.fixture |
| def new_dag_to_delete(): |
| dag = DAG("new_dag_to_delete", is_paused_upon_creation=True) |
| session = settings.Session() |
| dag.sync_to_db(session=session) |
| return dag |
| |
| |
| @pytest.fixture |
| def per_dag_perm_user_client(app, new_dag_to_delete): |
| sm = app.appbuilder.sm |
| perm = f"{permissions.RESOURCE_DAG_PREFIX}{new_dag_to_delete.dag_id}" |
| |
| sm.create_permission(permissions.ACTION_CAN_DELETE, perm) |
| |
| create_user( |
| app, |
| username="test_user_per_dag_perms", |
| role_name="User with some perms", |
| permissions=[ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), |
| (permissions.ACTION_CAN_DELETE, perm), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), |
| ], |
| ) |
| |
| sm.find_user(username="test_user_per_dag_perms") |
| |
| yield client_with_login( |
| app, |
| username="test_user_per_dag_perms", |
| password="test_user_per_dag_perms", |
| ) |
| |
| delete_user(app, username="test_user_per_dag_perms") # type: ignore |
| delete_roles(app) |
| |
| |
| @pytest.fixture |
| def one_dag_perm_user_client(app): |
| username = "test_user_one_dag_perm" |
| dag_id = "example_bash_operator" |
| sm = app.appbuilder.sm |
| perm = f"{permissions.RESOURCE_DAG_PREFIX}{dag_id}" |
| |
| sm.create_permission(permissions.ACTION_CAN_READ, perm) |
| |
| create_user( |
| app, |
| username=username, |
| role_name="User with permission to access only one dag", |
| permissions=[ |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), |
| (permissions.ACTION_CAN_READ, perm), |
| ], |
| ) |
| |
| sm.find_user(username=username) |
| |
| yield client_with_login( |
| app, |
| username=username, |
| password=username, |
| ) |
| |
| delete_user(app, username=username) # type: ignore |
| delete_roles(app) |
| |
| |
| def test_delete_just_dag_per_dag_permissions(new_dag_to_delete, per_dag_perm_user_client): |
| resp = per_dag_perm_user_client.post( |
| f"delete?dag_id={new_dag_to_delete.dag_id}&next=/home", follow_redirects=True |
| ) |
| check_content_in_response(f"Deleting DAG with id {new_dag_to_delete.dag_id}.", resp) |
| |
| |
| def test_delete_just_dag_resource_permissions(new_dag_to_delete, user_client): |
| resp = user_client.post(f"delete?dag_id={new_dag_to_delete.dag_id}&next=/home", follow_redirects=True) |
| check_content_in_response(f"Deleting DAG with id {new_dag_to_delete.dag_id}.", resp) |
| |
| |
| @pytest.mark.parametrize("endpoint", ["graph", "tree"]) |
| def test_show_external_log_redirect_link_with_local_log_handler(capture_templates, admin_client, endpoint): |
| """Do not show external links if log handler is local.""" |
| url = f"{endpoint}?dag_id=example_bash_operator" |
| with capture_templates() as templates: |
| admin_client.get(url, follow_redirects=True) |
| ctx = templates[0].local_context |
| assert not ctx["show_external_log_redirect"] |
| assert ctx["external_log_name"] is None |
| |
| |
| class _ExternalHandler(ExternalLoggingMixin): |
| _supports_external_link = True |
| LOG_NAME = "ExternalLog" |
| |
| @property |
| def log_name(self) -> str: |
| return self.LOG_NAME |
| |
| def get_external_log_url(self, *args, **kwargs) -> str: |
| return "http://external-service.com" |
| |
| @property |
| def supports_external_link(self) -> bool: |
| return self._supports_external_link |
| |
| |
| @pytest.mark.parametrize("endpoint", ["graph", "tree"]) |
| @unittest.mock.patch( |
| "airflow.utils.log.log_reader.TaskLogReader.log_handler", |
| new_callable=unittest.mock.PropertyMock, |
| return_value=_ExternalHandler(), |
| ) |
| def test_show_external_log_redirect_link_with_external_log_handler( |
| _, capture_templates, admin_client, endpoint |
| ): |
| """Show external links if log handler is external.""" |
| url = f"{endpoint}?dag_id=example_bash_operator" |
| with capture_templates() as templates: |
| admin_client.get(url, follow_redirects=True) |
| ctx = templates[0].local_context |
| assert ctx["show_external_log_redirect"] |
| assert ctx["external_log_name"] == _ExternalHandler.LOG_NAME |
| |
| |
| @pytest.mark.parametrize("endpoint", ["graph", "tree"]) |
| @unittest.mock.patch( |
| "airflow.utils.log.log_reader.TaskLogReader.log_handler", |
| new_callable=unittest.mock.PropertyMock, |
| return_value=_ExternalHandler(), |
| ) |
| def test_external_log_redirect_link_with_external_log_handler_not_shown( |
| _external_handler, capture_templates, admin_client, endpoint |
| ): |
| """Show external links if log handler is external.""" |
| _external_handler.return_value._supports_external_link = False |
| url = f"{endpoint}?dag_id=example_bash_operator" |
| with capture_templates() as templates: |
| admin_client.get(url, follow_redirects=True) |
| ctx = templates[0].local_context |
| assert not ctx["show_external_log_redirect"] |
| assert ctx["external_log_name"] is None |
| |
| |
| def _get_appbuilder_pk_string(model_view_cls, instance) -> str: |
| """Utility to get Flask-Appbuilder's string format "pk" for an object. |
| |
| Used to generate requests to FAB action views without *too* much difficulty. |
| The implementation relies on FAB internals, but unfortunately I don't see |
| a better way around it. |
| |
| Example usage:: |
| |
| from airflow.www.views import TaskInstanceModelView |
| |
| ti = session.Query(TaskInstance).filter(...).one() |
| pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti) |
| client.post("...", data={"action": "...", "rowid": pk}) |
| """ |
| pk_value = model_view_cls.datamodel.get_pk_value(instance) |
| return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value) |
| |
| |
| def test_task_instance_delete(session, admin_client, create_task_instance): |
| task_instance_to_delete = create_task_instance( |
| task_id="test_task_instance_delete", |
| execution_date=timezone.utcnow(), |
| state=State.DEFERRED, |
| ) |
| composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete) |
| task_id = task_instance_to_delete.task_id |
| |
| assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1 |
| admin_client.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True) |
| assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 0 |
| |
| |
| def test_task_instance_delete_permission_denied(session, client_ti_without_dag_edit, create_task_instance): |
| task_instance_to_delete = create_task_instance( |
| task_id="test_task_instance_delete_permission_denied", |
| execution_date=timezone.utcnow(), |
| state=State.DEFERRED, |
| session=session, |
| ) |
| session.commit() |
| composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete) |
| task_id = task_instance_to_delete.task_id |
| |
| assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1 |
| resp = client_ti_without_dag_edit.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True) |
| check_content_in_response("Access is Denied", resp) |
| assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1 |
| |
| |
| @pytest.mark.parametrize( |
| "client_fixture, should_succeed", |
| [ |
| ("admin_client", True), |
| ("user_client", True), |
| ("viewer_client", False), |
| ("anonymous_client", False), |
| ], |
| ) |
| def test_task_instance_clear(session, request, client_fixture, should_succeed): |
| client = request.getfixturevalue(client_fixture) |
| task_id = "runme_0" |
| initial_state = State.SUCCESS |
| |
| # Set the state to success for clearing. |
| ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id) |
| ti_q.update({"state": initial_state}) |
| session.commit() |
| |
| # Send a request to clear. |
| rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one()) |
| resp = client.post( |
| "/taskinstance/action_post", |
| data={"action": "clear", "rowid": rowid}, |
| follow_redirects=True, |
| ) |
| assert resp.status_code == 200 |
| if not should_succeed and client_fixture != "anonymous_client": |
| check_content_in_response("Access is Denied", resp) |
| |
| # Now the state should be None. |
| state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar() |
| assert state == (State.NONE if should_succeed else initial_state) |
| |
| |
| def test_task_instance_clear_downstream(session, admin_client, dag_maker): |
| """Ensures clearing a task instance clears its downstream dependencies exclusively""" |
| with dag_maker( |
| dag_id="test_dag_id", |
| serialized=True, |
| session=session, |
| start_date=pendulum.DateTime(2023, 1, 1, 0, 0, 0, tzinfo=pendulum.UTC), |
| ): |
| EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") |
| EmptyOperator(task_id="task_3") |
| |
| run1 = dag_maker.create_dagrun( |
| run_id="run_1", |
| state=DagRunState.SUCCESS, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=dag_maker.dag.start_date, |
| start_date=dag_maker.dag.start_date, |
| session=session, |
| ) |
| |
| run2 = dag_maker.create_dagrun( |
| run_id="run_2", |
| state=DagRunState.SUCCESS, |
| run_type=DagRunType.SCHEDULED, |
| execution_date=dag_maker.dag.start_date.add(days=1), |
| start_date=dag_maker.dag.start_date.add(days=1), |
| session=session, |
| ) |
| |
| for run in (run1, run2): |
| for ti in run.task_instances: |
| ti.state = State.SUCCESS |
| |
| # Clear task_1 from dag run 1 |
| run1_ti1 = run1.get_task_instance(task_id="task_1") |
| rowid = _get_appbuilder_pk_string(TaskInstanceModelView, run1_ti1) |
| resp = admin_client.post( |
| "/taskinstance/action_post", |
| data={"action": "clear_downstream", "rowid": rowid}, |
| follow_redirects=True, |
| ) |
| assert resp.status_code == 200 |
| |
| # Assert that task_1 and task_2 of dag run 1 are cleared, but task_3 is left untouched |
| run1_ti1.refresh_from_db(session=session) |
| run1_ti2 = run1.get_task_instance(task_id="task_2") |
| run1_ti3 = run1.get_task_instance(task_id="task_3") |
| |
| assert run1_ti1.state == State.NONE |
| assert run1_ti2.state == State.NONE |
| assert run1_ti3.state == State.SUCCESS |
| |
| # Assert that task_1 of dag run 2 is left untouched |
| run2_ti1 = run2.get_task_instance(task_id="task_1") |
| assert run2_ti1.state == State.SUCCESS |
| |
| |
| def test_task_instance_clear_failure(admin_client): |
| rowid = '["12345"]' # F.A.B. crashes if the rowid is *too* invalid. |
| resp = admin_client.post( |
| "/taskinstance/action_post", |
| data={"action": "clear", "rowid": rowid}, |
| follow_redirects=True, |
| ) |
| assert resp.status_code == 200 |
| check_content_in_response("Failed to clear task instances:", resp) |
| |
| |
| @pytest.mark.parametrize( |
| "action, expected_state", |
| [ |
| ("set_failed", State.FAILED), |
| ("set_success", State.SUCCESS), |
| ("set_retry", State.UP_FOR_RETRY), |
| ("set_skipped", State.SKIPPED), |
| ], |
| ids=["failed", "success", "retry", "skipped"], |
| ) |
| def test_task_instance_set_state(session, admin_client, action, expected_state): |
| task_id = "runme_0" |
| |
| # Send a request to clear. |
| ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id) |
| rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one()) |
| resp = admin_client.post( |
| "/taskinstance/action_post", |
| data={"action": action, "rowid": rowid}, |
| follow_redirects=True, |
| ) |
| assert resp.status_code == 200 |
| |
| # Now the state should be modified. |
| state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar() |
| assert state == expected_state |
| |
| |
| @pytest.mark.parametrize( |
| "action", |
| [ |
| "set_failed", |
| "set_success", |
| "set_retry", |
| "set_skipped", |
| ], |
| ) |
| def test_task_instance_set_state_failure(admin_client, action): |
| rowid = '["12345"]' # F.A.B. crashes if the rowid is *too* invalid. |
| resp = admin_client.post( |
| "/taskinstance/action_post", |
| data={"action": action, "rowid": rowid}, |
| follow_redirects=True, |
| ) |
| assert resp.status_code == 200 |
| check_content_in_response("Failed to set state", resp) |
| |
| |
| @pytest.mark.parametrize( |
| "task_search_tuples", |
| [ |
| [("example_xcom", "bash_push"), ("example_bash_operator", "run_this_last")], |
| [("example_subdag_operator", "some-other-task")], |
| ], |
| ids=["multiple_tasks", "one_task"], |
| ) |
| def test_action_muldelete_task_instance(session, admin_client, task_search_tuples): |
| # get task instances to delete |
| tasks_to_delete = [] |
| for task_search_tuple in task_search_tuples: |
| dag_id, task_id = task_search_tuple |
| tasks_to_delete.append( |
| session.query(TaskInstance) |
| .filter(TaskInstance.task_id == task_id, TaskInstance.dag_id == dag_id) |
| .one() |
| ) |
| |
| # add task reschedules for those tasks to make sure that the delete cascades to the required tables |
| trs = [ |
| TaskReschedule( |
| task_id=task.task_id, |
| dag_id=task.dag_id, |
| run_id=task.run_id, |
| try_number=1, |
| start_date=timezone.datetime(2021, 1, 1), |
| end_date=timezone.datetime(2021, 1, 2), |
| reschedule_date=timezone.datetime(2021, 1, 3), |
| ) |
| for task in tasks_to_delete |
| ] |
| session.bulk_save_objects(trs) |
| session.flush() |
| |
| # run the function to test |
| resp = admin_client.post( |
| "/taskinstance/action_post", |
| data={ |
| "action": "muldelete", |
| "rowid": [_get_appbuilder_pk_string(TaskInstanceModelView, task) for task in tasks_to_delete], |
| }, |
| follow_redirects=True, |
| ) |
| |
| # assert expected behavior for that function and its response |
| assert resp.status_code == 200 |
| for task_search_tuple in task_search_tuples: |
| dag_id, task_id = task_search_tuple |
| assert ( |
| session.query(TaskInstance) |
| .filter(TaskInstance.task_id == task_id, TaskInstance.dag_id == dag_id) |
| .count() |
| == 0 |
| ) |
| assert session.query(TaskReschedule).count() == 0 |
| |
| |
| def test_graph_view_doesnt_fail_on_recursion_error(app, dag_maker, admin_client): |
| """Test that the graph view doesn't fail on a recursion error.""" |
| from airflow.models.baseoperator import chain |
| |
| with dag_maker("test_fails_with_recursion") as dag: |
| tasks = [ |
| BashOperator( |
| task_id=f"task_{i}", |
| bash_command="echo test", |
| ) |
| for i in range(1, 1000 + 1) |
| ] |
| chain(*tasks) |
| with unittest.mock.patch.object(app, "dag_bag") as mocked_dag_bag: |
| mocked_dag_bag.get_dag.return_value = dag |
| url = f"/dags/{dag.dag_id}/graph" |
| resp = admin_client.get(url, follow_redirects=True) |
| assert resp.status_code == 200 |
| |
| |
| def test_task_instances(admin_client): |
| """Test task_instances view.""" |
| resp = admin_client.get( |
| f"/object/task_instances?dag_id=example_bash_operator&execution_date={STR_DEFAULT_DATE}", |
| follow_redirects=True, |
| ) |
| assert resp.status_code == 200 |
| assert resp.json == { |
| "also_run_this": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "BashOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 2, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "also_run_this", |
| "task_id": "also_run_this", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| "run_after_loop": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "BashOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 2, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "run_after_loop", |
| "task_id": "run_after_loop", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| "run_this_last": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "EmptyOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 1, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "run_this_last", |
| "task_id": "run_this_last", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| "runme_0": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "BashOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 3, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "runme_0", |
| "task_id": "runme_0", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| "runme_1": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "BashOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 3, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "runme_1", |
| "task_id": "runme_1", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| "runme_2": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "BashOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 3, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "runme_2", |
| "task_id": "runme_2", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| "this_will_skip": { |
| "custom_operator_name": None, |
| "dag_id": "example_bash_operator", |
| "duration": None, |
| "end_date": None, |
| "execution_date": DEFAULT_DATE.isoformat(), |
| "executor": None, |
| "executor_config": {}, |
| "external_executor_id": None, |
| "hostname": "", |
| "job_id": None, |
| "map_index": -1, |
| "max_tries": 0, |
| "next_kwargs": None, |
| "next_method": None, |
| "operator": "BashOperator", |
| "pid": None, |
| "pool": "default_pool", |
| "pool_slots": 1, |
| "priority_weight": 2, |
| "queue": "default", |
| "queued_by_job_id": None, |
| "queued_dttm": None, |
| "rendered_map_index": None, |
| "run_id": "TEST_DAGRUN", |
| "start_date": None, |
| "state": None, |
| "task_display_name": "this_will_skip", |
| "task_id": "this_will_skip", |
| "trigger_id": None, |
| "trigger_timeout": None, |
| "try_number": 0, |
| "unixname": getuser(), |
| "updated_at": DEFAULT_DATE.isoformat(), |
| }, |
| } |