blob: df008acc3056221b97160d80044bf09232df8106 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import contextlib
import datetime
import logging
from unittest import mock
import pytest
from flask_appbuilder import SQLA, Model, expose, has_access
from flask_appbuilder.views import BaseView, ModelView
from freezegun import freeze_time
from sqlalchemy import Column, Date, Float, Integer, String
from airflow.exceptions import AirflowException
from airflow.models import DagModel
from airflow.models.base import Base
from airflow.models.dag import DAG
from airflow.security import permissions
from airflow.www import app as application
from airflow.www.fab_security.manager import AnonymousUser
from airflow.www.fab_security.sqla.models import User, assoc_permission_role
from airflow.www.utils import CustomSQLAInterface
from tests.test_utils.api_connexion_utils import create_user_scope, delete_role, set_user_single_role
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.db import clear_db_dags, clear_db_runs
from tests.test_utils.mock_security_manager import MockSecurityManager
# Sets of DAG-level actions reused as ``access_control`` values by the tests below.
READ_ONLY = {permissions.ACTION_CAN_READ}
READ_WRITE = READ_ONLY | {permissions.ACTION_CAN_EDIT}
# Timestamped log format, with the root logger opened up to DEBUG for this module.
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(name)s:%(message)s')
logging.getLogger().setLevel(logging.DEBUG)
log = logging.getLogger(__name__)
class SomeModel(Model):
    """Small FAB model with one column per basic type; backs ``SomeModelView``."""

    id = Column(Integer, primary_key=True)
    field_string = Column(String(50), unique=True, nullable=False)
    field_integer = Column(Integer())
    field_float = Column(Float())
    field_date = Column(Date())

    def __repr__(self):
        # A row is represented by the text of its string column.
        text = self.field_string
        return str(text)
class SomeModelView(ModelView):
    """Model view over ``SomeModel`` that the ``app_builder`` fixture registers."""

    datamodel = CustomSQLAInterface(SomeModel)
    list_columns = ['field_string', 'field_integer', 'field_float', 'field_date']
    # Actions this view declares; the role tests request the same five on 'SomeModelView'.
    base_permissions = [
        'can_list',
        'can_show',
        'can_add',
        permissions.ACTION_CAN_EDIT,
        permissions.ACTION_CAN_DELETE,
    ]
class SomeBaseView(BaseView):
    """Bare view with a single access-controlled endpoint, ``/some_action``."""

    route_base = ''

    @expose('/some_action')
    @has_access
    def some_action(self):
        """Return a fixed marker string."""
        return "action!"
def _clear_db_dag_and_runs():
    """Clear DAG runs first, then DAGs, from the test database."""
    for clear in (clear_db_runs, clear_db_dags):
        clear()
def _delete_dag_permissions(dag_id, security_manager):
    """Delete each ``DAG_ACTIONS`` permission on the resource belonging to ``dag_id``."""
    resource = permissions.resource_name_for_dag(dag_id)
    for action in security_manager.DAG_ACTIONS:
        security_manager.delete_permission(action, resource)
def _create_dag_model(dag_id, session, security_manager):
    """Persist a ``DagModel`` for ``dag_id`` and sync its permissions.

    The model is committed before ``sync_perm_for_dag`` is called with
    ``access_control=None``. Returns the new ``DagModel``.
    """
    model = DagModel(dag_id=dag_id)
    session.add(model)
    session.commit()
    security_manager.sync_perm_for_dag(dag_id, access_control=None)
    return model
def _delete_dag_model(dag_model, session, security_manager):
    """Delete ``dag_model`` from the database, then drop its DAG-level permissions."""
    session.delete(dag_model)
    session.commit()
    dag_id = dag_model.dag_id
    _delete_dag_permissions(dag_id, security_manager)
@contextlib.contextmanager
def _create_dag_model_context(dag_id, session, security_manager):
    """Context manager yielding a temporary ``DagModel`` for ``dag_id``.

    The model and its permissions are created on entry and removed on exit.
    Cleanup runs in a ``finally`` clause so that a failing assertion inside the
    ``with`` body does not leave the DAG or its permissions behind for later
    tests.
    """
    dag = _create_dag_model(dag_id, session, security_manager)
    try:
        yield dag
    finally:
        _delete_dag_model(dag, session, security_manager)
@pytest.fixture(scope="module", autouse=True)
def clear_db_after_suite():
    """Module-level teardown: clear DAGs and DAG runs once the module's tests are done."""
    yield
    _clear_db_dag_and_runs()
@pytest.fixture(scope="function", autouse=True)
def clear_db_before_test():
    """Per-test setup: start every test with no DAGs or DAG runs in the database."""
    _clear_db_dag_and_runs()
