# blob: 4c6ae089e7e092063e9d419359af12da34e6bd0a [file] [log] [blame]
# -*- coding: UTF-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Permission components for Bloodhound product environments"""
from functools import wraps
from trac.core import Component, implements
from trac.perm import IPermissionPolicy, PermissionSystem, PermissionError
# Public names exported by a star-import of this module. Every entry has to
# name an object that is really defined here, otherwise the star-import
# fails with AttributeError.
__all__ = 'MultiproductPermissionPolicy',
# Permission components
class MultiproductPermissionPolicy(Component):
    """Apply product policy in product environments to deal with TRAC_ADMIN,
    PRODUCT_ADMIN and alike.

    The policy only takes a decision when the component lives in a
    `ProductEnvironment`; in any other environment it returns `None`.
    """
    implements(IPermissionPolicy)

    # IPermissionPolicy methods

    def check_permission(self, action, username, resource, perm):
        """Grant product-level actions to global admins and product owners.

        :param action:   name of the permission action being checked
        :param username: name of the user the check is performed for
        :param resource: target resource (not inspected by this policy)
        :param perm:     permission cache (not inspected by this policy)
        :return: `True` when the action is granted by this policy,
                 `None` when no decision is taken here
        """
        # FIXME: Better handling of recursive imports
        from multiproduct.env import ProductEnvironment

        if not isinstance(self.env, ProductEnvironment):
            return None

        # TRAC_ADMIN is looked up in the parent (global) environment.
        permsys = PermissionSystem(self.env.parent)
        if permsys.check_permission('TRAC_ADMIN', username):
            # Global admins get every action known to the product.
            return action in PermissionSystem(self.env).get_actions() \
                or None     # FIXME: maybe False is better
        elif username == self.env.product.owner:
            # Product owner granted with PRODUCT_ADMIN permission ootb
            permsys = PermissionSystem(self.env)
            # FIXME: would `action != 'TRAC_ADMIN'` be enough ?
            return True if action in permsys.get_actions() and \
                           action != 'TRAC_ADMIN' \
                        else None
        return None
