blob: 81f067a48e4c592cf01dbb0fdc6f6a98dbdf466e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for alerting in Superset"""
import json
import logging
from unittest.mock import patch
import pytest
from contextlib2 import contextmanager
from flask_appbuilder.security.sqla.models import Role
from superset.extensions import db, security_manager
from superset.migrations.shared.security_converge import (
add_pvms,
migrate_roles,
Pvm,
PvmMigrationMapType,
)
from tests.test_app import app
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@contextmanager
def create_old_role(pvm_map: PvmMigrationMapType, external_pvms):
with app.app_context():
pvms = []
for old_pvm, new_pvms in pvm_map.items():
pvms.append(
security_manager.add_permission_view_menu(
old_pvm.permission, old_pvm.view
)
)
for external_pvm in external_pvms:
pvms.append(
security_manager.find_permission_view_menu(
external_pvm.permission, external_pvm.view
)
)
new_role = Role(name="Dummy Role", permissions=pvms)
db.session.add(new_role)
db.session.commit()
yield new_role
new_role = (
db.session.query(Role).filter(Role.name == "Dummy Role").one_or_none()
)
new_role.permissions = []
db.session.merge(new_role)
for old_pvm, new_pvms in pvm_map.items():
security_manager.del_permission_view_menu(old_pvm.permission, old_pvm.view)
for new_pvm in new_pvms:
security_manager.del_permission_view_menu(
new_pvm.permission, new_pvm.view
)
db.session.delete(new_role)
db.session.commit()
@pytest.mark.parametrize(
"descriptiom, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions",
[
(
"Many to one readonly",
{"NewDummy": ("can_read",)},
{
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),),
},
(),
("DummyView",),
(),
),
(
"Many to one with new permission",
{"NewDummy": ("can_new_perm", "can_write")},
{
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_new_perm"),),
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_write"),),
},
(),
("DummyView",),
(),
),
(
"Many to one with multiple permissions",
{"NewDummy": ("can_read", "can_write",)},
{
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),),
},
(),
("DummyView",),
(),
),
(
"Many to one with multiple views",
{"NewDummy": ("can_read", "can_write",)},
{
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),),
Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummySecondView", "can_show"): (Pvm("NewDummy", "can_read"),),
Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),),
Pvm("DummySecondView", "can_delete"): (Pvm("NewDummy", "can_write"),),
},
(),
("DummyView", "DummySecondView"),
(),
),
(
"Many to one with existing permission-view (pvm)",
{"NewDummy": ("can_read", "can_write",)},
{
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
},
(Pvm("UserDBModelView", "can_list"),),
("DummyView",),
(),
),
(
"Many to one with existing multiple permission-view (pvm)",
{"NewDummy": ("can_read", "can_write",)},
{
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),),
Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),),
},
(Pvm("UserDBModelView", "can_list"), Pvm("UserDBModelView", "can_add"),),
("DummyView",),
(),
),
(
"Many to one with with old permission that gets deleted",
{"NewDummy": ("can_read", "can_write",)},
{
Pvm("DummyView", "can_new_perm"): (Pvm("NewDummy", "can_read"),),
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
},
(),
("DummyView",),
("can_new_perm",),
),
(
"Many to Many (normally should be a downgrade)",
{"DummyView": ("can_list", "can_show", "can_add",)},
{
Pvm("NewDummy", "can_read"): (
Pvm("DummyView", "can_list"),
Pvm("DummyView", "can_show"),
),
Pvm("NewDummy", "can_write"): (Pvm("DummyView", "can_add"),),
},
(),
("NewDummy",),
(),
),
(
"Many to Many delete old permissions",
{"DummyView": ("can_list", "can_show", "can_add",)},
{
Pvm("NewDummy", "can_new_perm1"): (
Pvm("DummyView", "can_list"),
Pvm("DummyView", "can_show"),
),
Pvm("NewDummy", "can_new_perm2",): (Pvm("DummyView", "can_add"),),
},
(),
("NewDummy",),
("can_new_perm1", "can_new_perm2"),
),
],
)
def test_migrate_role(
descriptiom, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions
):
"""
Permission migration: generic tests
"""
logger.info(descriptiom)
with create_old_role(pvm_map, external_pvms) as old_role:
role_name = old_role.name
session = db.session
# Run migrations
add_pvms(session, new_pvms)
migrate_roles(session, pvm_map)
role = db.session.query(Role).filter(Role.name == role_name).one_or_none()
for old_pvm, new_pvms in pvm_map.items():
old_pvm_model = security_manager.find_permission_view_menu(
old_pvm.permission, old_pvm.view
)
assert old_pvm_model is None
new_pvm_model = security_manager.find_permission_view_menu(
new_pvms[0].permission, new_pvms[0].view
)
assert new_pvm_model is not None
assert new_pvm_model in role.permissions
# assert deleted view menus
for deleted_view in deleted_views:
assert security_manager.find_view_menu(deleted_view) is None
# assert deleted permissions
for deleted_permission in deleted_permissions:
assert security_manager.find_permission(deleted_permission) is None
# assert externals are still there
for external_pvm in external_pvms:
assert (
security_manager.find_permission_view_menu(
external_pvm.permission, external_pvm.view
)
is not None
)