@pytest.fixture(scope="module")
def app():
    """Create the webserver app in testing mode with CSRF checks switched off."""
    flask_app = application.create_app(testing=True)
    flask_app.config['WTF_CSRF_ENABLED'] = False
    return flask_app
@pytest.fixture(scope="module")
def app_builder(app):
    """Return the app's appbuilder after registering the two test views on it."""
    builder = app.appbuilder
    test_views = (
        (SomeBaseView, "SomeBaseView", "BaseViews"),
        (SomeModelView, "SomeModelView", "ModelViews"),
    )
    for view, name, category in test_views:
        builder.add_view(view, name, category=category)
    return builder
@pytest.fixture(scope="module")
def security_manager(app_builder):
    """Expose the appbuilder's security manager (``app_builder.sm``)."""
    manager = app_builder.sm
    return manager
@pytest.fixture(scope="module")
def session(app_builder):
    """Expose ``app_builder.get_session`` for direct database access in tests."""
    db_session = app_builder.get_session
    return db_session
@pytest.fixture(scope="module")
def db(app):
    """Wrap the test app in a FAB ``SQLA`` object."""
    sqla = SQLA(app)
    return sqla
@pytest.fixture(scope="function")
def role(request, app, security_manager):
    """Indirect fixture yielding ``(role_or_None, params)`` for a parametrized role.

    ``request.param`` is a dict with ``name`` and ``permissions`` keys and an
    optional ``create`` flag (default ``True``). A ``mock_roles`` entry in the
    shape ``bulk_sync_roles`` accepts is stored back into the dict. The role is
    synced and looked up only when ``create`` is truthy; it is deleted by name
    on teardown either way.
    """
    params = request.param
    role_name = params['name']
    mock_roles = [{'role': role_name, 'perms': params['permissions']}]
    params['mock_roles'] = mock_roles
    created_role = None
    if params.get("create", True):
        security_manager.bulk_sync_roles(mock_roles)
        created_role = security_manager.find_role(role_name)
    yield created_role, params
    delete_role(app, params['name'])
@pytest.fixture(scope="function")
def mock_dag_models(request, session, security_manager):
    """Indirect fixture: create a ``DagModel`` per requested DAG id and yield the ids.

    The created models and their permissions are removed again on teardown.
    """
    requested_ids = request.param
    created = [_create_dag_model(dag_id, session, security_manager) for dag_id in requested_ids]
    yield requested_ids
    for dag_model in created:
        _delete_dag_model(dag_model, session, security_manager)
@pytest.fixture(scope="function")
def sample_dags(security_manager):
    """Yield two DAGs, one with and one without ``access_control``.

    The DAG-level permissions of both are deleted on teardown.
    """
    with_access_control = DAG('has_access_control', access_control={'Public': {permissions.ACTION_CAN_READ}})
    without_access_control = DAG('no_access_control')
    dags = [with_access_control, without_access_control]
    yield dags
    for dag in dags:
        _delete_dag_permissions(dag.dag_id, security_manager)
@pytest.fixture(scope="module")
def has_dag_perm(security_manager):
    """Return a checker ``(perm, dag_id, user)`` for DAG-level access.

    The checker resolves ``dag_id`` through ``_get_root_dag_id`` first, so the
    access check is made against the root DAG's resource name.
    """

    def _has_dag_perm(perm, dag_id, user):
        root_dag_id = security_manager._get_root_dag_id(dag_id)
        resource = permissions.resource_name_for_dag(root_dag_id)
        return security_manager.has_access(perm, resource, user)

    return _has_dag_perm
@pytest.fixture(scope="module")
def assert_user_has_dag_perms(has_dag_perm):
    """Return a helper asserting that a user holds every listed permission on a DAG."""

    def _assert_user_has_dag_perms(perms, dag_id, user=None):
        for action in perms:
            granted = has_dag_perm(action, dag_id, user)
            assert granted, f"User should have '{action}' on DAG '{dag_id}'"

    return _assert_user_has_dag_perms
@pytest.fixture(scope="module")
def assert_user_does_not_have_dag_perms(has_dag_perm):
    """Return a helper asserting that a user holds none of the listed permissions on a DAG."""

    def _assert_user_does_not_have_dag_perms(dag_id, perms, user=None):
        for action in perms:
            granted = has_dag_perm(action, dag_id, user)
            assert not granted, f"User should not have '{action}' on DAG '{dag_id}'"

    return _assert_user_does_not_have_dag_perms
