| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| from unittest.mock import call, MagicMock, patch |
| |
| import pytest |
| |
| from superset.commands.exceptions import TagForbiddenError, TagNotFoundValidationError |
| from superset.commands.utils import ( |
| compute_owner_list, |
| populate_owner_list, |
| Tag, |
| TagType, |
| update_tags, |
| User, |
| validate_tags, |
| ) |
| from superset.tags.models import ObjectType |
| |
| OBJECT_TYPES = {ObjectType.chart, ObjectType.chart} |
| MOCK_TAGS = [ |
| Tag( |
| id=1, |
| name="first", |
| type=TagType.custom, |
| ), |
| Tag( |
| id=2, |
| name="second", |
| type=TagType.custom, |
| ), |
| Tag( |
| id=3, |
| name="third", |
| type=TagType.custom, |
| ), |
| Tag( |
| id=4, |
| name="type:dashboard", |
| type=TagType.type, |
| ), |
| Tag( |
| id=4, |
| name="owner:1", |
| type=TagType.owner, |
| ), |
| Tag( |
| id=4, |
| name="avorited_by:2", |
| type=TagType.favorited_by, |
| ), |
| ] |
| |
| |
| @patch("superset.commands.utils.g") |
| def test_populate_owner_list_default_to_user(mock_user): |
| """ |
| Test the ``populate_owner_list`` method when no owners are provided |
| and default_to_user is True (non-admin). |
| """ |
| owner_list = populate_owner_list([], True) |
| assert owner_list == [mock_user.user] |
| |
| |
| @patch("superset.commands.utils.g") |
| def test_populate_owner_list_default_to_user_handle_none(mock_user): |
| """ |
| Test the ``populate_owner_list`` method when owners is None |
| and default_to_user is True (non-admin). |
| """ |
| owner_list = populate_owner_list(None, True) |
| assert owner_list == [mock_user.user] |
| |
| |
| @patch("superset.commands.utils.g") |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.get_user_id") |
| def test_populate_owner_list_admin_user(mock_user_id, mock_sm, mock_g): |
| """ |
| Test the ``populate_owner_list`` method when an admin is setting |
| another user as an owner and default_to_user is False. |
| """ |
| test_user = User(id=1, first_name="First", last_name="Last") |
| mock_g.user = User(id=4, first_name="Admin", last_name="User") |
| mock_user_id.return_value = 4 |
| mock_sm.is_admin = MagicMock(return_value=True) |
| mock_sm.get_user_by_id = MagicMock(return_value=test_user) |
| |
| owner_list = populate_owner_list([1], False) |
| assert owner_list == [test_user] |
| |
| |
| @patch("superset.commands.utils.g") |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.get_user_id") |
| def test_populate_owner_list_admin_user_empty_list(mock_user_id, mock_sm, mock_g): |
| """ |
| Test the ``populate_owner_list`` method when an admin is setting an empty list |
| of owners. |
| """ |
| mock_g.user = User(id=4, first_name="Admin", last_name="User") |
| mock_user_id.return_value = 4 |
| mock_sm.is_admin = MagicMock(return_value=True) |
| owner_list = populate_owner_list([], False) |
| assert owner_list == [] |
| |
| |
| @patch("superset.commands.utils.g") |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.get_user_id") |
| def test_populate_owner_list_non_admin(mock_user_id, mock_sm, mock_g): |
| """ |
| Test the ``populate_owner_list`` method when a non admin is adding |
| another user as an owner and default_to_user is False (both get added). |
| """ |
| test_user = User(id=1, first_name="First", last_name="Last") |
| mock_g.user = User(id=4, first_name="Non", last_name="admin") |
| mock_user_id.return_value = 4 |
| mock_sm.is_admin = MagicMock(return_value=False) |
| mock_sm.get_user_by_id = MagicMock(return_value=test_user) |
| |
| owner_list = populate_owner_list([1], False) |
| assert owner_list == [mock_g.user, test_user] |
| |
| |
| @patch("superset.commands.utils.populate_owner_list") |
| def test_compute_owner_list_new_owners(mock_populate_owner_list): |
| """ |
| Test the ``compute_owner_list`` method when replacing the owner list. |
| """ |
| current_owners = [User(id=1), User(id=2), User(id=3)] |
| new_owners = [4, 5, 6] |
| |
| compute_owner_list(current_owners, new_owners) |
| mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False) |
| |
| |
| @patch("superset.commands.utils.populate_owner_list") |
| def test_compute_owner_list_no_new_owners(mock_populate_owner_list): |
| """ |
| Test the ``compute_owner_list`` method when replacing new_owners is None. |
| """ |
| current_owners = [User(id=1), User(id=2), User(id=3)] |
| new_owners = None |
| |
| compute_owner_list(current_owners, new_owners) |
| mock_populate_owner_list.assert_called_once_with([1, 2, 3], default_to_user=False) |
| |
| |
| @patch("superset.commands.utils.populate_owner_list") |
| def test_compute_owner_list_new_owner_empty_list(mock_populate_owner_list): |
| """ |
| Test the ``compute_owner_list`` method when new_owners is an empty list. |
| """ |
| current_owners = [User(id=1), User(id=2), User(id=3)] |
| new_owners = [] |
| |
| compute_owner_list(current_owners, new_owners) |
| mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False) |
| |
| |
| @patch("superset.commands.utils.populate_owner_list") |
| def test_compute_owner_list_no_owners(mock_populate_owner_list): |
| """ |
| Test the ``compute_owner_list`` method when current ownership is an empty list. |
| """ |
| current_owners = [] |
| new_owners = [4, 5, 6] |
| |
| compute_owner_list(current_owners, new_owners) |
| mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False) |
| |
| |
| @patch("superset.commands.utils.populate_owner_list") |
| def test_compute_owner_list_no_owners_handle_none(mock_populate_owner_list): |
| """ |
| Test the ``compute_owner_list`` method when current ownership is None. |
| """ |
| current_owners = None |
| new_owners = [4, 5, 6] |
| |
| compute_owner_list(current_owners, new_owners) |
| mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_new_tags_is_none(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is None. |
| """ |
| validate_tags(object_type, MOCK_TAGS, None) |
| mock_sm.can_access.assert_not_called() |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_empty_list_can_write_on_tag(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is an empty list and |
| user has permission to write on tag. |
| """ |
| mock_sm.can_access = MagicMock(return_value=True) |
| validate_tags(object_type, MOCK_TAGS, []) |
| mock_sm.can_access.assert_called_once_with("can_write", "Tag") |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_empty_list_can_tag_on_object(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is an empty list and |
| user has permission to tag objects. |
| """ |
| mock_sm.can_access = MagicMock(side_effect=[False, True]) |
| validate_tags(object_type, MOCK_TAGS, []) |
| mock_sm.can_access.assert_has_calls( |
| [call("can_write", "Tag"), call("can_tag", object_type.name.capitalize())] |
| ) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_empty_list_missing_permission(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is an empty list and |
| the user doesn't have the required permission. |
| """ |
| mock_sm.can_access = MagicMock(side_effect=[False, False]) |
| with pytest.raises(TagForbiddenError): |
| validate_tags(object_type, MOCK_TAGS, []) |
| mock_sm.can_access.assert_has_calls( |
| [call("can_write", "Tag"), call("can_tag", object_type.name.capitalize())] |
| ) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_no_changes_can_write_on_tag(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is equal to existing tags |
| and user has permission to write on tag. |
| """ |
| new_tags = [tag.id for tag in MOCK_TAGS if tag.type == TagType.custom] |
| validate_tags(object_type, MOCK_TAGS, new_tags) |
| mock_sm.can_access.assert_not_called() |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_no_changes_can_tag_on_object(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is equal to existing tags |
| and user has permission to tag objects. |
| """ |
| new_tags = [tag.id for tag in MOCK_TAGS if tag.type == TagType.custom] |
| validate_tags(object_type, MOCK_TAGS, new_tags) |
| mock_sm.can_access.assert_not_called() |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| def test_validate_tags_no_changes_missing_permission(mock_sm, object_type): |
| """ |
| Test the ``validate_tags`` method when new_tags is equal to existing tags |
| the user doens't have the required perms. |
| """ |
| new_tags = [tag.id for tag in MOCK_TAGS if tag.type == TagType.custom] |
| validate_tags(object_type, MOCK_TAGS, new_tags) |
| mock_sm.can_access.assert_not_called() |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.TagDAO.find_by_id") |
| def test_validate_tags_add_new_tags_can_write_on_tag( |
| mock_tag_find_by_id, mock_sm, object_type |
| ): |
| """ |
| Test the ``validate_tags`` method when new_tags are added and user has |
| permission to write on tag. |
| """ |
| new_tag_ids = [tag.id for tag in MOCK_TAGS if tag.type == TagType.custom] |
| new_tag = { |
| "id": 10, |
| "name": "New test tag", |
| "type": TagType.custom, |
| } |
| new_tag_ids.append(new_tag["id"]) |
| |
| mock_tag_find_by_id.return_value = new_tag |
| mock_sm.can_access = MagicMock(return_value=True) |
| |
| validate_tags(object_type, MOCK_TAGS, new_tag_ids) |
| |
| mock_sm.can_access.assert_called_once_with("can_write", "Tag") |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.TagDAO.find_by_id") |
| def test_validate_tags_add_new_tags_can_tag_on_object( |
| mock_tag_find_by_id, mock_sm, object_type |
| ): |
| """ |
| Test the ``validate_tags`` method when new_tags are added and user has |
| permission to tag objects. |
| """ |
| current_tags = [tag for tag in MOCK_TAGS if tag.type == TagType.custom] |
| new_tag = current_tags.pop() |
| new_tag_ids = [tag.id for tag in current_tags] |
| new_tag_ids.append(new_tag.id) |
| |
| mock_sm.can_access = MagicMock(side_effect=[False, True]) |
| mock_tag_find_by_id.return_value = new_tag |
| |
| validate_tags(object_type, current_tags, new_tag_ids) |
| |
| mock_sm.can_access.assert_has_calls( |
| [call("can_write", "Tag"), call("can_tag", object_type.name.capitalize())] |
| ) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.TagDAO.find_by_name") |
| def test_validate_tags_can_write_on_tag_unable_to_find_tag( |
| mock_tag_find_by_id, mock_sm, object_type |
| ): |
| """ |
| Test the ``validate_tags`` method when an un-existing tag is being |
| added and user has permission to write on tag. |
| """ |
| fake_id = 100 |
| mock_sm.can_access = MagicMock(return_value=True) |
| mock_tag_find_by_id.return_value = None |
| with pytest.raises(TagNotFoundValidationError): |
| validate_tags(object_type, MOCK_TAGS, [fake_id]) |
| mock_sm.can_access.assert_called_once_with("can_write", "Tag") |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.security_manager") |
| @patch("superset.commands.utils.TagDAO.find_by_name") |
| def test_validate_tags_can_tag_on_object_unable_to_find_tag( |
| mock_tag_find_by_id, mock_sm, object_type |
| ): |
| """ |
| Test the ``validate_tags`` method when an un-existing tag is being |
| added and user has permission to tag on object. |
| """ |
| fake_id = 100 |
| mock_sm.can_access = MagicMock(side_effect=[False, True]) |
| mock_tag_find_by_id.return_value = None |
| with pytest.raises(TagNotFoundValidationError): |
| validate_tags(object_type, MOCK_TAGS, [fake_id]) |
| mock_sm.can_access.assert_has_calls( |
| [call("can_write", "Tag"), call("can_tag", object_type.name.capitalize())] |
| ) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.TagDAO") |
| def test_update_tags_adding_tags(mock_tag_dao, object_type): |
| """ |
| Test the ``update_tags`` method when adding tags. |
| """ |
| current_tags = [tag for tag in MOCK_TAGS if tag.type == TagType.custom] |
| new_tag = current_tags.pop() |
| new_tags = [tag for tag in MOCK_TAGS if tag.type == TagType.custom] |
| new_tag_ids = [tag.id for tag in new_tags] |
| |
| mock_tag_dao.find_by_ids.return_value = [new_tag] |
| |
| update_tags(object_type, 1, current_tags, new_tag_ids) |
| |
| mock_tag_dao.find_by_ids.assert_called_once_with([new_tag.id]) |
| mock_tag_dao.delete_tagged_object.assert_not_called() |
| mock_tag_dao.create_custom_tagged_objects.assert_called_once_with( |
| object_type, 1, [new_tag.name] |
| ) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.TagDAO") |
| def test_update_tags_removing_tags(mock_tag_dao, object_type): |
| """ |
| Test the ``update_tags`` method when removing existing tags. |
| """ |
| new_tags = [tag for tag in MOCK_TAGS if tag.type == TagType.custom] |
| tag_to_be_removed = new_tags.pop() |
| new_tag_ids = [tag.id for tag in new_tags] |
| |
| update_tags(object_type, 1, MOCK_TAGS, new_tag_ids) |
| |
| mock_tag_dao.delete_tagged_object.assert_called_once_with( |
| object_type, 1, tag_to_be_removed.name |
| ) |
| mock_tag_dao.create_custom_tagged_objects.assert_not_called() |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.TagDAO") |
| def test_update_tags_adding_and_removing_tags(mock_tag_dao, object_type): |
| """ |
| Test the ``update_tags`` method when adding and removing existing tags. |
| """ |
| new_tags = [tag for tag in MOCK_TAGS if tag.type == TagType.custom] |
| tag_to_be_removed = new_tags.pop() |
| new_tag = Tag(id=10, name="my new tag", type=TagType.custom) |
| new_tags.append(new_tag) |
| new_tag_ids = [tag.id for tag in new_tags] |
| |
| mock_tag_dao.find_by_ids.return_value = [new_tag] |
| |
| update_tags(object_type, 1, MOCK_TAGS, new_tag_ids) |
| |
| mock_tag_dao.delete_tagged_object.assert_called_once_with( |
| object_type, 1, tag_to_be_removed.name |
| ) |
| mock_tag_dao.find_by_ids.assert_called_once_with([new_tag.id]) |
| mock_tag_dao.create_custom_tagged_objects.assert_called_once_with( |
| object_type, 1, ["my new tag"] |
| ) |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.TagDAO") |
| def test_update_tags_removing_all_tags(mock_tag_dao, object_type): |
| """ |
| Test the ``update_tags`` method when removing all tags. |
| """ |
| update_tags(object_type, 1, MOCK_TAGS, []) |
| |
| mock_tag_dao.delete_tagged_object.assert_has_calls( |
| [ |
| call(object_type, 1, tag.name) |
| for tag in MOCK_TAGS |
| if tag.type == TagType.custom |
| ] |
| ) |
| mock_tag_dao.create_custom_tagged_objects.assert_not_called() |
| |
| |
| @pytest.mark.parametrize("object_type", OBJECT_TYPES) |
| @patch("superset.commands.utils.TagDAO") |
| def test_update_tags_no_tags(mock_tag_dao, object_type): |
| """ |
| Test the ``update_tags`` method when the asset only has system tags. |
| """ |
| system_tags = [tag for tag in MOCK_TAGS if tag.type != TagType.custom] |
| new_tags = [tag for tag in MOCK_TAGS if tag.type == TagType.custom] |
| new_tag_ids = [tag.id for tag in new_tags] |
| new_tag_names = [tag.name for tag in new_tags] |
| |
| mock_tag_dao.find_by_ids.return_value = new_tags |
| |
| update_tags(object_type, 1, system_tags, new_tag_ids) |
| |
| mock_tag_dao.delete_tagged_object.assert_not_called() |
| mock_tag_dao.find_by_ids.assert_called_once_with(new_tag_ids) |
| mock_tag_dao.create_custom_tagged_objects.assert_called_once_with( |
| object_type, 1, new_tag_names |
| ) |