blob: 90a2d0ba98fc5789e7b0237bb6fc699dc9ca9fab [file] [log] [blame]
from functools import wraps
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Permission components for Bloodhound product environments"""
__all__ = 'ProductPermissionPolicy',
from trac.core import Component, implements
from trac.perm import IPermissionPolicy, PermissionSystem, PermissionError
#--------------------------
# Permission components
#--------------------------
class MultiproductPermissionPolicy(Component):
"""Apply product policy in product environments to deal with TRAC_ADMIN,
PRODUCT_ADMIN and alike.
"""
implements(IPermissionPolicy)
# IPermissionPolicy methods
def check_permission(self, action, username, resource, perm):
# FIXME: Better handling of recursive imports
from multiproduct.env import ProductEnvironment
if isinstance(self.env, ProductEnvironment):
permsys = PermissionSystem(self.env.parent)
if permsys.check_permission('TRAC_ADMIN', username):
return action in PermissionSystem(self.env).get_actions() \
or None # FIXME: maybe False is better
elif username == self.env.product.owner:
# Product owner granted with PRODUCT_ADMIN permission ootb
permsys = PermissionSystem(self.env)
# FIXME: would `action != 'TRAC_ADMIN'` be enough ?
return True if action in permsys.get_actions() and \
action != 'TRAC_ADMIN' \
else None
#--------------------------
# Impersonation helpers
#--------------------------
class SudoPermissionContext(object):
"""Allows a permitted user (by default `PRODUCT_ADMIN`) to execute
a command as if temporarily granted with `TRAC_ADMIN` or other specific
permission. There is also support to revoke some actions unconditionally.
These objects will act as context managers wrapping the permissions cache
of the target request object. Entering the same context more than once
is not supported and will result in unexpected behavior.
"""
def __init__(self, req, require=None, grant=None, revoke=None):
grant = frozenset(grant if grant is not None else ('TRAC_ADMIN',))
revoke = frozenset(revoke or [])
if grant & revoke:
raise ValueError('Impossible to grant and revoke (%s)' %
', '.join(sorted(grant & revoke)))
self.grant = grant
self.revoke = revoke
if req:
self._expand_perms(req.perm.env)
else:
self._expanded = False
self._perm = None
self.req = req
self.require_actions = frozenset(('PRODUCT_ADMIN',) if require is None
else ([require]
if isinstance(require, basestring)
else require))
@property
def perm(self):
return self._perm
@perm.setter
def perm(self, perm):
if perm and not self._expanded:
self._expand_perms(perm.env)
self._perm = perm
def __getattr__(self, attrnm):
# Actually PermissionCache.__slots__ but this will be faster
if attrnm in ('env', 'username', '_resource', '_cache'):
try:
return getattr(self.perm, attrnm)
except AttributeError:
pass
raise AttributeError("'%s' object has no attribute '%s'" %
(self.__class__.__name__, attrnm))
def __enter__(self):
if self.req is None:
# e.g. instances returned by __call__
raise ValueError('Context manager not bound to request object')
req = self.req
for action in self.require_actions:
req.perm.require(action)
self.perm = req.perm
req.perm = self
return self
def __exit__(self, exc_type, exc_value, tb):
self.req.perm = self.perm
self.perm = None
# Internal methods
@property
def is_active(self):
"""Determine whether this context is active
"""
return self.req and self.perm
def _expand_perms(self, env):
permsys = PermissionSystem(env)
grant = frozenset(permsys.expand_actions(self.grant))
revoke = frozenset(permsys.expand_actions(self.revoke))
# Double check ambiguous action lists
if grant & revoke:
raise ValueError('Impossible to grant and revoke (%s)' %
', '.join(sorted(grant & revoke)))
self.grant = grant
self.revoke = revoke
self._expanded = True
def __assert_require(f):
@wraps(f)
def __require(self, *args, **kwargs):
# FIXME : No check ? Transform into assert statement ?
if not self.perm:
raise RuntimeError('Permission check out of context')
if not self.is_active:
for action in self.require_actions:
self.perm.require(action)
return f(self, *args, **kwargs)
return __require
# PermissionCache methods
@__assert_require
def __call__(self, realm_or_resource, id=False, version=False):
newperm = self.perm(realm_or_resource, id, version)
if newperm is self.perm:
return self
else:
newctx = SudoPermissionContext(None, self.require_actions, self.grant,
self.revoke)
newctx.perm = newperm
return newctx
@__assert_require
def has_permission(self, action, realm_or_resource=None, id=False,
version=False):
return action in self.grant or \
(action not in self.revoke and
self.perm.has_permission(action, realm_or_resource, id,
version))
__contains__ = has_permission
@__assert_require
def require(self, action, realm_or_resource=None, id=False, version=False):
if action in self.grant:
return
if action in self.revoke:
resource = self._normalize_resource(realm_or_resource, id, version)
raise PermissionError(action, resource, self.perm.env)
self.perm.require(action, realm_or_resource, id, version)
assert_permission = require
@__assert_require
def permissions(self):
"""Deprecated (but still used by the HDF compatibility layer)
"""
self.perm.env.log.warning("perm.permissions() is deprecated and "
"is only present for HDF compatibility")
permsys = PermissionSystem(self.perm.env)
actions = permsys.get_user_permissions(self.perm.username)
return [action for action in actions if action in self]
del __assert_require
sudo = SudoPermissionContext