@pytest.mark.parametrize(
    "role",
    [{"name": "MyRole7", "permissions": [('can_some_other_action', 'AnotherBaseView')], "create": False}],
    indirect=True,
)
def test_init_role_baseview(app, security_manager, role):
    """``init_role`` warns about its deprecation but still creates the role and its permissions."""
    _, params = role
    role_name = params['name']
    expected_perms = params['permissions']
    with pytest.warns(
        DeprecationWarning,
        match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.",
    ):
        security_manager.init_role(role_name, expected_perms)
    created = security_manager.find_role(role_name)
    assert created is not None
    assert len(created.permissions) == len(expected_perms)
@pytest.mark.parametrize(
    "role",
    [{"name": "MyRole3", "permissions": [('can_some_action', 'SomeBaseView')]}],
    indirect=True,
)
def test_bulk_sync_roles_baseview(app, security_manager, role):
    """A role synced by the ``role`` fixture exists and has one permission per requested entry."""
    synced_role, params = role
    assert synced_role is not None
    expected_count = len(params['permissions'])
    assert len(synced_role.permissions) == expected_count
@pytest.mark.parametrize(
    "role",
    [
        {
            "name": "MyRole2",
            "permissions": [
                ('can_list', 'SomeModelView'),
                ('can_show', 'SomeModelView'),
                ('can_add', 'SomeModelView'),
                (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
                (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
            ],
        }
    ],
    indirect=True,
)
def test_bulk_sync_roles_modelview(app, security_manager, role):
    """A synced model-view role has all requested permissions; re-syncing short-circuits."""
    _role, params = role
    # Check the role object itself: the fixture value is a tuple and is never None.
    assert _role is not None
    assert len(_role.permissions) == len(params['permissions'])
    # Check short circuit works
    with assert_queries_count(2):  # One for Permission, one for roles
        security_manager.bulk_sync_roles(params['mock_roles'])
@pytest.mark.parametrize(
    "role",
    [{"name": "Test_Role", "permissions": []}],
    indirect=True,
)
def test_update_and_verify_permission_role(app, security_manager, role):
    """A permission added by hand is still on the role after ``bulk_sync_roles`` runs again."""
    test_role, params = role
    edit_role_perm = security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
    security_manager.add_permission_to_role(test_role, edit_role_perm)
    count_before = len(test_role.permissions)
    security_manager.bulk_sync_roles(params['mock_roles'])
    count_after = len(test_role.permissions)
    assert count_before == count_after
    assert count_after == 1
def test_verify_public_role_has_no_permissions(security_manager):
    """The built-in ``Public`` role has an empty permission list."""
    public_role = security_manager.find_role("Public")
    assert public_role.permissions == []
def test_verify_default_anon_user_has_no_accessible_dag_ids(app, session, security_manager):
    """With ``AUTH_ROLE_PUBLIC`` set to ``Public``, an anonymous user can reach no DAG ids."""
    with app.app_context():
        anonymous = AnonymousUser()
        app.config['AUTH_ROLE_PUBLIC'] = 'Public'
        expected_roles = {security_manager.get_public_role()}
        assert security_manager.get_user_roles(anonymous) == expected_roles
        with _create_dag_model_context("test_dag_id", session, security_manager):
            security_manager.sync_roles()
            accessible = security_manager.get_accessible_dag_ids(anonymous)
            assert accessible == set()
def test_verify_default_anon_user_has_no_access_to_specific_dag(app, session, security_manager, has_dag_perm):
    """With ``AUTH_ROLE_PUBLIC`` set to ``Public``, an anonymous user can neither read nor edit a DAG."""
    dag_id = "test_dag_id"
    with app.app_context():
        anonymous = AnonymousUser()
        app.config['AUTH_ROLE_PUBLIC'] = 'Public'
        assert security_manager.get_user_roles(anonymous) == {security_manager.get_public_role()}
        with _create_dag_model_context(dag_id, session, security_manager):
            security_manager.sync_roles()
            assert security_manager.can_read_dag(dag_id, anonymous) is False
            assert security_manager.can_edit_dag(dag_id, anonymous) is False
            for action in (permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT):
                assert has_dag_perm(action, dag_id, anonymous) is False
@pytest.mark.parametrize(
    "mock_dag_models",
    [["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]],
    indirect=True,
)
def test_verify_anon_user_with_admin_role_has_all_dag_access(app, security_manager, mock_dag_models):
    """With ``AUTH_ROLE_PUBLIC`` set to ``Admin``, an anonymous user can reach every created DAG id."""
    expected_dag_ids = set(mock_dag_models)
    with app.app_context():
        app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
        anonymous = AnonymousUser()
        assert security_manager.get_user_roles(anonymous) == {security_manager.get_public_role()}
        security_manager.sync_roles()
        accessible = security_manager.get_accessible_dag_ids(anonymous)
        assert accessible == expected_dag_ids
def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
    app, session, security_manager, has_dag_perm
):
    """With ``AUTH_ROLE_PUBLIC`` set to ``Admin``, an anonymous user can read and edit each DAG."""
    dag_ids_under_test = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"]
    with app.app_context():
        anonymous = AnonymousUser()
        app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
        # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set.
        anonymous.roles = security_manager.get_user_roles(anonymous)
        assert anonymous.roles == {security_manager.get_public_role()}
        for dag_id in dag_ids_under_test:
            with _create_dag_model_context(dag_id, session, security_manager):
                security_manager.sync_roles()
                assert security_manager.can_read_dag(dag_id, anonymous) is True
                assert security_manager.can_edit_dag(dag_id, anonymous) is True
                for action in (permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT):
                    assert has_dag_perm(action, dag_id, anonymous) is True
def test_get_user_roles(app_builder, security_manager):
    """For a non-anonymous user, ``get_user_roles`` returns what ``user.roles`` holds."""
    admin_role = app_builder.sm.find_role('Admin')
    fake_user = mock.MagicMock()
    fake_user.is_anonymous = False
    fake_user.roles = admin_role
    assert security_manager.get_user_roles(fake_user) == admin_role
def test_get_user_roles_for_anonymous_user(app, security_manager):
    """With ``AUTH_ROLE_PUBLIC`` set to ``Viewer``, an anonymous user gets exactly the Viewer permissions."""
    # Expected (action, resource) pairs, grouped by action.
    readable_resources = [
        permissions.RESOURCE_AUDIT_LOG,
        permissions.RESOURCE_DAG,
        permissions.RESOURCE_DAG_DEPENDENCIES,
        permissions.RESOURCE_DAG_CODE,
        permissions.RESOURCE_DAG_RUN,
        permissions.RESOURCE_DATASET,
        permissions.RESOURCE_IMPORT_ERROR,
        permissions.RESOURCE_DAG_WARNING,
        permissions.RESOURCE_JOB,
        permissions.RESOURCE_PLUGIN,
        permissions.RESOURCE_SLA_MISS,
        permissions.RESOURCE_TASK_INSTANCE,
        permissions.RESOURCE_TASK_LOG,
        permissions.RESOURCE_XCOM,
        permissions.RESOURCE_WEBSITE,
        permissions.RESOURCE_MY_PASSWORD,
        permissions.RESOURCE_MY_PROFILE,
    ]
    editable_resources = [
        permissions.RESOURCE_MY_PASSWORD,
        permissions.RESOURCE_MY_PROFILE,
    ]
    menu_resources = [
        permissions.RESOURCE_BROWSE_MENU,
        permissions.RESOURCE_DAG,
        permissions.RESOURCE_DAG_DEPENDENCIES,
        permissions.RESOURCE_DAG_RUN,
        permissions.RESOURCE_DATASET,
        permissions.RESOURCE_JOB,
        permissions.RESOURCE_AUDIT_LOG,
        permissions.RESOURCE_PLUGIN,
        permissions.RESOURCE_SLA_MISS,
        permissions.RESOURCE_TASK_INSTANCE,
        permissions.RESOURCE_DOCS_MENU,
        permissions.RESOURCE_DOCS,
    ]
    viewer_role_perms = (
        {(permissions.ACTION_CAN_READ, resource) for resource in readable_resources}
        | {(permissions.ACTION_CAN_EDIT, resource) for resource in editable_resources}
        | {(permissions.ACTION_CAN_ACCESS_MENU, resource) for resource in menu_resources}
    )
    app.config['AUTH_ROLE_PUBLIC'] = 'Viewer'
    with app.app_context():
        anonymous = AnonymousUser()
        perms_views = {
            (perm.action.name, perm.resource.name)
            for user_role in security_manager.get_user_roles(anonymous)
            for perm in user_role.permissions
        }
        assert perms_views == viewer_role_perms
def test_get_current_user_permissions(app):
    """``user.perms`` holds the role's permissions, and is empty for a user without a role."""
    granted = ('can_some_action', 'SomeBaseView')
    with app.app_context():
        with create_user_scope(
            app,
            username='get_current_user_permissions',
            role_name='MyRole5',
            permissions=[granted],
        ) as user:
            assert user.perms == {granted}
        with create_user_scope(app, username='no_perms') as user:
            assert len(user.perms) == 0
def test_get_accessible_dag_ids(app, security_manager, session):
    """A user whose role holds the global DAG permissions sees the persisted DAG as accessible."""
    role_name = 'MyRole1'
    permission_action = [permissions.ACTION_CAN_READ]
    dag_id = 'dag_id'
    username = "ElUser"
    with create_user_scope(
        app,
        username=username,
        role_name=role_name,
        # The list previously repeated the read permission; it now carries one
        # entry per global DAG action.
        permissions=[
            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        ],
    ) as user:
        dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
        session.add(dag_model)
        session.commit()
        security_manager.sync_perm_for_dag(  # type: ignore
            dag_id, access_control={role_name: permission_action}
        )
        assert security_manager.get_accessible_dag_ids(user) == {dag_id}
def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(app, security_manager, session):
    """A user who can only edit DAGs gets an empty set from ``get_readable_dag_ids``."""
    # In this test case, get_readable_dag_ids() doesn't return DAGs
    # on which the user only has the CAN_EDIT action.
    dag_id = "dag_id"
    role_name = "MyRole1"
    edit_only = [permissions.ACTION_CAN_EDIT]
    with create_user_scope(
        app,
        username="Monsieur User",
        role_name=role_name,
        permissions=[(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)],
    ) as user:
        session.add(DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *"))
        session.commit()
        security_manager.sync_perm_for_dag(  # type: ignore
            dag_id, access_control={role_name: edit_only}
        )
        readable = security_manager.get_readable_dag_ids(user)
        assert readable == set()
def test_has_access(security_manager):
    """``has_access`` is truthy when the (action, resource) pair is listed in ``user.perms``."""
    action, resource = "action", "resource"
    fake_user = mock.MagicMock()
    fake_user.perms = [(action, resource)]
    assert security_manager.has_access(action, resource, fake_user)
def test_sync_perm_for_dag_creates_permissions_on_resources(security_manager):
    """Syncing a DAG creates read and edit permissions on its ``DAG:``-prefixed resource."""
    test_dag_id = 'TEST_DAG'
    resource_name = f'DAG:{test_dag_id}'
    security_manager.sync_perm_for_dag(test_dag_id, access_control=None)
    for action in (permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT):
        assert security_manager.get_permission(action, resource_name) is not None
def test_has_all_dag_access(app, security_manager):
    """``has_all_dags_access`` holds for the built-in roles and for global read or edit, not for an empty role."""
    for builtin_role in ['Admin', 'Viewer', 'Op', 'User']:
        with app.app_context(), create_user_scope(
            app,
            username="user",
            role_name=builtin_role,
        ) as user:
            assert security_manager.has_all_dags_access(user)

    # (role name, role permissions, whether all-DAG access is expected)
    custom_role_cases = [
        ("read_all", [(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)], True),
        ("edit_all", [(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)], True),
        ("nada", [], False),
    ]
    for custom_role, role_perms, expected in custom_role_cases:
        with app.app_context(), create_user_scope(
            app,
            username="user",
            role_name=custom_role,
            permissions=role_perms,
        ) as user:
            granted = security_manager.has_all_dags_access(user)
            assert granted if expected else not granted
def test_access_control_with_non_existent_role(security_manager):
    """Syncing access control for an unknown role raises ``AirflowException``."""
    unknown_role_acl = {
        'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
    }
    with pytest.raises(AirflowException) as ctx:
        security_manager._sync_dag_view_permissions(
            dag_id='access-control-test',
            access_control=unknown_role_acl,
        )
    message = str(ctx.value)
    assert "role does not exist" in message
def test_all_dag_access_doesnt_give_non_dag_access(app, security_manager):
    """Holding the global DAG permissions must not grant access to non-DAG resources."""
    username = 'dag_access_user'
    role_name = 'dag_access_role'
    with app.app_context():
        with create_user_scope(
            app,
            username=username,
            role_name=role_name,
            # The list previously repeated the read permission; it now carries
            # one entry per global DAG action.
            permissions=[
                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
            ],
        ) as user:
            assert security_manager.has_access(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user)
            assert not security_manager.has_access(
                permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE, user
            )
def test_access_control_with_invalid_permission(app, security_manager):
    """Syncing access control with an action outside the DAG actions raises ``AirflowException``."""
    rolename = "team-a"
    invalid_actions = (
        'can_varimport',  # a real action, but not a member of DAG_ACTIONS
        'can_eat_pudding',  # clearly not a real action
    )
    with create_user_scope(app, username="LaUser", role_name=rolename):
        for bad_action in invalid_actions:
            with pytest.raises(AirflowException) as ctx:
                security_manager._sync_dag_view_permissions(
                    'access_control_test',
                    access_control={rolename: {bad_action}},
                )
            assert "invalid permissions" in str(ctx.value)
def test_access_control_is_set_on_init(
    app,
    security_manager,
    assert_user_has_dag_perms,
    assert_user_does_not_have_dag_perms,
):
    """DAG access granted to a role applies to its users and is gone once the user changes role."""
    dag_id = 'access_control_test'
    role_name = 'team-a'
    negated_role = 'NOT-team-a'
    read_and_edit = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
    with app.app_context(), create_user_scope(
        app,
        username='access_control_is_set_on_init',
        role_name=role_name,
        permissions=[],
    ) as user:
        security_manager._sync_dag_view_permissions(
            dag_id,
            access_control={role_name: [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]},
        )
        assert_user_has_dag_perms(perms=read_and_edit, dag_id=dag_id, user=user)
        # Move the user to a freshly created role that has no permissions at all.
        security_manager.bulk_sync_roles([{'role': negated_role, 'perms': []}])
        set_user_single_role(app, user, role_name=negated_role)
        assert_user_does_not_have_dag_perms(perms=read_and_edit, dag_id=dag_id, user=user)
def test_access_control_stale_perms_are_revoked(
    app,
    security_manager,
    assert_user_has_dag_perms,
    assert_user_does_not_have_dag_perms,
):
    """Re-syncing a DAG with fewer actions revokes the actions that were dropped."""
    dag_id = 'access_control_test'
    role_name = 'team-a'
    with app.app_context(), create_user_scope(
        app,
        username='access_control_stale_perms_are_revoked',
        role_name=role_name,
        permissions=[],
    ) as user:
        set_user_single_role(app, user, role_name='team-a')
        security_manager._sync_dag_view_permissions(dag_id, access_control={'team-a': READ_WRITE})
        assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
        security_manager._sync_dag_view_permissions(dag_id, access_control={'team-a': READ_ONLY})
        # Clear the cache, to make it pick up new role perms
        user._perms = None
        assert_user_has_dag_perms(perms=[permissions.ACTION_CAN_READ], dag_id=dag_id, user=user)
        assert_user_does_not_have_dag_perms(perms=[permissions.ACTION_CAN_EDIT], dag_id=dag_id, user=user)
def test_no_additional_dag_permission_views_created(db, security_manager):
    """Running ``sync_roles`` twice leaves the permission-role association row count unchanged."""

    def count_permission_role_rows():
        return db.session().query(assoc_permission_role).count()

    security_manager.sync_roles()
    rows_before = count_permission_role_rows()
    security_manager.sync_roles()
    rows_after = count_permission_role_rows()
    assert rows_before == rows_after
def test_override_role_vm(app_builder):
    """``MockSecurityManager`` overrides ``VIEWER_VMS`` with the single entry ``'Airflow'``."""
    viewer_vms = MockSecurityManager(appbuilder=app_builder).VIEWER_VMS
    assert len(viewer_vms) == 1
    assert viewer_vms == {'Airflow'}
def test_correct_roles_have_perms_to_read_config(security_manager):
    """Only the ``Admin`` and ``Op`` roles may read the configuration resource."""
    all_roles = security_manager.get_all_roles()
    assert len(all_roles) >= 5
    for role in all_roles:
        can_read_config = security_manager.permission_exists_in_one_or_more_roles(
            permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
        )
        if role.name in ["Admin", "Op"]:
            assert can_read_config
            continue
        assert not can_read_config, (
            f"{role.name} should not have {permissions.ACTION_CAN_READ} "
            f"on {permissions.RESOURCE_CONFIG}"
        )
def test_create_dag_specific_permissions(session, security_manager, monkeypatch, sample_dags):
    """``create_dag_specific_permissions`` creates per-DAG permissions and syncs access control.

    The ``DagBag`` class used by ``airflow.www.security`` and the security
    manager's ``_sync_dag_view_permissions`` are both replaced with mocks for
    the duration of this test.
    """
    access_control = {'Public': {permissions.ACTION_CAN_READ}}
    collect_dags_from_db_mock = mock.Mock()
    dagbag_mock = mock.Mock()
    dagbag_mock.dags = {dag.dag_id: dag for dag in sample_dags}
    dagbag_mock.collect_dags_from_db = collect_dags_from_db_mock
    dagbag_class_mock = mock.Mock()
    dagbag_class_mock.return_value = dagbag_mock
    import airflow.www.security

    monkeypatch.setitem(airflow.www.security.__dict__, "DagBag", dagbag_class_mock)
    # ``security_manager`` is module-scoped: patch through ``monkeypatch`` so the
    # real method is restored afterwards instead of leaving a Mock on the shared
    # object for every test that runs later.
    sync_dag_view_permissions_mock = mock.Mock()
    monkeypatch.setattr(security_manager, "_sync_dag_view_permissions", sync_dag_view_permissions_mock)

    # Nothing DAG-specific exists before the call. One query serves all DAGs.
    all_perms = security_manager.get_all_permissions()
    for dag in sample_dags:
        dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
        assert ('can_read', dag_resource_name) not in all_perms
        assert ('can_edit', dag_resource_name) not in all_perms

    security_manager.create_dag_specific_permissions()

    dagbag_class_mock.assert_called_once_with(read_dags_from_db=True)
    collect_dags_from_db_mock.assert_called_once_with()

    all_perms = security_manager.get_all_permissions()
    for dag in sample_dags:
        dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
        assert ('can_read', dag_resource_name) in all_perms
        assert ('can_edit', dag_resource_name) in all_perms

    # Only the DAG that declares ``access_control`` has it synced.
    sync_dag_view_permissions_mock.assert_called_once_with(
        permissions.resource_name_for_dag('has_access_control'),
        access_control,
    )

    del dagbag_mock.dags["has_access_control"]
    with assert_queries_count(2):  # two queries to get all perms; dagbag is mocked
        # The extra query happens at permission check
        security_manager.create_dag_specific_permissions()
def test_get_all_permissions(security_manager):
    """``get_all_permissions`` returns a set of 2-tuples using a single query."""
    with assert_queries_count(1):
        all_perms = security_manager.get_all_permissions()
    assert isinstance(all_perms, set)
    for pair in all_perms:
        assert len(pair) == 2
    assert ('can_read', 'Connections') in all_perms
def test_get_all_non_dag_permissions(security_manager):
    """``_get_all_non_dag_permissions`` maps (action, resource) string pairs to permission objects in one query."""
    with assert_queries_count(1):
        non_dag_perms = security_manager._get_all_non_dag_permissions()
    assert isinstance(non_dag_perms, dict)
    for (action_name, resource_name), permission in non_dag_perms.items():
        assert isinstance(action_name, str)
        assert isinstance(resource_name, str)
        assert isinstance(permission, security_manager.permission_model)
    assert ('can_read', 'Connections') in non_dag_perms
def test_get_all_roles_with_permissions(security_manager):
    """``_get_all_roles_with_permissions`` maps role names to role objects in one query."""
    with assert_queries_count(1):
        roles_by_name = security_manager._get_all_roles_with_permissions()
    assert isinstance(roles_by_name, dict)
    for name, role_obj in roles_by_name.items():
        assert isinstance(name, str)
        assert isinstance(role_obj, security_manager.role_model)
    assert 'Admin' in roles_by_name
def test_prefixed_dag_id_is_deprecated(security_manager):
    """Calling ``prefixed_dag_id`` emits a ``DeprecationWarning`` naming its replacement."""
    with pytest.warns(
        DeprecationWarning,
        # ``match`` is a regular expression: escape the periods so they only
        # match literal dots in the warning text.
        match=(
            "`prefixed_dag_id` has been deprecated\\. "
            "Please use `airflow\\.security\\.permissions\\.resource_name_for_dag` instead\\."
        ),
    ):
        security_manager.prefixed_dag_id("hello")
def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session):
    """Permissions granted on a parent DAG also apply to its subdag and nested subdag."""
    username = 'dag_permission_user'
    role_name = 'dag_permission_role'
    parent_dag_name = "parent_dag"
    subdag_name = parent_dag_name + ".subdag"
    subsubdag_name = parent_dag_name + ".subdag.subsubdag"
    with app.app_context():
        mock_roles = [
            {
                'role': role_name,
                'perms': [
                    (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"),
                    (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"),
                ],
            }
        ]
        with create_user_scope(
            app,
            username=username,
            role_name=role_name,
        ) as user:
            dag1 = DagModel(dag_id=parent_dag_name)
            dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name)
            dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name)
            session.add_all([dag1, dag2, dag3])
            session.commit()
            security_manager.bulk_sync_roles(mock_roles)
            # Access control is only ever set on the parent DAG, so one sync is
            # enough; the former loop repeated this identical call once per DAG.
            security_manager._sync_dag_view_permissions(
                parent_dag_name, access_control={role_name: READ_WRITE}
            )
            for dag_id in (parent_dag_name, subdag_name, subsubdag_name):
                assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
            session.query(DagModel).delete()
def test_permissions_work_for_dags_with_dot_in_dagname(
    app, security_manager, assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session
):
    """Permissions on ``dag_id_1`` must not extend to the unrelated DAG ``dag_id_1.with_dot``."""
    username = 'dag_permission_user'
    role_name = 'dag_permission_role'
    dag_id = "dag_id_1"
    dag_id_2 = "dag_id_1.with_dot"
    with app.app_context():
        mock_roles = [
            {
                'role': role_name,
                'perms': [
                    (permissions.ACTION_CAN_READ, f"DAG:{dag_id}"),
                    (permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"),
                ],
            }
        ]
        with create_user_scope(
            app,
            username=username,
            role_name=role_name,
        ) as user:
            dag1 = DagModel(dag_id=dag_id)
            dag2 = DagModel(dag_id=dag_id_2)
            session.add_all([dag1, dag2])
            session.commit()
            security_manager.bulk_sync_roles(mock_roles)
            security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE})
            # Sync the dotted DAG without granting it to the role. Passing the
            # role's access control here would hand the user the very
            # permissions the final assertion expects to be absent.
            security_manager.sync_perm_for_dag(dag2.dag_id, access_control=None)
            assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
            assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user)
            session.query(DagModel).delete()
def test_fab_models_use_airflow_base_meta():
    """The FAB ``User`` model shares its metadata object with Airflow's ``Base``."""
    # TODO: move this test to appropriate place when we have more tests for FAB models
    fab_user = User()
    assert fab_user.metadata is Base.metadata
@pytest.fixture()
def mock_security_manager(app_builder):
    """Return a ``MockSecurityManager`` whose ``update_user`` is replaced by a ``MagicMock``."""
    manager = MockSecurityManager(appbuilder=app_builder)
    manager.update_user = mock.MagicMock()
    return manager
@pytest.fixture()
def new_user():
    """Return a mock user with no login statistics recorded yet (all ``None``)."""
    return mock.MagicMock(
        login_count=None,
        fail_login_count=None,
        last_login=None,
    )
@pytest.fixture()
def old_user():
    """Return a mock user that already has login statistics recorded."""
    return mock.MagicMock(
        login_count=42,
        fail_login_count=9,
        last_login=datetime.datetime(1984, 12, 1, 0, 0, 0),
    )
@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))  # Get the Delorean, doc!
def test_update_user_auth_stat_first_successful_auth(mock_security_manager, new_user):
    """A first successful login sets the counters to 1/0 and stamps ``last_login`` with the frozen time."""
    mock_security_manager.update_user_auth_stat(new_user, success=True)
    assert new_user.login_count == 1
    assert new_user.fail_login_count == 0
    assert new_user.last_login == datetime.datetime(1985, 11, 5, 1, 24, 0)
    # ``.called_once`` is not a Mock assertion, so asserting on it always passed;
    # ``assert_called_once()`` verifies the call count.
    mock_security_manager.update_user.assert_called_once()
