| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import pytest |
| |
| from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP |
| from airflow.security import permissions |
| from airflow.www.fab_security.sqla.models import Role |
| from airflow.www.security import EXISTING_ROLES |
| from tests.test_utils.api_connexion_utils import ( |
| assert_401, |
| create_role, |
| create_user, |
| delete_role, |
| delete_user, |
| ) |
| |
| |
| @pytest.fixture(scope="module") |
| def configured_app(minimal_app_for_api): |
| app = minimal_app_for_api |
| create_user( |
| app, # type: ignore |
| username="test", |
| role_name="Test", |
| permissions=[ |
| (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_ROLE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_ROLE), |
| (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE), |
| (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_ROLE), |
| (permissions.ACTION_CAN_READ, permissions.RESOURCE_ACTION), |
| ], |
| ) |
| create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore |
| yield app |
| |
| delete_user(app, username="test") # type: ignore |
| delete_user(app, username="test_no_permissions") # type: ignore |
| |
| |
| class TestRoleEndpoint: |
| @pytest.fixture(autouse=True) |
| def setup_attrs(self, configured_app) -> None: |
| self.app = configured_app |
| self.client = self.app.test_client() # type:ignore |
| |
| def teardown_method(self): |
| """ |
| Delete all roles except these ones. |
| Test and TestNoPermissions are deleted by delete_user above |
| """ |
| session = self.app.appbuilder.get_session |
| existing_roles = set(EXISTING_ROLES) |
| existing_roles.update(["Test", "TestNoPermissions"]) |
| roles = session.query(Role).filter(~Role.name.in_(existing_roles)).all() |
| for role in roles: |
| delete_role(self.app, role.name) |
| |
| |
| class TestGetRoleEndpoint(TestRoleEndpoint): |
| def test_should_response_200(self): |
| response = self.client.get("/api/v1/roles/Admin", environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 200 |
| assert response.json["name"] == "Admin" |
| |
| def test_should_respond_404(self): |
| response = self.client.get("/api/v1/roles/invalid-role", environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 404 |
| assert { |
| "detail": "Role with name 'invalid-role' was not found", |
| "status": 404, |
| "title": "Role not found", |
| "type": EXCEPTIONS_LINK_MAP[404], |
| } == response.json |
| |
| def test_should_raises_401_unauthenticated(self): |
| response = self.client.get("/api/v1/roles/Admin") |
| assert_401(response) |
| |
| def test_should_raise_403_forbidden(self): |
| response = self.client.get( |
| "/api/v1/roles/Admin", environ_overrides={"REMOTE_USER": "test_no_permissions"} |
| ) |
| assert response.status_code == 403 |
| |
| |
| class TestGetRolesEndpoint(TestRoleEndpoint): |
| def test_should_response_200(self): |
| response = self.client.get("/api/v1/roles", environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 200 |
| existing_roles = set(EXISTING_ROLES) |
| existing_roles.update(["Test", "TestNoPermissions"]) |
| assert response.json["total_entries"] == len(existing_roles) |
| roles = {role["name"] for role in response.json["roles"]} |
| assert roles == existing_roles |
| |
| def test_should_raises_401_unauthenticated(self): |
| response = self.client.get("/api/v1/roles") |
| assert_401(response) |
| |
| def test_should_raises_400_for_invalid_order_by(self): |
| response = self.client.get( |
| "/api/v1/roles?order_by=invalid", environ_overrides={"REMOTE_USER": "test"} |
| ) |
| assert response.status_code == 400 |
| msg = "Ordering with 'invalid' is disallowed or the attribute does not exist on the model" |
| assert response.json["detail"] == msg |
| |
| def test_should_raise_403_forbidden(self): |
| response = self.client.get("/api/v1/roles", environ_overrides={"REMOTE_USER": "test_no_permissions"}) |
| assert response.status_code == 403 |
| |
| |
| class TestGetRolesEndpointPaginationandFilter(TestRoleEndpoint): |
| @pytest.mark.parametrize( |
| "url, expected_roles", |
| [ |
| ("/api/v1/roles?limit=1", ["Admin"]), |
| ("/api/v1/roles?limit=2", ["Admin", "Op"]), |
| ( |
| "/api/v1/roles?offset=1", |
| ["Op", "Public", "Test", "TestNoPermissions", "User", "Viewer"], |
| ), |
| ( |
| "/api/v1/roles?offset=0", |
| ["Admin", "Op", "Public", "Test", "TestNoPermissions", "User", "Viewer"], |
| ), |
| ("/api/v1/roles?limit=1&offset=2", ["Public"]), |
| ("/api/v1/roles?limit=1&offset=1", ["Op"]), |
| ( |
| "/api/v1/roles?limit=2&offset=2", |
| ["Public", "Test"], |
| ), |
| ], |
| ) |
| def test_can_handle_limit_and_offset(self, url, expected_roles): |
| response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 200 |
| existing_roles = set(EXISTING_ROLES) |
| existing_roles.update(["Test", "TestNoPermissions"]) |
| assert response.json["total_entries"] == len(existing_roles) |
| roles = [role["name"] for role in response.json["roles"] if role] |
| |
| assert roles == expected_roles |
| |
| |
| class TestGetPermissionsEndpoint(TestRoleEndpoint): |
| def test_should_response_200(self): |
| response = self.client.get("/api/v1/permissions", environ_overrides={"REMOTE_USER": "test"}) |
| actions = {i[0] for i in self.app.appbuilder.sm.get_all_permissions() if i} |
| assert response.status_code == 200 |
| assert response.json["total_entries"] == len(actions) |
| returned_actions = {perm["name"] for perm in response.json["actions"]} |
| assert actions == returned_actions |
| |
| def test_should_raises_401_unauthenticated(self): |
| response = self.client.get("/api/v1/permissions") |
| assert_401(response) |
| |
| def test_should_raise_403_forbidden(self): |
| response = self.client.get( |
| "/api/v1/permissions", environ_overrides={"REMOTE_USER": "test_no_permissions"} |
| ) |
| assert response.status_code == 403 |
| |
| |
| class TestPostRole(TestRoleEndpoint): |
| def test_post_should_respond_200(self): |
| payload = { |
| "name": "Test2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| } |
| response = self.client.post("/api/v1/roles", json=payload, environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 200 |
| role = self.app.appbuilder.sm.find_role("Test2") |
| assert role is not None |
| |
| @pytest.mark.parametrize( |
| "payload, error_message", |
| [ |
| ( |
| { |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| "{'name': ['Missing data for required field.']}", |
| ), |
| ( |
| { |
| "name": "TestRole", |
| "actionss": [ |
| { |
| "resource": {"name": "Connections"}, # actionss not correct |
| "action": {"name": "can_create"}, |
| } |
| ], |
| }, |
| "{'actionss': ['Unknown field.']}", |
| ), |
| ( |
| { |
| "name": "TestRole", |
| "actions": [ |
| { |
| "resources": {"name": "Connections"}, # resources is invalid, should be resource |
| "action": {"name": "can_create"}, |
| } |
| ], |
| }, |
| "{'actions': {0: {'resources': ['Unknown field.']}}}", |
| ), |
| ( |
| { |
| "name": "TestRole", |
| "actions": [ |
| {"resource": {"name": "Connections"}, "actions": {"name": "can_create"}} |
| ], # actions is invalid, should be action |
| }, |
| "{'actions': {0: {'actions': ['Unknown field.']}}}", |
| ), |
| ( |
| { |
| "name": "TestRole", |
| "actions": [ |
| { |
| "resource": {"name": "FooBars"}, # FooBars is not a resource |
| "action": {"name": "can_create"}, |
| } |
| ], |
| }, |
| "The specified resource: 'FooBars' was not found", |
| ), |
| ( |
| { |
| "name": "TestRole", |
| "actions": [ |
| {"resource": {"name": "Connections"}, "action": {"name": "can_amend"}} |
| ], # can_amend is not an action |
| }, |
| "The specified action: 'can_amend' was not found", |
| ), |
| ], |
| ) |
| def test_post_should_respond_400_for_invalid_payload(self, payload, error_message): |
| response = self.client.post("/api/v1/roles", json=payload, environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 400 |
| assert response.json == { |
| "detail": error_message, |
| "status": 400, |
| "title": "Bad Request", |
| "type": EXCEPTIONS_LINK_MAP[400], |
| } |
| |
| def test_post_should_respond_409_already_exist(self): |
| payload = { |
| "name": "Test", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| } |
| response = self.client.post("/api/v1/roles", json=payload, environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 409 |
| assert response.json == { |
| "detail": "Role with name 'Test' already exists; please update with the PATCH endpoint", |
| "status": 409, |
| "title": "Conflict", |
| "type": EXCEPTIONS_LINK_MAP[409], |
| } |
| |
| def test_should_raises_401_unauthenticated(self): |
| response = self.client.post( |
| "/api/v1/roles", |
| json={ |
| "name": "Test2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| ) |
| |
| assert_401(response) |
| |
| def test_should_raise_403_forbidden(self): |
| response = self.client.post( |
| "/api/v1/roles", |
| json={ |
| "name": "mytest2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| environ_overrides={"REMOTE_USER": "test_no_permissions"}, |
| ) |
| assert response.status_code == 403 |
| |
| |
| class TestDeleteRole(TestRoleEndpoint): |
| def test_delete_should_respond_204(self, session): |
| role = create_role(self.app, "mytestrole") |
| response = self.client.delete(f"/api/v1/roles/{role.name}", environ_overrides={"REMOTE_USER": "test"}) |
| assert response.status_code == 204 |
| role_obj = session.query(Role).filter(Role.name == role.name).all() |
| assert len(role_obj) == 0 |
| |
| def test_delete_should_respond_404(self): |
| response = self.client.delete( |
| "/api/v1/roles/invalidrolename", environ_overrides={"REMOTE_USER": "test"} |
| ) |
| assert response.status_code == 404 |
| assert response.json == { |
| "detail": "Role with name 'invalidrolename' was not found", |
| "status": 404, |
| "title": "Role not found", |
| "type": EXCEPTIONS_LINK_MAP[404], |
| } |
| |
| def test_should_raises_401_unauthenticated(self): |
| response = self.client.delete("/api/v1/roles/test") |
| |
| assert_401(response) |
| |
| def test_should_raise_403_forbidden(self): |
| response = self.client.delete( |
| "/api/v1/roles/test", environ_overrides={"REMOTE_USER": "test_no_permissions"} |
| ) |
| assert response.status_code == 403 |
| |
| |
| class TestPatchRole(TestRoleEndpoint): |
| @pytest.mark.parametrize( |
| "payload, expected_name, expected_actions", |
| [ |
| ({"name": "mytest"}, "mytest", []), |
| ( |
| { |
| "name": "mytest2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| "mytest2", |
| [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| ), |
| ], |
| ) |
| def test_patch_should_respond_200(self, payload, expected_name, expected_actions): |
| role = create_role(self.app, "mytestrole") |
| response = self.client.patch( |
| f"/api/v1/roles/{role.name}", json=payload, environ_overrides={"REMOTE_USER": "test"} |
| ) |
| assert response.status_code == 200 |
| assert response.json["name"] == expected_name |
| assert response.json["actions"] == expected_actions |
| |
| def test_patch_should_update_correct_roles_permissions(self): |
| create_role(self.app, "role_to_change") |
| create_role(self.app, "already_exists") |
| |
| response = self.client.patch( |
| "/api/v1/roles/role_to_change", |
| json={ |
| "name": "already_exists", |
| "actions": [{"action": {"name": "can_delete"}, "resource": {"name": "XComs"}}], |
| }, |
| environ_overrides={"REMOTE_USER": "test"}, |
| ) |
| assert response.status_code == 200 |
| |
| updated_permissions = self.app.appbuilder.sm.find_role("role_to_change").permissions |
| assert len(updated_permissions) == 1 |
| assert updated_permissions[0].resource.name == "XComs" |
| assert updated_permissions[0].action.name == "can_delete" |
| |
| assert len(self.app.appbuilder.sm.find_role("already_exists").permissions) == 0 |
| |
| @pytest.mark.parametrize( |
| "update_mask, payload, expected_name, expected_actions", |
| [ |
| ( |
| "?update_mask=name", |
| { |
| "name": "mytest2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| "mytest2", |
| [], |
| ), |
| ( |
| "?update_mask=name, actions", # both name and actions in update mask |
| { |
| "name": "mytest2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| "mytest2", |
| [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| ), |
| ], |
| ) |
| def test_patch_should_respond_200_with_update_mask( |
| self, update_mask, payload, expected_name, expected_actions |
| ): |
| role = create_role(self.app, "mytestrole") |
| assert role.permissions == [] |
| response = self.client.patch( |
| f"/api/v1/roles/{role.name}{update_mask}", |
| json=payload, |
| environ_overrides={"REMOTE_USER": "test"}, |
| ) |
| assert response.status_code == 200 |
| assert response.json["name"] == expected_name |
| assert response.json["actions"] == expected_actions |
| |
| def test_patch_should_respond_400_for_invalid_fields_in_update_mask(self): |
| role = create_role(self.app, "mytestrole") |
| payload = {"name": "testme"} |
| response = self.client.patch( |
| f"/api/v1/roles/{role.name}?update_mask=invalid_name", |
| json=payload, |
| environ_overrides={"REMOTE_USER": "test"}, |
| ) |
| assert response.status_code == 400 |
| assert response.json["detail"] == "'invalid_name' in update_mask is unknown" |
| |
| @pytest.mark.parametrize( |
| "payload, expected_error", |
| [ |
| ( |
| { |
| "name": "testme", |
| "permissions": [ # Using permissions instead of actions should raise |
| {"resource": {"name": "Connections"}, "action": {"name": "can_create"}} |
| ], |
| }, |
| "{'permissions': ['Unknown field.']}", |
| ), |
| ( |
| { |
| "name": "testme", |
| "actions": [ |
| { |
| "view_menu": {"name": "Connections"}, # Using view_menu instead of resource |
| "action": {"name": "can_create"}, |
| } |
| ], |
| }, |
| "{'actions': {0: {'view_menu': ['Unknown field.']}}}", |
| ), |
| ( |
| { |
| "name": "testme", |
| "actions": [ |
| { |
| "resource": {"name": "FooBars"}, # Using wrong resource name |
| "action": {"name": "can_create"}, |
| } |
| ], |
| }, |
| "The specified resource: 'FooBars' was not found", |
| ), |
| ( |
| { |
| "name": "testme", |
| "actions": [ |
| { |
| "resource": {"name": "Connections"}, # Using wrong action name |
| "action": {"name": "can_invalid"}, |
| } |
| ], |
| }, |
| "The specified action: 'can_invalid' was not found", |
| ), |
| ], |
| ) |
| def test_patch_should_respond_400_for_invalid_update(self, payload, expected_error): |
| role = create_role(self.app, "mytestrole") |
| response = self.client.patch( |
| f"/api/v1/roles/{role.name}", |
| json=payload, |
| environ_overrides={"REMOTE_USER": "test"}, |
| ) |
| assert response.status_code == 400 |
| assert response.json["detail"] == expected_error |
| |
| def test_should_raises_401_unauthenticated(self): |
| response = self.client.patch( |
| "/api/v1/roles/test", |
| json={ |
| "name": "mytest2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| ) |
| |
| assert_401(response) |
| |
| def test_should_raise_403_forbidden(self): |
| response = self.client.patch( |
| "/api/v1/roles/test", |
| json={ |
| "name": "mytest2", |
| "actions": [{"resource": {"name": "Connections"}, "action": {"name": "can_create"}}], |
| }, |
| environ_overrides={"REMOTE_USER": "test_no_permissions"}, |
| ) |
| assert response.status_code == 403 |