blob: 86d3501b3f55399561ae02e0aa73c06cc6ed4b56 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from flask import g
from flask_appbuilder.security.sqla import models as sqla_models
from flask_appbuilder.security.sqla.manager import SecurityManager
from sqlalchemy import or_, and_
from airflow import models
from airflow.exceptions import AirflowException
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.www_rbac.app import appbuilder
from airflow.www_rbac.utils import CustomSQLAInterface
EXISTING_ROLES = {
'Admin',
'Viewer',
'User',
'Op',
'Public',
}
class AirflowSecurityManager(SecurityManager, LoggingMixin):
###########################################################################
# VIEW MENUS
###########################################################################
VIEWER_VMS = {
'Airflow',
'DagModelView',
'Browse',
'DAG Runs',
'DagRunModelView',
'Task Instances',
'TaskInstanceModelView',
'SLA Misses',
'SlaMissModelView',
'Jobs',
'JobModelView',
'Logs',
'LogModelView',
'Docs',
'Documentation',
'Github',
'About',
'Version',
'VersionView',
}
USER_VMS = VIEWER_VMS
OP_VMS = {
'Admin',
'Configurations',
'ConfigurationView',
'Connections',
'ConnectionModelView',
'Pools',
'PoolModelView',
'Variables',
'VariableModelView',
'XComs',
'XComModelView',
}
###########################################################################
# PERMISSIONS
###########################################################################
VIEWER_PERMS = {
'menu_access',
'can_index',
'can_list',
'can_show',
'can_chart',
'can_dag_stats',
'can_dag_details',
'can_task_stats',
'can_code',
'can_log',
'can_get_logs_with_metadata',
'can_tries',
'can_graph',
'can_tree',
'can_task',
'can_task_instances',
'can_xcom',
'can_gantt',
'can_landing_times',
'can_last_dagruns',
'can_duration',
'can_blocked',
'can_rendered',
'can_pickle_info',
'can_version',
}
USER_PERMS = {
'can_dagrun_clear',
'can_run',
'can_trigger',
'can_add',
'can_edit',
'can_delete',
'can_failed',
'can_paused',
'can_refresh',
'can_success',
'muldelete',
'set_failed',
'set_running',
'set_success',
'clear',
'can_clear',
}
OP_PERMS = {
'can_conf',
'can_varimport',
}
# global view-menu for dag-level access
DAG_VMS = {
'all_dags'
}
WRITE_DAG_PERMS = {
'can_dag_edit',
}
READ_DAG_PERMS = {
'can_dag_read',
}
DAG_PERMS = WRITE_DAG_PERMS | READ_DAG_PERMS
###########################################################################
# DEFAULT ROLE CONFIGURATIONS
###########################################################################
ROLE_CONFIGS = [
{
'role': 'Viewer',
'perms': VIEWER_PERMS | READ_DAG_PERMS,
'vms': VIEWER_VMS | DAG_VMS
},
{
'role': 'User',
'perms': VIEWER_PERMS | USER_PERMS | DAG_PERMS,
'vms': VIEWER_VMS | DAG_VMS | USER_VMS,
},
{
'role': 'Op',
'perms': VIEWER_PERMS | USER_PERMS | OP_PERMS | DAG_PERMS,
'vms': VIEWER_VMS | DAG_VMS | USER_VMS | OP_VMS,
},
]
def __init__(self, appbuilder):
super(AirflowSecurityManager, self).__init__(appbuilder)
# Go and fix up the SQLAInterface used from the stock one to our subclass.
# This is needed to support the "hack" where we had to edit
# FieldConverter.conversion_table in place in airflow.www.utils
for attr in dir(self):
if not attr.endswith('view'):
continue
view = getattr(self, attr, None)
if not view or not getattr(view, 'datamodel', None):
continue
view.datamodel = CustomSQLAInterface(view.datamodel.obj)
def init_role(self, role_name, role_vms, role_perms):
"""
Initialize the role with the permissions and related view-menus.
:param role_name:
:param role_vms:
:param role_perms:
:return:
"""
pvms = self.get_session.query(sqla_models.PermissionView).all()
pvms = [p for p in pvms if p.permission and p.view_menu]
role = self.find_role(role_name)
if not role:
role = self.add_role(role_name)
if len(role.permissions) == 0:
self.log.info('Initializing permissions for role:%s in the database.', role_name)
role_pvms = set()
for pvm in pvms:
if pvm.view_menu.name in role_vms and pvm.permission.name in role_perms:
role_pvms.add(pvm)
role.permissions = list(role_pvms)
self.get_session.merge(role)
self.get_session.commit()
else:
self.log.debug('Existing permissions for the role:%s '
'within the database will persist.', role_name)
def get_user_roles(self, user=None):
"""
Get all the roles associated with the user.
:param user: the ab_user in FAB model.
:return: a list of roles associated with the user.
"""
if user is None:
user = g.user
if user.is_anonymous:
public_role = appbuilder.config.get('AUTH_ROLE_PUBLIC')
return [appbuilder.security_manager.find_role(public_role)] \
if public_role else []
return user.roles
def get_all_permissions_views(self):
"""
Returns a set of tuples with the perm name and view menu name
"""
perms_views = set()
for role in self.get_user_roles():
perms_views.update({(perm_view.permission.name, perm_view.view_menu.name)
for perm_view in role.permissions})
return perms_views
def get_accessible_dag_ids(self, username=None):
"""
Return a set of dags that user has access to(either read or write).
:param username: Name of the user.
:return: A set of dag ids that the user could access.
"""
if not username:
username = g.user
if username.is_anonymous or 'Public' in username.roles:
# return an empty set if the role is public
return set()
roles = {role.name for role in username.roles}
if {'Admin', 'Viewer', 'User', 'Op'} & roles:
return self.DAG_VMS
user_perms_views = self.get_all_permissions_views()
# return a set of all dags that the user could access
return set([view for perm, view in user_perms_views if perm in self.DAG_PERMS])
def has_access(self, permission, view_name, user=None):
"""
Verify whether a given user could perform certain permission
(e.g can_read, can_write) on the given dag_id.
:param permission: permission on dag_id(e.g can_read, can_edit).
:type permission: str
:param view_name: name of view-menu(e.g dag id is a view-menu as well).
:type permission: str
:param user: user name
:type permission: str
:return: a bool whether user could perform certain permission on the dag_id.
:rtype bool
"""
if not user:
user = g.user
if user.is_anonymous:
return self.is_item_public(permission, view_name)
return self._has_view_access(user, permission, view_name)
def _get_and_cache_perms(self):
"""
Cache permissions-views
"""
self.perms = self.get_all_permissions_views()
def _has_role(self, role_name_or_list):
"""
Whether the user has this role name
"""
if not isinstance(role_name_or_list, list):
role_name_or_list = [role_name_or_list]
return any(
[r.name in role_name_or_list for r in self.get_user_roles()])
def _has_perm(self, permission_name, view_menu_name):
"""
Whether the user has this perm
"""
if hasattr(self, 'perms'):
if (permission_name, view_menu_name) in self.perms:
return True
# rebuild the permissions set
self._get_and_cache_perms()
return (permission_name, view_menu_name) in self.perms
def has_all_dags_access(self):
"""
Has all the dag access in any of the 3 cases:
1. Role needs to be in (Admin, Viewer, User, Op).
2. Has can_dag_read permission on all_dags view.
3. Has can_dag_edit permission on all_dags view.
"""
return (
self._has_role(['Admin', 'Viewer', 'Op', 'User']) or
self._has_perm('can_dag_read', 'all_dags') or
self._has_perm('can_dag_edit', 'all_dags'))
def clean_perms(self):
"""
FAB leaves faulty permissions that need to be cleaned up
"""
self.log.info('Cleaning faulty perms')
sesh = self.get_session
pvms = (
sesh.query(sqla_models.PermissionView)
.filter(or_(
sqla_models.PermissionView.permission == None, # NOQA
sqla_models.PermissionView.view_menu == None, # NOQA
))
)
# Since FAB doesn't define ON DELETE CASCADE on these tables, we need
# to delete the _object_ so that SQLA knows to delete the many-to-many
# relationship object too. :(
deleted_count = 0
for pvm in pvms:
sesh.delete(pvm)
deleted_count += 1
sesh.commit()
if deleted_count:
self.log.info('Deleted %s faulty permissions', deleted_count)
def _merge_perm(self, permission_name, view_menu_name):
"""
Add the new permission , view_menu to ab_permission_view_role if not exists.
It will add the related entry to ab_permission
and ab_view_menu two meta tables as well.
:param permission_name: Name of the permission.
:type permission_name: str
:param view_menu_name: Name of the view-menu
:type view_menu_name: str
:return:
"""
permission = self.find_permission(permission_name)
view_menu = self.find_view_menu(view_menu_name)
pv = None
if permission and view_menu:
pv = self.get_session.query(self.permissionview_model).filter_by(
permission=permission, view_menu=view_menu).first()
if not pv and permission_name and view_menu_name:
self.add_permission_view_menu(permission_name, view_menu_name)
@provide_session
def create_custom_dag_permission_view(self, session=None):
"""
Workflow:
1. Fetch all the existing (permissions, view-menu) from Airflow DB.
2. Fetch all the existing dag models that are either active or paused.
3. Create both read and write permission view-menus relation for every dags from step 2
4. Find out all the dag specific roles(excluded pubic, admin, viewer, op, user)
5. Get all the permission-vm owned by the user role.
6. Grant all the user role's permission-vm except the all-dag view-menus to the dag roles.
7. Commit the updated permission-vm-role into db
:return: None.
"""
# todo(Tao): should we put this function here or in scheduler loop?
self.log.info('Fetching a set of all permission, view_menu from FAB meta-table')
def merge_pv(perm, view_menu):
"""Create permission view menu only if it doesn't exist"""
if view_menu and perm and (view_menu, perm) not in all_pvs:
self._merge_perm(perm, view_menu)
all_pvs = set()
for pv in self.get_session.query(self.permissionview_model).all():
if pv.permission and pv.view_menu:
all_pvs.add((pv.permission.name, pv.view_menu.name))
# Get all the active / paused dags and insert them into a set
all_dags_models = session.query(models.DagModel)\
.filter(or_(models.DagModel.is_active, models.DagModel.is_paused)).all()
# create can_dag_edit and can_dag_read permissions for every dag(vm)
for dag in all_dags_models:
for perm in self.DAG_PERMS:
merge_pv(perm, dag.dag_id)
# for all the dag-level role, add the permission of viewer
# with the dag view to ab_permission_view
all_roles = self.get_all_roles()
user_role = self.find_role('User')
dag_role = [role for role in all_roles if role.name not in EXISTING_ROLES]
update_perm_views = []
# need to remove all_dag vm from all the existing view-menus
dag_vm = self.find_view_menu('all_dags')
ab_perm_view_role = sqla_models.assoc_permissionview_role
perm_view = self.permissionview_model
view_menu = self.viewmenu_model
all_perm_view_by_user = session.query(ab_perm_view_role)\
.join(perm_view, perm_view.id == ab_perm_view_role
.columns.permission_view_id)\
.filter(ab_perm_view_role.columns.role_id == user_role.id)\
.join(view_menu)\
.filter(perm_view.view_menu_id != dag_vm.id)
all_perm_views = set([role.permission_view_id for role in all_perm_view_by_user])
for role in dag_role:
# Get all the perm-view of the role
existing_perm_view_by_user = self.get_session.query(ab_perm_view_role)\
.filter(ab_perm_view_role.columns.role_id == role.id)
existing_perms_views = set([pv.permission_view_id
for pv in existing_perm_view_by_user])
missing_perm_views = all_perm_views - existing_perms_views
for perm_view_id in missing_perm_views:
update_perm_views.append({'permission_view_id': perm_view_id,
'role_id': role.id})
if update_perm_views:
self.get_session.execute(ab_perm_view_role.insert(), update_perm_views)
self.get_session.commit()
def update_admin_perm_view(self):
"""
Admin should has all the permission-views, except the dag views.
because Admin have already have all_dags permission.
Add the missing ones to the table for admin.
:return: None.
"""
all_dag_view = self.find_view_menu('all_dags')
dag_perm_ids = [self.find_permission('can_dag_edit').id, self.find_permission('can_dag_read').id]
pvms = self.get_session.query(sqla_models.PermissionView).filter(~and_(
sqla_models.PermissionView.permission_id.in_(dag_perm_ids),
sqla_models.PermissionView.view_menu_id != all_dag_view.id)
).all()
pvms = [p for p in pvms if p.permission and p.view_menu]
admin = self.find_role('Admin')
admin.permissions = list(set(admin.permissions) | set(pvms))
self.get_session.commit()
def sync_roles(self):
"""
1. Init the default role(Admin, Viewer, User, Op, public)
with related permissions.
2. Init the custom role(dag-user) with related permissions.
:return: None.
"""
self.log.info('Start syncing user roles.')
# Create global all-dag VM
self.create_perm_vm_for_all_dag()
# Create default user role.
for config in self.ROLE_CONFIGS:
role = config['role']
vms = config['vms']
perms = config['perms']
self.init_role(role, vms, perms)
self.create_custom_dag_permission_view()
# init existing roles, the rest role could be created through UI.
self.update_admin_perm_view()
self.clean_perms()
def sync_perm_for_dag(self, dag_id, access_control=None):
"""
Sync permissions for given dag id. The dag id surely exists in our dag bag
as only / refresh button or cli.sync_perm will call this function
:param dag_id: the ID of the DAG whose permissions should be updated
:type dag_id: str
:param access_control: a dict where each key is a rolename and
each value is a set() of permission names (e.g.,
{'can_dag_read'}
:type access_control: dict
:return:
"""
for dag_perm in self.DAG_PERMS:
perm_on_dag = self.find_permission_view_menu(dag_perm, dag_id)
if perm_on_dag is None:
self.add_permission_view_menu(dag_perm, dag_id)
if access_control:
self._sync_dag_view_permissions(dag_id, access_control)
def _sync_dag_view_permissions(self, dag_id, access_control):
"""Set the access policy on the given DAG's ViewModel.
:param dag_id: the ID of the DAG whose permissions should be updated
:type dag_id: str
:param access_control: a dict where each key is a rolename and
each value is a set() of permission names (e.g.,
{'can_dag_read'}
:type access_control: dict
"""
def _get_or_create_dag_permission(perm_name):
dag_perm = self.find_permission_view_menu(perm_name, dag_id)
if not dag_perm:
self.log.info("Creating new permission '{}' on view '{}'".format(
perm_name,
dag_id
))
dag_perm = self.add_permission_view_menu(perm_name, dag_id)
return dag_perm
def _revoke_stale_permissions(dag_view):
existing_dag_perms = self.find_permissions_view_menu(dag_view)
for perm in existing_dag_perms:
non_admin_roles = [role for role in perm.role
if role.name != 'Admin']
for role in non_admin_roles:
target_perms_for_role = access_control.get(role.name, {})
if perm.permission.name not in target_perms_for_role:
self.log.info("Revoking '{}' on DAG '{}' for role '{}'".format(
perm.permission,
dag_id,
role.name
))
self.del_permission_role(role, perm)
dag_view = self.find_view_menu(dag_id)
if dag_view:
_revoke_stale_permissions(dag_view)
for rolename, perms in access_control.items():
role = self.find_role(rolename)
if not role:
raise AirflowException(
"The access_control mapping for DAG '{}' includes a role "
"named '{}', but that role does not exist".format(
dag_id,
rolename))
perms = set(perms)
invalid_perms = perms - self.DAG_PERMS
if invalid_perms:
raise AirflowException(
"The access_control map for DAG '{}' includes the following "
"invalid permissions: {}; The set of valid permissions "
"is: {}".format(dag_id,
(perms - self.DAG_PERMS),
self.DAG_PERMS))
for perm_name in perms:
dag_perm = _get_or_create_dag_permission(perm_name)
self.add_permission_role(role, dag_perm)
def create_perm_vm_for_all_dag(self):
"""
Create perm-vm if not exist and insert into FAB security model for all-dags.
"""
# create perm for global logical dag
for dag_vm in self.DAG_VMS:
for perm in self.DAG_PERMS:
self._merge_perm(permission_name=perm,
view_menu_name=dag_vm)