blob: dad8a3d47eb4c90fa92a7c26a82c73c281641a9f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
import inspect
import time
import unittest
from collections import namedtuple
from unittest import mock
from unittest.mock import Mock, patch, call, ANY
from typing import Any
import jwt
import prison
import pytest
from flask import current_app, g
from flask_appbuilder.security.sqla.models import Role
from superset.daos.datasource import DatasourceDAO # noqa: F401
from superset.models.dashboard import Dashboard
from superset import appbuilder, db, security_manager, viz
from superset.connectors.sqla.models import SqlaTable
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetSecurityException
from superset.models.core import Database
from superset.models.slice import Slice
from superset.sql.parse import Table
from superset.utils.core import (
DatasourceType,
backend,
get_example_default_schema,
override_user,
)
from superset.utils import json
from superset.utils.database import get_example_database
from superset.utils.urls import get_url_host
from tests.integration_tests.base_tests import SupersetTestCase
from tests.integration_tests.constants import GAMMA_USERNAME
from tests.integration_tests.conftest import with_feature_flags
from tests.integration_tests.fixtures.public_role import (
public_role_like_gamma, # noqa: F401
public_role_like_test_role, # noqa: F401
)
from tests.integration_tests.fixtures.birth_names_dashboard import (
load_birth_names_dashboard_with_slices, # noqa: F401
load_birth_names_data, # noqa: F401
)
from tests.integration_tests.fixtures.world_bank_dashboard import (
load_world_bank_dashboard_with_slices, # noqa: F401
load_world_bank_data, # noqa: F401
)
from tests.integration_tests.fixtures.users import (
create_gamma_user_group, # noqa: F401
create_user_group_with_dar, # noqa: F401
create_gamma_user_group_with_dar, # noqa: F401
)
NEW_SECURITY_CONVERGE_VIEWS = (
"Annotation",
"Database",
"Dataset",
"Dashboard",
"CssTemplate",
"Chart",
"Query",
"SavedQuery",
)
def get_perm_tuples(role_name):
perm_set = set()
for perm in security_manager.find_role(role_name).permissions:
perm_set.add((perm.permission.name, perm.view_menu.name))
return perm_set
SCHEMA_ACCESS_ROLE = "schema_access_role"
def create_schema_perm(view_menu_name: str) -> None:
permission = "schema_access"
security_manager.add_permission_view_menu(permission, view_menu_name)
perm_view = security_manager.find_permission_view_menu(permission, view_menu_name)
security_manager.add_permission_role(
security_manager.find_role(SCHEMA_ACCESS_ROLE), perm_view
)
return None
def delete_schema_perm(view_menu_name: str) -> None:
pv = security_manager.find_permission_view_menu("schema_access", "[examples].[2]")
security_manager.del_permission_role(
security_manager.find_role(SCHEMA_ACCESS_ROLE), pv
)
security_manager.del_permission_view_menu("schema_access", "[examples].[2]")
return None
class TestRolePermission(SupersetTestCase):
"""Testing export role permissions."""
def setUp(self):
schema = get_example_default_schema()
security_manager.add_role(SCHEMA_ACCESS_ROLE)
db.session.commit()
ds = (
db.session.query(SqlaTable)
.filter_by(table_name="wb_health_population", schema=schema)
.first()
)
ds.schema = "temp_schema"
ds.schema_perm = ds.get_schema_perm()
ds_slices = (
db.session.query(Slice)
.filter_by(datasource_type=DatasourceType.TABLE)
.filter_by(datasource_id=ds.id)
.all()
)
for s in ds_slices:
s.schema_perm = ds.schema_perm
create_schema_perm("[examples].[temp_schema]")
gamma_user = security_manager.find_user(username="gamma")
gamma_user.roles.append(security_manager.find_role(SCHEMA_ACCESS_ROLE))
db.session.commit()
def tearDown(self):
ds = (
db.session.query(SqlaTable)
.filter_by(table_name="wb_health_population", schema="temp_schema")
.first()
)
schema_perm = ds.schema_perm
ds.schema = get_example_default_schema()
ds.schema_perm = None
ds_slices = (
db.session.query(Slice)
.filter_by(datasource_type=DatasourceType.TABLE)
.filter_by(datasource_id=ds.id)
.all()
)
for s in ds_slices:
s.schema_perm = None
delete_schema_perm(schema_perm)
db.session.delete(security_manager.find_role(SCHEMA_ACCESS_ROLE))
db.session.commit()
super().tearDown()
def test_after_insert_dataset(self):
security_manager.on_view_menu_after_insert = Mock()
security_manager.on_permission_view_after_insert = Mock()
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
table = SqlaTable(
schema="tmp_schema",
table_name="tmp_perm_table",
database=tmp_db1,
)
db.session.add(table)
db.session.commit()
table = db.session.query(SqlaTable).filter_by(table_name="tmp_perm_table").one()
assert table.perm == f"[tmp_db1].[tmp_perm_table](id:{table.id})"
pvm_dataset = security_manager.find_permission_view_menu(
"datasource_access", table.perm
)
pvm_schema = security_manager.find_permission_view_menu(
"schema_access", table.schema_perm
)
# Assert dataset permission is created and local perms are ok
assert pvm_dataset is not None
assert table.perm == f"[tmp_db1].[tmp_perm_table](id:{table.id})"
assert table.schema_perm == "[tmp_db1].[tmp_schema]"
assert pvm_schema is not None
# assert on permission hooks
call_args = security_manager.on_permission_view_after_insert.call_args
assert call_args.args[2].id == pvm_schema.id
security_manager.on_permission_view_after_insert.assert_has_calls(
[
call(ANY, ANY, ANY),
call(ANY, ANY, ANY),
]
)
# Cleanup
db.session.delete(table)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_insert_dataset_rollback(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
table = SqlaTable(
schema="tmp_schema",
table_name="tmp_table",
database=tmp_db1,
)
db.session.add(table)
db.session.flush()
pvm_dataset = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table](id:{table.id})"
)
assert pvm_dataset is not None
table_id = table.id
db.session.rollback()
table = (
db.session.query(SqlaTable).filter_by(table_name="tmp_table").one_or_none()
)
assert table is None
pvm_dataset = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table](id:{table_id})"
)
assert pvm_dataset is None
db.session.delete(tmp_db1)
db.session.commit()
def test_after_insert_dataset_table_none(self):
table = SqlaTable(
schema="tmp_schema",
table_name="tmp_perm_table",
# Setting database_id instead of database will skip permission creation
database_id=get_example_database().id,
)
db.session.add(table)
db.session.commit()
stored_table = (
db.session.query(SqlaTable).filter_by(table_name="tmp_perm_table").one()
)
# Assert permission is created
assert (
security_manager.find_permission_view_menu(
"datasource_access", stored_table.perm
)
is not None
)
# Assert no bogus permission is created
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[None].[tmp_perm_table](id:{stored_table.id})"
)
is None
)
# Cleanup
db.session.delete(table)
db.session.commit()
def test_after_insert_database(self):
security_manager.on_permission_view_after_insert = Mock()
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
assert tmp_db1.perm == f"[tmp_db1].(id:{tmp_db1.id})"
tmp_db1_pvm = security_manager.find_permission_view_menu(
"database_access", tmp_db1.perm
)
assert tmp_db1_pvm is not None
# Assert the hook is called
security_manager.on_permission_view_after_insert.assert_has_calls(
[
call(ANY, ANY, ANY),
]
)
call_args = security_manager.on_permission_view_after_insert.call_args
assert call_args.args[2].id == tmp_db1_pvm.id
db.session.delete(tmp_db1)
db.session.commit()
def test_after_insert_database_rollback(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.flush()
pvm_database = security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
assert pvm_database is not None
db.session.rollback()
pvm_database = security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1](id:{tmp_db1.id})"
)
assert pvm_database is None
def test_after_update_database__perm_database_access(self):
security_manager.on_view_menu_after_update = Mock()
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
assert (
security_manager.find_permission_view_menu("database_access", tmp_db1.perm)
is not None
)
tmp_db1.database_name = "tmp_db2"
db.session.commit()
# Assert that the old permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
is None
)
# Assert that the db permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db2].(id:{tmp_db1.id})"
)
is not None
)
# Assert the hook is called
tmp_db1_view_menu = security_manager.find_view_menu(
f"[tmp_db2].(id:{tmp_db1.id})"
)
security_manager.on_view_menu_after_update.assert_has_calls(
[
call(ANY, ANY, tmp_db1_view_menu),
]
)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_update_database_rollback(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
assert (
security_manager.find_permission_view_menu("database_access", tmp_db1.perm)
is not None
)
tmp_db1.database_name = "tmp_db2"
db.session.flush()
# Assert that the old permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
is None
)
# Assert that the db permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db2].(id:{tmp_db1.id})"
)
is not None
)
db.session.rollback()
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
is not None
)
# Assert that the db permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db2].(id:{tmp_db1.id})"
)
is None
)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_update_database__perm_database_access_exists(self):
security_manager.on_permission_view_after_delete = Mock()
# Add a bogus existing permission before the change
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
security_manager.add_permission_view_menu(
"database_access", f"[tmp_db2].(id:{tmp_db1.id})"
)
assert (
security_manager.find_permission_view_menu("database_access", tmp_db1.perm)
is not None
)
tmp_db1.database_name = "tmp_db2"
db.session.commit()
# Assert that the old permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
is None
)
# Assert that the db permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db2].(id:{tmp_db1.id})"
)
is not None
)
security_manager.on_permission_view_after_delete.assert_has_calls(
[
call(ANY, ANY, ANY),
]
)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_update_database__perm_datasource_access(self):
security_manager.on_view_menu_after_update = Mock()
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db1,
)
db.session.add(table1)
table2 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table2",
database=tmp_db1,
)
db.session.add(table2)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
table2 = db.session.query(SqlaTable).filter_by(table_name="tmp_table2").one()
# assert initial perms
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
is not None
)
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table2](id:{table2.id})"
)
is not None
)
assert slice1.perm == f"[tmp_db1].[tmp_table1](id:{table1.id})"
assert table1.perm == f"[tmp_db1].[tmp_table1](id:{table1.id})"
assert table2.perm == f"[tmp_db1].[tmp_table2](id:{table2.id})"
# Refresh and update the database name
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
tmp_db1.database_name = "tmp_db2"
db.session.commit()
# Assert that the old permissions were updated
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
is None
)
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table2](id:{table2.id})"
)
is None
)
# Assert that the db permission was updated
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db2].[tmp_table1](id:{table1.id})"
)
is not None
)
assert (
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db2].[tmp_table2](id:{table2.id})"
)
is not None
)
assert slice1.perm == f"[tmp_db2].[tmp_table1](id:{table1.id})"
assert table1.perm == f"[tmp_db2].[tmp_table1](id:{table1.id})"
assert table2.perm == f"[tmp_db2].[tmp_table2](id:{table2.id})"
# Assert hooks are called
tmp_db1_view_menu = security_manager.find_view_menu(
f"[tmp_db2].(id:{tmp_db1.id})"
)
table1_view_menu = security_manager.find_view_menu(
f"[tmp_db2].[tmp_table1](id:{table1.id})"
)
table2_view_menu = security_manager.find_view_menu(
f"[tmp_db2].[tmp_table2](id:{table2.id})"
)
security_manager.on_view_menu_after_update.assert_has_calls(
[
call(ANY, ANY, tmp_db1_view_menu),
call(ANY, ANY, table1_view_menu),
call(ANY, ANY, table2_view_menu),
]
)
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(table2)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_delete_database(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
database_pvm = security_manager.find_permission_view_menu(
"database_access", tmp_db1.perm
)
assert database_pvm is not None
role1 = Role(name="tmp_role1")
role1.permissions.append(database_pvm)
db.session.add(role1)
db.session.commit()
db.session.delete(tmp_db1)
db.session.commit()
# Assert that PVM is removed from Role
role1 = security_manager.find_role("tmp_role1")
assert role1.permissions == []
# Assert that the old permission was updated
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
is None
)
# Cleanup
db.session.delete(role1)
db.session.commit()
def test_after_delete_database_rollback(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
database_pvm = security_manager.find_permission_view_menu(
"database_access", tmp_db1.perm
)
assert database_pvm is not None
role1 = Role(name="tmp_role1")
role1.permissions.append(database_pvm)
db.session.add(role1)
db.session.commit()
db.session.delete(tmp_db1)
db.session.flush()
role1 = security_manager.find_role("tmp_role1")
assert role1.permissions == []
assert (
security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
is None
)
db.session.rollback()
# Test a rollback reverts everything
database_pvm = security_manager.find_permission_view_menu(
"database_access", f"[tmp_db1].(id:{tmp_db1.id})"
)
role1 = security_manager.find_role("tmp_role1")
assert role1.permissions == [database_pvm]
# Cleanup
db.session.delete(role1)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_delete_dataset(self):
security_manager.on_permission_view_after_delete = Mock()
tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db,
)
db.session.add(table1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
role1 = Role(name="tmp_role1")
role1.permissions.append(table1_pvm)
db.session.add(role1)
db.session.commit()
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test delete
db.session.delete(table1)
db.session.commit()
role1 = security_manager.find_role("tmp_role1")
assert role1.permissions == []
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is None
table1_view_menu = security_manager.find_view_menu(
f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_view_menu is None
# Assert the hook is called
security_manager.on_permission_view_after_delete.assert_has_calls(
[
call(ANY, ANY, ANY),
]
)
# cleanup
db.session.delete(role1)
db.session.delete(tmp_db)
db.session.commit()
def test_after_delete_dataset_rollback(self):
tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db,
)
db.session.add(table1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
role1 = Role(name="tmp_role1")
role1.permissions.append(table1_pvm)
db.session.add(role1)
db.session.commit()
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test delete, permissions are correctly deleted
db.session.delete(table1)
db.session.flush()
role1 = security_manager.find_role("tmp_role1")
assert role1.permissions == []
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is None
# Test rollback, permissions exist everything is correctly rollback
db.session.rollback()
role1 = security_manager.find_role("tmp_role1")
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
assert role1.permissions == [table1_pvm]
# cleanup
db.session.delete(table1)
db.session.delete(role1)
db.session.delete(tmp_db)
db.session.commit()
def test_after_update_dataset__name_changes(self):
security_manager.on_view_menu_after_update = Mock()
tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db,
)
db.session.add(table1)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test update
table1.table_name = "tmp_table1_changed"
db.session.commit()
# Test old permission does not exist
old_table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert old_table1_pvm is None
# Test new permission exist
new_table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
)
assert new_table1_pvm is not None
# test dataset permission changed
changed_table1 = (
db.session.query(SqlaTable).filter_by(table_name="tmp_table1_changed").one()
)
assert changed_table1.perm == f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
# Test Chart permission changed
slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
assert slice1.perm == f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
# Assert hook is called
view_menu_dataset = security_manager.find_view_menu(
f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
)
security_manager.on_view_menu_after_update.assert_has_calls(
[
call(ANY, ANY, view_menu_dataset),
]
)
# cleanup
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(tmp_db)
db.session.commit()
def test_after_update_dataset_rollback(self):
tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db,
)
db.session.add(table1)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test update
table1.table_name = "tmp_table1_changed"
db.session.flush()
# Test old permission does not exist
old_table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert old_table1_pvm is None
# Test new permission exist
new_table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
)
assert new_table1_pvm is not None
# Test rollback
db.session.rollback()
old_table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
)
assert old_table1_pvm is not None
# cleanup
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(tmp_db)
db.session.commit()
def test_after_update_dataset__db_changes(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
tmp_db2 = Database(database_name="tmp_db2", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.add(tmp_db2)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db1,
)
db.session.add(table1)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test update
table1.database = tmp_db2
db.session.commit()
# Test old permission does not exist
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is None
# Test new permission exist
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db2].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# test dataset permission and schema permission changed
changed_table1 = (
db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
)
assert changed_table1.perm == f"[tmp_db2].[tmp_table1](id:{table1.id})"
assert changed_table1.schema_perm == "[tmp_db2].[tmp_schema]" # noqa: F541
# Test Chart permission changed
slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
assert slice1.perm == f"[tmp_db2].[tmp_table1](id:{table1.id})"
assert slice1.schema_perm == f"[tmp_db2].[tmp_schema]" # noqa: F541
# cleanup
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(tmp_db1)
db.session.delete(tmp_db2)
db.session.commit()
def test_after_update_dataset__schema_changes(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db1,
)
db.session.add(table1)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test update
table1.schema = "tmp_schema_changed"
db.session.commit()
# Test permission still exists
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# test dataset schema permission changed
changed_table1 = (
db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
)
assert changed_table1.perm == f"[tmp_db1].[tmp_table1](id:{table1.id})"
assert changed_table1.schema_perm == "[tmp_db1].[tmp_schema_changed]" # noqa: F541
# Test Chart schema permission changed
slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
assert slice1.perm == f"[tmp_db1].[tmp_table1](id:{table1.id})"
assert slice1.schema_perm == "[tmp_db1].[tmp_schema_changed]" # noqa: F541
# cleanup
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_update_dataset__schema_none(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db1,
)
db.session.add(table1)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test update
table1.schema = None
db.session.commit()
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
assert table1.perm == f"[tmp_db1].[tmp_table1](id:{table1.id})"
assert table1.schema_perm is None
# cleanup
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(tmp_db1)
db.session.commit()
def test_after_update_dataset__name_db_changes(self):
tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
tmp_db2 = Database(database_name="tmp_db2", sqlalchemy_uri="sqlite://")
db.session.add(tmp_db1)
db.session.add(tmp_db2)
db.session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=tmp_db1,
)
db.session.add(table1)
db.session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
db.session.add(slice1)
db.session.commit()
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is not None
# refresh
table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
# Test update
table1.table_name = "tmp_table1_changed"
table1.database = tmp_db2
db.session.commit()
# Test old permission does not exist
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
)
assert table1_pvm is None
# Test new permission exist
table1_pvm = security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_db2].[tmp_table1_changed](id:{table1.id})"
)
assert table1_pvm is not None
# test dataset permission and schema permission changed
changed_table1 = (
db.session.query(SqlaTable).filter_by(table_name="tmp_table1_changed").one()
)
assert changed_table1.perm == f"[tmp_db2].[tmp_table1_changed](id:{table1.id})"
assert changed_table1.schema_perm == "[tmp_db2].[tmp_schema]" # noqa: F541
# Test Chart permission changed
slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
assert slice1.perm == f"[tmp_db2].[tmp_table1_changed](id:{table1.id})"
assert slice1.schema_perm == f"[tmp_db2].[tmp_schema]" # noqa: F541
# cleanup
db.session.delete(slice1)
db.session.delete(table1)
db.session.delete(tmp_db1)
db.session.delete(tmp_db2)
db.session.commit()
def test_hybrid_perm_database(self):
database = Database(database_name="tmp_database3", sqlalchemy_uri="sqlite://")
db.session.add(database)
id_ = (
db.session.query(Database.id)
.filter_by(database_name="tmp_database3")
.scalar()
)
record = (
db.session.query(Database)
.filter_by(perm=f"[tmp_database3].(id:{id_})")
.one()
)
assert record.get_perm() == record.perm
assert record.id == id_
assert record.database_name == "tmp_database3"
db.session.delete(database)
db.session.commit()
def test_set_perm_slice(self):
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
table = SqlaTable(table_name="tmp_perm_table", database=database)
db.session.add(database)
db.session.add(table)
db.session.commit()
# no schema permission
slice = Slice(
datasource_id=table.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_perm_table",
slice_name="slice_name",
)
db.session.add(slice)
db.session.commit()
slice = db.session.query(Slice).filter_by(slice_name="slice_name").one()
assert slice.perm == table.perm
assert slice.perm == f"[tmp_database].[tmp_perm_table](id:{table.id})"
assert slice.schema_perm == table.schema_perm
assert slice.schema_perm is None
table.schema = "tmp_perm_schema"
table.table_name = "tmp_perm_table_v2"
db.session.commit()
table = (
db.session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one()
)
assert slice.perm == table.perm
assert slice.perm == f"[tmp_database].[tmp_perm_table_v2](id:{table.id})"
assert table.perm == f"[tmp_database].[tmp_perm_table_v2](id:{table.id})"
assert slice.schema_perm == table.schema_perm
assert slice.schema_perm == "[tmp_database].[tmp_perm_schema]"
db.session.delete(slice)
db.session.delete(table)
db.session.delete(database)
db.session.commit()
@patch("superset.utils.core.g")
@patch("superset.security.manager.g")
def test_schemas_accessible_by_user_admin(self, mock_sm_g, mock_g):
mock_g.user = mock_sm_g.user = security_manager.find_user("admin")
with self.client.application.test_request_context():
database = get_example_database()
schemas = security_manager.get_schemas_accessible_by_user(
database, None, {"1", "2", "3"}
)
assert schemas == {"1", "2", "3"} # no changes
@patch("superset.utils.core.g")
@patch("superset.security.manager.g")
def test_schemas_accessible_by_user_schema_access(self, mock_sm_g, mock_g):
# User has schema access to the schema 1
create_schema_perm("[examples].[1]")
mock_g.user = mock_sm_g.user = security_manager.find_user("gamma")
with self.client.application.test_request_context():
database = get_example_database()
schemas = security_manager.get_schemas_accessible_by_user(
database, None, {"1", "2", "3"}
)
# temp_schema is not passed in the params
assert schemas == {"1"}
delete_schema_perm("[examples].[1]")
def test_schemas_accessible_by_user_datasource_access(self):
# User has schema access to the datasource temp_schema.wb_health_population in examples DB. # noqa: E501
database = get_example_database()
with self.client.application.test_request_context():
with override_user(security_manager.find_user("gamma")):
schemas = security_manager.get_schemas_accessible_by_user(
database, None, {"temp_schema", "2", "3"}
)
assert schemas == {"temp_schema"}
def test_schemas_accessible_by_user_datasource_and_schema_access(self):
# User has schema access to the datasource temp_schema.wb_health_population in examples DB. # noqa: E501
create_schema_perm("[examples].[2]")
with self.client.application.test_request_context():
database = get_example_database()
with override_user(security_manager.find_user("gamma")):
schemas = security_manager.get_schemas_accessible_by_user(
database, None, {"temp_schema", "2", "3"}
)
assert schemas == {"temp_schema", "2"}
vm = security_manager.find_permission_view_menu(
"schema_access", "[examples].[2]"
)
assert vm is not None
delete_schema_perm("[examples].[2]")
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_gamma_user_schema_access_to_dashboards(self):
dash = db.session.query(Dashboard).filter_by(slug="world_health").first()
dash.published = True
db.session.commit()
self.login(GAMMA_USERNAME)
data = str(self.client.get("api/v1/dashboard/").data)
assert "/superset/dashboard/world_health/" in data
assert "/superset/dashboard/births/" not in data
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
@pytest.mark.usefixtures("public_role_like_gamma")
def test_public_sync_role_data_perms(self):
"""
Security: Tests if the sync role method preserves data access permissions
if they already exist on a public role.
Also check that non data access permissions are removed
"""
table = db.session.query(SqlaTable).filter_by(table_name="birth_names").one()
self.grant_public_access_to_table(table)
public_role = security_manager.get_public_role()
unwanted_pvm = security_manager.find_permission_view_menu(
"menu_access", "Security"
)
public_role.permissions.append(unwanted_pvm)
db.session.commit()
security_manager.sync_role_definitions()
public_role = security_manager.get_public_role()
public_role_resource_names = [
permission.view_menu.name for permission in public_role.permissions
]
assert table.get_perm() in public_role_resource_names
assert "Security" not in public_role_resource_names
# Cleanup
self.revoke_public_access_to_table(table)
@pytest.mark.usefixtures("public_role_like_test_role")
def test_public_sync_role_builtin_perms(self):
"""
Security: Tests public role creation based on a builtin role
"""
public_role = security_manager.get_public_role()
public_role_resource_names = [
[permission.view_menu.name, permission.permission.name]
for permission in public_role.permissions
]
for pvm in current_app.config["FAB_ROLES"]["TestRole"]:
assert pvm in public_role_resource_names
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_sqllab_gamma_user_schema_access_to_sqllab(self):
example_db = (
db.session.query(Database).filter_by(database_name="examples").one()
)
example_db.expose_in_sqllab = True
db.session.commit()
arguments = {
"keys": ["none"],
"columns": ["expose_in_sqllab"],
"filters": [{"col": "expose_in_sqllab", "opr": "eq", "value": True}],
"order_columns": "database_name",
"order_direction": "asc",
"page": 0,
"page_size": -1,
}
NEW_FLASK_GET_SQL_DBS_REQUEST = f"/api/v1/database/?q={prison.dumps(arguments)}" # noqa: N806
self.login(GAMMA_USERNAME)
databases_json = self.client.get(NEW_FLASK_GET_SQL_DBS_REQUEST).json
assert databases_json["count"] == 1
def assert_can_read(self, view_menu, permissions_set):
if view_menu in NEW_SECURITY_CONVERGE_VIEWS:
assert ("can_read", view_menu) in permissions_set
else:
assert ("can_list", view_menu) in permissions_set
def assert_can_write(self, view_menu, permissions_set):
if view_menu in NEW_SECURITY_CONVERGE_VIEWS:
assert ("can_write", view_menu) in permissions_set
else:
assert ("can_add", view_menu) in permissions_set
assert ("can_delete", view_menu) in permissions_set
assert ("can_edit", view_menu) in permissions_set
def assert_cannot_write(self, view_menu, permissions_set):
if view_menu in NEW_SECURITY_CONVERGE_VIEWS:
assert ("can_write", view_menu) not in permissions_set
else:
assert ("can_add", view_menu) not in permissions_set
assert ("can_delete", view_menu) not in permissions_set
assert ("can_edit", view_menu) not in permissions_set
assert ("can_save", view_menu) not in permissions_set
def assert_can_all(self, view_menu, permissions_set):
self.assert_can_read(view_menu, permissions_set)
self.assert_can_write(view_menu, permissions_set)
def assert_can_menu(self, view_menu, permissions_set):
assert ("menu_access", view_menu) in permissions_set
def assert_cannot_menu(self, view_menu, permissions_set):
assert ("menu_access", view_menu) not in permissions_set
def assert_cannot_gamma(self, perm_set):
self.assert_cannot_write("Annotation", perm_set)
self.assert_cannot_write("CssTemplate", perm_set)
self.assert_cannot_menu("SQL Lab", perm_set)
self.assert_cannot_menu("CSS Templates", perm_set)
self.assert_cannot_menu("Annotation Layers", perm_set)
self.assert_cannot_menu("Manage", perm_set)
self.assert_cannot_menu("Queries", perm_set)
self.assert_cannot_menu("Import dashboards", perm_set)
self.assert_cannot_menu("Upload a CSV", perm_set)
self.assert_cannot_menu("ReportSchedule", perm_set)
self.assert_cannot_menu("Alerts & Report", perm_set)
assert ("can_upload", "Database") not in perm_set
def assert_can_gamma(self, perm_set):
self.assert_can_read("Dataset", perm_set)
# make sure that user can create slices and dashboards
self.assert_can_all("Dashboard", perm_set)
self.assert_can_all("Chart", perm_set)
assert ("can_csv", "Superset") in perm_set
assert ("can_dashboard", "Superset") in perm_set
assert ("can_explore", "Superset") in perm_set
assert ("can_share_chart", "Superset") in perm_set
assert ("can_share_dashboard", "Superset") in perm_set
assert ("can_explore_json", "Superset") in perm_set
assert ("can_explore_json", "Superset") in perm_set
assert ("can_userinfo", "UserDBModelView") in perm_set
assert ("can_view_chart_as_table", "Dashboard") in perm_set
assert ("can_view_query", "Dashboard") in perm_set
self.assert_can_menu("Databases", perm_set)
self.assert_can_menu("Datasets", perm_set)
self.assert_can_menu("Data", perm_set)
self.assert_can_menu("Charts", perm_set)
self.assert_can_menu("Dashboards", perm_set)
def assert_can_alpha(self, perm_set):
self.assert_can_all("Annotation", perm_set)
self.assert_can_all("CssTemplate", perm_set)
self.assert_can_all("Dataset", perm_set)
self.assert_can_read("Database", perm_set)
assert ("can_upload", "Database") in perm_set
self.assert_can_menu("Manage", perm_set)
self.assert_can_menu("Annotation Layers", perm_set)
self.assert_can_menu("CSS Templates", perm_set)
assert ("all_datasource_access", "all_datasource_access") in perm_set
def assert_cannot_alpha(self, perm_set):
self.assert_cannot_write("Queries", perm_set)
self.assert_cannot_write("RoleModelView", perm_set)
self.assert_cannot_write("UserDBModelView", perm_set)
self.assert_cannot_write("Database", perm_set)
def assert_can_admin(self, perm_set):
self.assert_can_all("Database", perm_set)
self.assert_can_all("RoleModelView", perm_set)
self.assert_can_all("UserDBModelView", perm_set)
assert ("all_database_access", "all_database_access") in perm_set
self.assert_can_menu("Security", perm_set)
self.assert_can_menu("List Users", perm_set)
self.assert_can_menu("List Roles", perm_set)
def test_is_admin_only(self):
assert not security_manager._is_admin_only(
security_manager.find_permission_view_menu("can_read", "Dataset")
)
assert not security_manager._is_admin_only(
security_manager.find_permission_view_menu(
"all_datasource_access", "all_datasource_access"
)
)
log_permissions = ["can_read"]
for log_permission in log_permissions:
assert security_manager._is_admin_only(
security_manager.find_permission_view_menu(log_permission, "Log")
)
assert security_manager._is_admin_only(
security_manager.find_permission_view_menu("can_edit", "UserDBModelView")
)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_is_alpha_only(self):
assert not security_manager._is_alpha_only(
security_manager.find_permission_view_menu("can_read", "Dataset")
)
assert security_manager._is_alpha_only(
security_manager.find_permission_view_menu("can_write", "Dataset")
)
assert security_manager._is_alpha_only(
security_manager.find_permission_view_menu(
"all_datasource_access", "all_datasource_access"
)
)
assert security_manager._is_alpha_only(
security_manager.find_permission_view_menu(
"all_database_access", "all_database_access"
)
)
def test_is_gamma_pvm(self):
assert security_manager._is_gamma_pvm(
security_manager.find_permission_view_menu("can_read", "Dataset")
)
def test_gamma_permissions_basic(self):
self.assert_can_gamma(get_perm_tuples("Gamma"))
self.assert_cannot_alpha(get_perm_tuples("Gamma"))
self.assert_cannot_gamma(get_perm_tuples("Gamma"))
@pytest.mark.usefixtures("public_role_like_gamma")
def test_public_permissions_basic(self):
self.assert_can_gamma(get_perm_tuples("Public"))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_alpha_permissions(self):
alpha_perm_tuples = get_perm_tuples("Alpha")
self.assert_can_gamma(alpha_perm_tuples)
self.assert_can_alpha(alpha_perm_tuples)
self.assert_cannot_alpha(alpha_perm_tuples)
assert ("can_this_form_get", "UserInfoEditView") not in alpha_perm_tuples
assert ("can_this_form_post", "UserInfoEditView") not in alpha_perm_tuples
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_admin_permissions(self):
if backend() == "hive":
return
self.assert_can_gamma(get_perm_tuples("Admin"))
self.assert_can_alpha(get_perm_tuples("Admin"))
self.assert_can_admin(get_perm_tuples("Admin"))
def test_sql_lab_permissions(self):
sql_lab_set = get_perm_tuples("sql_lab")
assert sql_lab_set == {
("can_activate", "TabStateView"),
("can_csv", "Superset"),
("can_delete_query", "TabStateView"),
("can_delete", "TabStateView"),
("can_execute_sql_query", "SQLLab"),
("can_export", "SavedQuery"),
("can_export_csv", "SQLLab"),
("can_get", "TabStateView"),
("can_get_results", "SQLLab"),
("can_migrate_query", "TabStateView"),
("can_sqllab", "Superset"),
("can_sqllab_history", "Superset"),
("can_put", "TabStateView"),
("can_post", "TabStateView"),
("can_write", "SavedQuery"),
("can_read", "Query"),
("can_read", "Database"),
("can_read", "SQLLab"),
("can_read", "SavedQuery"),
("menu_access", "Query Search"),
("menu_access", "Saved Queries"),
("menu_access", "SQL Editor"),
("menu_access", "SQL Lab"),
("can_read", "SqlLabPermalinkRestApi"),
("can_write", "SqlLabPermalinkRestApi"),
}
self.assert_cannot_alpha(sql_lab_set)
def test_gamma_permissions(self):
gamma_perm_set = set()
for perm in security_manager.find_role("Gamma").permissions:
gamma_perm_set.add((perm.permission.name, perm.view_menu.name))
# check read only perms
# make sure that user can create slices and dashboards
self.assert_can_all("Dashboard", gamma_perm_set)
self.assert_can_read("Dataset", gamma_perm_set)
# make sure that user can create slices and dashboards
self.assert_can_all("Chart", gamma_perm_set)
self.assert_cannot_write("UserDBModelView", gamma_perm_set)
self.assert_cannot_write("RoleModelView", gamma_perm_set)
assert ("can_csv", "Superset") in gamma_perm_set
assert ("can_dashboard", "Superset") in gamma_perm_set
assert ("can_explore", "Superset") in gamma_perm_set
assert ("can_share_chart", "Superset") in gamma_perm_set
assert ("can_share_dashboard", "Superset") in gamma_perm_set
assert ("can_explore_json", "Superset") in gamma_perm_set
assert ("can_userinfo", "UserDBModelView") in gamma_perm_set
assert ("can_view_chart_as_table", "Dashboard") in gamma_perm_set
assert ("can_view_query", "Dashboard") in gamma_perm_set
def test_views_are_secured(self):
"""Preventing the addition of unsecured views without has_access decorator"""
# These FAB views are secured in their body as opposed to by decorators
method_allowlist = ("action", "action_post")
# List of redirect & other benign views
views_allowlist = [
["MyIndexView", "index"],
["UtilView", "back"],
["LocaleView", "index"],
["AuthDBView", "login"],
["AuthDBView", "logout"],
["CurrentUserRestApi", "get_me"],
["CurrentUserRestApi", "update_me"],
["CurrentUserRestApi", "get_my_roles"],
["UserRestApi", "avatar"],
# TODO (embedded) remove Dashboard:embedded after uuids have been shipped
["Dashboard", "embedded"],
["EmbeddedView", "embedded"],
["R", "index"],
["Superset", "log"],
["Superset", "theme"],
["Superset", "welcome"],
["SecurityApi", "login"],
["SecurityApi", "refresh"],
["SupersetIndexView", "index"],
["SupersetIndexView", "patch_flask_locale"],
["DatabaseRestApi", "oauth2"],
["SupersetAuthView", "login"],
["SupersetAuthView", "logout"],
["SupersetRegisterUserView", "register"],
["SupersetRegisterUserView", "activation"],
]
unsecured_views = []
for view_class in appbuilder.baseviews:
class_name = view_class.__class__.__name__
for name, value in inspect.getmembers(
view_class, predicate=inspect.ismethod
):
if (
name not in method_allowlist
and [class_name, name] not in views_allowlist
and hasattr(value, "_urls")
and not hasattr(value, "_permission_name")
):
unsecured_views.append((class_name, name))
if unsecured_views:
view_str = "\n".join([str(v) for v in unsecured_views])
raise Exception(f"Some views are not secured:\n{view_str}")
class TestSecurityManager(SupersetTestCase):
"""
Testing the Security Manager.
"""
@patch("superset.security.SupersetSecurityManager.raise_for_access")
def test_can_access_datasource(self, mock_raise_for_access):
datasource = self.get_datasource_mock()
mock_raise_for_access.return_value = None
assert security_manager.can_access_datasource(datasource=datasource)
mock_raise_for_access.side_effect = SupersetSecurityException(
SupersetError(
"dummy",
SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
ErrorLevel.ERROR,
)
)
assert not security_manager.can_access_datasource(datasource=datasource)
@patch("superset.security.SupersetSecurityManager.raise_for_access")
def test_can_access_table(self, mock_raise_for_access):
database = get_example_database()
table = Table("bar", "foo")
mock_raise_for_access.return_value = None
assert security_manager.can_access_table(database, table)
mock_raise_for_access.side_effect = SupersetSecurityException(
SupersetError(
"dummy", SupersetErrorType.TABLE_SECURITY_ACCESS_ERROR, ErrorLevel.ERROR
)
)
assert not security_manager.can_access_table(database, table)
@patch("superset.security.SupersetSecurityManager.is_owner")
@patch("superset.security.SupersetSecurityManager.can_access")
@patch("superset.security.SupersetSecurityManager.can_access_schema")
def test_raise_for_access_datasource(
self, mock_can_access_schema, mock_can_access, mock_is_owner
):
datasource = self.get_datasource_mock()
mock_can_access_schema.return_value = True
security_manager.raise_for_access(datasource=datasource)
mock_can_access.return_value = False
mock_can_access_schema.return_value = False
mock_is_owner.return_value = False
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(datasource=datasource)
@patch("superset.security.SupersetSecurityManager.is_owner")
@patch("superset.security.SupersetSecurityManager.can_access")
def test_raise_for_access_query(self, mock_can_access, mock_is_owner):
query = Mock(
database=get_example_database(),
schema="bar",
sql="SELECT * FROM foo",
catalog=None,
)
mock_can_access.return_value = True
security_manager.raise_for_access(query=query)
mock_can_access.return_value = False
mock_is_owner.return_value = False
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(query=query)
def test_raise_for_access_sql_fails(self):
with override_user(security_manager.find_user("gamma")):
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
database=get_example_database(),
schema="bar",
sql="SELECT * FROM foo",
)
@patch("superset.security.SupersetSecurityManager.is_owner")
@patch("superset.security.SupersetSecurityManager.can_access")
def test_raise_for_access_sql(self, mock_can_access, mock_is_owner):
mock_can_access.return_value = True
mock_is_owner.return_value = True
with override_user(security_manager.find_user("gamma")):
security_manager.raise_for_access(
database=get_example_database(), schema="bar", sql="SELECT * FROM foo"
)
@patch("superset.security.SupersetSecurityManager.is_owner")
@patch("superset.security.SupersetSecurityManager.can_access")
@patch("superset.security.SupersetSecurityManager.can_access_schema")
def test_raise_for_access_query_context(
self, mock_can_access_schema, mock_can_access, mock_is_owner
):
query_context = Mock(datasource=self.get_datasource_mock(), form_data={})
mock_can_access_schema.return_value = True
security_manager.raise_for_access(query_context=query_context)
mock_can_access.return_value = False
mock_can_access_schema.return_value = False
mock_is_owner.return_value = False
with override_user(security_manager.find_user("gamma")):
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(query_context=query_context)
@patch("superset.security.SupersetSecurityManager.can_access")
def test_raise_for_access_table(self, mock_can_access):
database = get_example_database()
table = Table("bar", "foo")
mock_can_access.return_value = True
security_manager.raise_for_access(database=database, table=table)
mock_can_access.return_value = False
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(database=database, table=table)
@patch("superset.security.SupersetSecurityManager.is_owner")
@patch("superset.security.SupersetSecurityManager.can_access")
@patch("superset.security.SupersetSecurityManager.can_access_schema")
def test_raise_for_access_viz(
self, mock_can_access_schema, mock_can_access, mock_is_owner
):
test_viz = viz.TimeTableViz(self.get_datasource_mock(), form_data={})
mock_can_access_schema.return_value = True
security_manager.raise_for_access(viz=test_viz)
mock_can_access.return_value = False
mock_can_access_schema.return_value = False
mock_is_owner.return_value = False
with override_user(security_manager.find_user("gamma")):
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(viz=test_viz)
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
@with_feature_flags(DASHBOARD_RBAC=True)
@patch("superset.security.SupersetSecurityManager.is_owner")
@patch("superset.security.SupersetSecurityManager.can_access")
@patch("superset.security.SupersetSecurityManager.can_access_schema")
def test_raise_for_access_rbac(
self,
mock_can_access_schema,
mock_can_access,
mock_is_owner,
):
births = self.get_dash_by_slug("births")
girls = self.get_slice("Girls")
birth_names = girls.datasource
world_health = self.get_dash_by_slug("world_health")
treemap = self.get_slice("Treemap")
births.json_metadata = json.dumps(
{
"native_filter_configuration": [
{
"id": "NATIVE_FILTER-ABCDEFGH",
"targets": [{"datasetId": birth_names.id}],
},
{
"id": "NATIVE_FILTER-IJKLMNOP",
"targets": [{"datasetId": treemap.id}],
},
]
}
)
mock_is_owner.return_value = False
mock_can_access.return_value = False
mock_can_access_schema.return_value = False
with override_user(security_manager.find_user("gamma")):
for kwarg in ["query_context", "viz"]:
births.roles = []
# No dashboard roles.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": births.id,
"slice_id": girls.id,
},
)
}
)
births.roles = [self.get_role("Gamma")]
# Undefined dashboard.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={},
)
}
)
# Undefined dashboard chart.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={"dashboardId": births.id},
)
}
)
# Ill-defined dashboard chart.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": births.id,
"slice_id": treemap.id,
},
)
}
)
# Dashboard chart not associated with said datasource.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": world_health.id,
"slice_id": treemap.id,
},
)
}
)
# Dashboard chart associated with said datasource.
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": births.id,
"slice_id": girls.id,
},
)
}
)
# Ill-defined native filter.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": births.id,
"type": "NATIVE_FILTER",
},
)
}
)
# Native filter not associated with said datasource.
with self.assertRaises(SupersetSecurityException): # noqa: PT027
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": births.id,
"native_filter_id": "NATIVE_FILTER-IJKLMNOP",
"type": "NATIVE_FILTER",
},
)
}
)
# Native filter associated with said datasource.
security_manager.raise_for_access(
**{
kwarg: Mock(
datasource=birth_names,
form_data={
"dashboardId": births.id,
"native_filter_id": "NATIVE_FILTER-ABCDEFGH",
"type": "NATIVE_FILTER",
},
)
}
)
def test_get_admin_user_roles(self):
admin = security_manager.find_user("admin")
with override_user(admin):
roles = security_manager.get_user_roles()
assert admin.roles == roles
def test_get_gamma_user_roles(self):
admin = security_manager.find_user("gamma")
with override_user(admin):
roles = security_manager.get_user_roles()
assert admin.roles == roles
@pytest.mark.usefixtures("create_gamma_user_group")
def test_get_user_roles_with_groups(self):
user = security_manager.find_user("gamma_with_groups")
with override_user(user):
roles = security_manager.get_user_roles()
assert user.groups[0].roles == roles
@pytest.mark.usefixtures("create_gamma_user_group_with_dar")
def test_get_user_roles_with_groups_dar(self):
user = security_manager.find_user("gamma_with_groups")
with override_user(user):
role_names = [role.name for role in security_manager.get_user_roles()]
assert "Gamma" in role_names
assert "dar" in role_names
assert len(role_names) == 2
@pytest.mark.usefixtures("create_user_group_with_dar")
def test_user_view_menu_names_with_groups_dar(self):
user = security_manager.find_user("gamma_with_groups")
with override_user(user):
assert security_manager.user_view_menu_names("datasource_access") == {
"[examples].[birth_names](id:1)]"
}
@pytest.mark.usefixtures("create_gamma_user_group_with_dar")
def test_gamma_user_view_menu_names_with_groups_dar(self):
user = security_manager.find_user("gamma_with_groups")
with override_user(user):
# assert pvm for dar role
assert security_manager.user_view_menu_names("datasource_access") == {
"[examples].[birth_names](id:1)]"
}
# assert pvm for gamma role
assert security_manager.user_view_menu_names("can_external_metadata") == {
"Datasource"
}
assert security_manager.user_view_menu_names("can_recent_activity") == {
"Log"
}
def test_get_anonymous_roles(self):
with override_user(security_manager.get_anonymous_user()):
roles = security_manager.get_user_roles()
assert [security_manager.get_public_role()] == roles
def test_all_database_access(self):
gamma_user = security_manager.find_user(username="gamma")
g.user = gamma_user
# Double checking that gamma users can't access all databases
assert not security_manager.can_access_all_databases()
assert not security_manager.can_access_datasource(self.get_datasource_mock())
all_db_pvm = ("all_database_access", "all_database_access")
with self.temporary_user(gamma_user, extra_pvms=[all_db_pvm]):
assert security_manager.can_access_all_databases()
assert security_manager.can_access_datasource(self.get_datasource_mock())
class TestDatasources(SupersetTestCase):
@patch("superset.security.SupersetSecurityManager.can_access_database")
@patch("superset.security.SupersetSecurityManager.get_session")
def test_get_user_datasources_admin(
self, mock_get_session, mock_can_access_database
):
Datasource = namedtuple("Datasource", ["database", "schema", "name"])
mock_can_access_database.return_value = True
mock_get_session.query.return_value.filter.return_value.all.return_value = []
with mock.patch.object(
SqlaTable, "get_all_datasources"
) as mock_get_all_datasources:
mock_get_all_datasources.return_value = [
Datasource("database1", "schema1", "table1"),
Datasource("database1", "schema1", "table2"),
Datasource("database2", None, "table1"),
]
with override_user(security_manager.find_user("admin")):
datasources = security_manager.get_user_datasources()
assert sorted(datasources) == [
Datasource("database1", "schema1", "table1"),
Datasource("database1", "schema1", "table2"),
Datasource("database2", None, "table1"),
]
@patch("superset.security.SupersetSecurityManager.can_access_database")
@patch("superset.security.SupersetSecurityManager.get_session")
def test_get_user_datasources_gamma(
self, mock_get_session, mock_can_access_database
):
Datasource = namedtuple("Datasource", ["database", "schema", "name"])
mock_can_access_database.return_value = False
mock_get_session.query.return_value.filter.return_value.all.return_value = []
with mock.patch.object(
SqlaTable, "get_all_datasources"
) as mock_get_all_datasources:
mock_get_all_datasources.return_value = [
Datasource("database1", "schema1", "table1"),
Datasource("database1", "schema1", "table2"),
Datasource("database2", None, "table1"),
]
with override_user(security_manager.find_user("gamma")):
datasources = security_manager.get_user_datasources()
assert datasources == []
@patch("superset.security.SupersetSecurityManager.can_access_database")
@patch("superset.security.SupersetSecurityManager.get_session")
def test_get_user_datasources_gamma_with_schema(
self, mock_get_session, mock_can_access_database
):
Datasource = namedtuple("Datasource", ["database", "schema", "name"])
mock_can_access_database.return_value = False
mock_get_session.query.return_value.filter.return_value.all.return_value = [
Datasource("database1", "schema1", "table1"),
Datasource("database1", "schema1", "table2"),
]
with mock.patch.object(
SqlaTable, "get_all_datasources"
) as mock_get_all_datasources:
mock_get_all_datasources.return_value = [
Datasource("database1", "schema1", "table1"),
Datasource("database1", "schema1", "table2"),
Datasource("database2", None, "table1"),
]
with override_user(security_manager.find_user("gamma")):
datasources = security_manager.get_user_datasources()
assert sorted(datasources) == [
Datasource("database1", "schema1", "table1"),
Datasource("database1", "schema1", "table2"),
]
class FakeRequest:
headers: Any = {}
form: Any = {}
class TestGuestTokens(SupersetTestCase):
def create_guest_token(self):
user = {"username": "test_guest"}
resources = [{"some": "resource"}]
rls = [{"dataset": 1, "clause": "access = 1"}]
return security_manager.create_guest_access_token(user, resources, rls)
@patch("superset.security.SupersetSecurityManager._get_current_epoch_time")
def test_create_guest_access_token(self, get_time_mock):
now = time.time()
get_time_mock.return_value = now # so we know what it should =
user = {"username": "test_guest"}
resources = [{"some": "resource"}]
rls = [{"dataset": 1, "clause": "access = 1"}]
token = security_manager.create_guest_access_token(user, resources, rls)
aud = get_url_host()
# unfortunately we cannot mock time in the jwt lib
decoded_token = jwt.decode(
token,
self.app.config["GUEST_TOKEN_JWT_SECRET"],
algorithms=[self.app.config["GUEST_TOKEN_JWT_ALGO"]],
audience=aud,
)
assert user == decoded_token["user"]
assert resources == decoded_token["resources"]
assert now == decoded_token["iat"]
assert aud == decoded_token["aud"]
assert "guest" == decoded_token["type"]
assert (
now + self.app.config["GUEST_TOKEN_JWT_EXP_SECONDS"] == decoded_token["exp"]
)
def test_get_guest_user(self):
token = self.create_guest_token()
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
guest_user = security_manager.get_guest_user_from_request(fake_request)
assert guest_user is not None
assert "test_guest" == guest_user.username
def test_get_guest_user_with_request_form(self):
token = self.create_guest_token()
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = None
fake_request.form["guest_token"] = token
guest_user = security_manager.get_guest_user_from_request(fake_request)
assert guest_user is not None
assert "test_guest" == guest_user.username
@patch("superset.security.SupersetSecurityManager._get_current_epoch_time")
def test_get_guest_user_expired_token(self, get_time_mock):
# make a just-expired token
get_time_mock.return_value = (
time.time() - (self.app.config["GUEST_TOKEN_JWT_EXP_SECONDS"] * 1000) - 1
)
token = self.create_guest_token()
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
guest_user = security_manager.get_guest_user_from_request(fake_request)
assert guest_user is None
def test_get_guest_user_no_user(self):
user = None
resources = [{"type": "dashboard", "id": 1}]
rls = {}
token = security_manager.create_guest_access_token(user, resources, rls)
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
guest_user = security_manager.get_guest_user_from_request(fake_request)
assert guest_user is None
self.assertRaisesRegex(ValueError, "Guest token does not contain a user claim") # noqa: PT027
def test_get_guest_user_no_resource(self):
user = {"username": "test_guest"}
resources = []
rls = {}
token = security_manager.create_guest_access_token(user, resources, rls)
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
security_manager.get_guest_user_from_request(fake_request)
self.assertRaisesRegex( # noqa: PT027
ValueError, "Guest token does not contain a resources claim"
)
def test_get_guest_user_not_guest_type(self):
now = time.time()
user = {"username": "test_guest"}
resources = [{"some": "resource"}]
aud = get_url_host()
claims = {
"user": user,
"resources": resources,
"rls_rules": [],
# standard jwt claims:
"aud": aud,
"iat": now, # issued at
"type": "not_guest",
}
token = jwt.encode(
claims,
self.app.config["GUEST_TOKEN_JWT_SECRET"],
algorithm=self.app.config["GUEST_TOKEN_JWT_ALGO"],
)
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
guest_user = security_manager.get_guest_user_from_request(fake_request)
assert guest_user is None
self.assertRaisesRegex(ValueError, "This is not a guest token.") # noqa: PT027
def test_get_guest_user_bad_audience(self):
now = time.time()
user = {"username": "test_guest"}
resources = [{"some": "resource"}]
aud = get_url_host() # noqa: F841
claims = {
"user": user,
"resources": resources,
"rls_rules": [],
# standard jwt claims:
"aud": "bad_audience",
"iat": now, # issued at
"type": "guest",
}
token = jwt.encode(
claims,
self.app.config["GUEST_TOKEN_JWT_SECRET"],
algorithm=self.app.config["GUEST_TOKEN_JWT_ALGO"],
)
fake_request = FakeRequest()
fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
guest_user = security_manager.get_guest_user_from_request(fake_request)
self.assertRaisesRegex(jwt.exceptions.InvalidAudienceError, "Invalid audience") # noqa: PT027
assert guest_user is None
@patch("superset.security.SupersetSecurityManager._get_current_epoch_time")
def test_create_guest_access_token_callable_audience(self, get_time_mock):
now = time.time()
get_time_mock.return_value = now
self.app.config["GUEST_TOKEN_JWT_AUDIENCE"] = Mock(return_value="cool_code")
user = {"username": "test_guest"}
resources = [{"some": "resource"}]
rls = [{"dataset": 1, "clause": "access = 1"}]
token = security_manager.create_guest_access_token(user, resources, rls)
decoded_token = jwt.decode(
token,
self.app.config["GUEST_TOKEN_JWT_SECRET"],
algorithms=[self.app.config["GUEST_TOKEN_JWT_ALGO"]],
audience="cool_code",
)
self.app.config["GUEST_TOKEN_JWT_AUDIENCE"].assert_called_once()
assert "cool_code" == decoded_token["aud"]
assert "guest" == decoded_token["type"]
self.app.config["GUEST_TOKEN_JWT_AUDIENCE"] = None