@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))
def test_update_user_auth_stat_subsequent_successful_auth(mock_security_manager, old_user):
    """A later successful login increments ``login_count``, resets failures and updates ``last_login``."""
    mock_security_manager.update_user_auth_stat(old_user, success=True)
    assert old_user.login_count == 43
    assert old_user.fail_login_count == 0
    assert old_user.last_login == datetime.datetime(1985, 11, 5, 1, 24, 0)
    # ``.called_once`` is not a Mock assertion, so asserting on it always passed;
    # ``assert_called_once()`` verifies the call count.
    mock_security_manager.update_user.assert_called_once()
@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))
def test_update_user_auth_stat_first_unsuccessful_auth(mock_security_manager, new_user):
    """A first failed login sets the counters to 0/1 and leaves ``last_login`` unset."""
    mock_security_manager.update_user_auth_stat(new_user, success=False)
    assert new_user.login_count == 0
    assert new_user.fail_login_count == 1
    assert new_user.last_login is None
    # ``.called_once`` is not a Mock assertion, so asserting on it always passed;
    # ``assert_called_once()`` verifies the call count.
    mock_security_manager.update_user.assert_called_once()
@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))
def test_update_user_auth_stat_subsequent_unsuccessful_auth(mock_security_manager, old_user):
    """A later failed login only increments ``fail_login_count``; other statistics are untouched."""
    mock_security_manager.update_user_auth_stat(old_user, success=False)
    assert old_user.login_count == 42
    assert old_user.fail_login_count == 10
    assert old_user.last_login == datetime.datetime(1984, 12, 1, 0, 0, 0)
    # ``.called_once`` is not a Mock assertion, so asserting on it always passed;
    # ``assert_called_once()`` verifies the call count.
    mock_security_manager.update_user.assert_called_once()