# Impersonation helpers
class SudoPermissionContext(object):
    """Allows a permitted user (by default `PRODUCT_ADMIN`) to execute
    a command as if temporarily granted with `TRAC_ADMIN` or other specific
    permission. There is also support to revoke some actions unconditionally.

    These objects will act as context managers wrapping the permissions cache
    of the target request object. Entering the same context more than once
    is not supported and will result in unexpected behavior.
    """

    def __init__(self, req, require=None, grant=None, revoke=None):
        """Set up the impersonation context.

        :param req:     request whose `perm` attribute will be wrapped, or
                        `None` for a context not bound to a request
        :param require: action name, or iterable of action names, the real
                        user must hold (defaults to `PRODUCT_ADMIN`)
        :param grant:   actions granted unconditionally (defaults to
                        `TRAC_ADMIN`)
        :param revoke:  actions denied unconditionally (defaults to none)
        :raises ValueError: if an action is both granted and revoked
        """
        grant = frozenset(grant if grant is not None else ('TRAC_ADMIN',))
        revoke = frozenset(revoke or [])
        if grant & revoke:
            raise ValueError('Impossible to grant and revoke (%s)' %
                             ', '.join(sorted(grant & revoke)))
        self.grant = grant
        self.revoke = revoke
        if req:
            # An environment is reachable right away: expand meta-actions now.
            self._expand_perms(req.perm.env)
        else:
            # Expansion is deferred until a permission cache is assigned.
            self._expanded = False
        self._perm = None
        self.req = req
        self.require_actions = frozenset(('PRODUCT_ADMIN',) if require is None
                                         else ([require]
                                               if isinstance(require, basestring)
                                               else require))

    @property
    def perm(self):
        """The wrapped permission cache, or `None` outside of the context."""
        return self._perm

    @perm.setter
    def perm(self, perm):
        # The first real cache assigned provides the environment needed to
        # expand the granted / revoked action lists.
        if perm and not self._expanded:
            self._expand_perms(perm.env)
        self._perm = perm

    def __getattr__(self, attrnm):
        """Forward the permission cache attributes to the wrapped cache.

        :raises AttributeError: for any other name, or when the wrapped
                                cache does not provide the attribute
        """
        # Actually PermissionCache.__slots__ but this will be faster
        if attrnm in ('env', 'username', '_resource', '_cache'):
            try:
                return getattr(self.perm, attrnm)
            except AttributeError:
                pass
        raise AttributeError("'%s' object has no attribute '%s'" %
                             (self.__class__.__name__, attrnm))

    def __enter__(self):
        """Check the required actions and replace `req.perm` with `self`.

        :raises ValueError: if the context is not bound to a request
        """
        if self.req is None:
            # e.g. instances returned by __call__
            raise ValueError('Context manager not bound to request object')
        req = self.req
        # The real user has to hold every required action before the
        # request permissions are replaced.
        for action in self.require_actions:
            req.perm.require(action)
        self.perm = req.perm
        req.perm = self
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Restore the original permission cache of the request."""
        self.req.perm = self.perm
        self.perm = None

    # Internal methods

    @property
    def is_active(self):
        """Determine whether this context is active
        """
        return self.req and self.perm

    def _expand_perms(self, env):
        """Expand `grant` and `revoke` using the permission system of `env`.

        :raises ValueError: if the expanded action sets overlap
        """
        permsys = PermissionSystem(env)
        grant = frozenset(permsys.expand_actions(self.grant))
        revoke = frozenset(permsys.expand_actions(self.revoke))
        # Double check ambiguous action lists
        if grant & revoke:
            raise ValueError('Impossible to grant and revoke (%s)' %
                             ', '.join(sorted(grant & revoke)))
        self.grant = grant
        self.revoke = revoke
        self._expanded = True

    def __assert_require(f):
        # Class-body decorator (removed at the end of the class): ensures a
        # permission cache is attached and, for contexts that are not
        # active, re-checks the required actions before running `f`.
        @wraps(f)
        def __require(self, *args, **kwargs):
            # FIXME : No check ? Transform into assert statement ?
            if not self.perm:
                raise RuntimeError('Permission check out of context')
            if not self.is_active:
                for action in self.require_actions:
                    self.perm.require(action)
            return f(self, *args, **kwargs)
        return __require

    # PermissionCache methods

    @__assert_require
    def __call__(self, realm_or_resource, id=False, version=False):
        """Return a context applying the same rules to another resource."""
        newperm = self.perm(realm_or_resource, id, version)
        if newperm is self.perm:
            return self

        # Not bound to a request, hence it cannot be entered with `with`.
        newctx = SudoPermissionContext(None, self.require_actions, self.grant,
                                       self.revoke)
        newctx.perm = newperm
        return newctx

    @__assert_require
    def has_permission(self, action, realm_or_resource=None, id=False,
                       version=False):
        """Check `action`: granted wins, then revoked, then the real cache."""
        return action in self.grant or \
               (action not in self.revoke and
                self.perm.has_permission(action, realm_or_resource, id,
                                         version))

    __contains__ = has_permission

    @__assert_require
    def require(self, action, realm_or_resource=None, id=False, version=False):
        """Raise `PermissionError` unless `action` is allowed.

        Granted actions always pass, revoked actions always fail, anything
        else is delegated to the wrapped permission cache.
        """
        if action in self.grant:
            return
        if action in self.revoke:
            # `_normalize_resource` belongs to the wrapped cache; it is not
            # among the names forwarded by `__getattr__`, so it has to be
            # called on `self.perm` explicitly.
            resource = self.perm._normalize_resource(realm_or_resource, id,
                                                     version)
            raise PermissionError(action, resource, self.perm.env)
        self.perm.require(action, realm_or_resource, id, version)

    assert_permission = require

    @__assert_require
    def permissions(self):
        """Deprecated (but still used by the HDF compatibility layer)
        """
        self.perm.env.log.warning("perm.permissions() is deprecated and "
                                  "is only present for HDF compatibility")
        permsys = PermissionSystem(self.perm.env)
        actions = permsys.get_user_permissions(self.perm.username)
        return [action for action in actions if action in self]

    del __assert_require
# Short alias for the impersonation context manager class.
sudo = SudoPermissionContext