blob: 9c0fc426235f412912c8a50d6a5c4bd86052a51f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Admin panels for product management"""
from trac.admin.api import IAdminCommandProvider, AdminCommandError,\
AdminCommandManager
from trac.admin.console import TracAdmin, TRAC_VERSION
from trac.admin.web_ui import AdminModule
from trac.core import *
from trac.config import *
from trac.perm import PermissionSystem
from trac.resource import ResourceNotFound
from trac.ticket.admin import TicketAdminPanel, _save_config
from trac.util import lazy
from trac.util.text import print_table, to_unicode, printerr, printout
from trac.util.translation import _, N_, gettext, ngettext
from trac.web.api import HTTPNotFound, IRequestFilter, IRequestHandler
from trac.web.chrome import Chrome, add_notice, add_warning
from multiproduct.env import ProductEnvironment
from multiproduct.model import Product
from multiproduct.perm import sudo
import multiproduct.versioncontrol
import trac.versioncontrol.admin
from trac.versioncontrol import DbRepositoryProvider, RepositoryManager
from multiproduct.util import ReplacementComponent
#--------------------------
# Product admin panel
#--------------------------
class ProductAdminPanel(TicketAdminPanel):
"""The Product Admin Panel"""
_type = 'products'
_label = ('Product','Products')
def get_admin_commands(self):
if not isinstance(self.env, ProductEnvironment):
yield ('product add', '<prefix> <owner> <name>',
'Add a new product',
None, self._do_product_add)
yield ('product chown', '<prefix> <owner>',
'Change product ownership',
self._complete_product, self._do_product_chown)
yield ('product list', '',
'Show available products',
None, self._do_product_list)
yield ('product remove', '<prefix>',
'Remove/uninstall a product',
self._complete_product, self._do_product_remove)
yield ('product rename', '<prefix> <newname>',
'Rename a product',
self._complete_product, self._do_product_rename)
def get_admin_panels(self, req):
if isinstance(req.perm.env, ProductEnvironment):
return None
return super(ProductAdminPanel, self).get_admin_panels(req)
def _render_admin_panel(self, req, cat, page, product):
req.perm.require('PRODUCT_VIEW')
name = req.args.get('name')
description = req.args.get('description','')
prefix = req.args.get('prefix') if product is None else product
owner = req.args.get('owner')
keys = {'prefix':prefix}
field_data = {'name':name,
'description':description,
'owner':owner,
}
# Detail view?
if product:
prod = Product(self.env, keys)
if req.method == 'POST':
if req.args.get('save'):
req.perm.require('PRODUCT_MODIFY')
prod.update_field_dict(field_data)
prod.update()
add_notice(req, _('Your changes have been saved.'))
req.redirect(req.href.admin(cat, page))
elif req.args.get('cancel'):
req.redirect(req.href.admin(cat, page))
Chrome(self.env).add_wiki_toolbars(req)
data = {'view': 'detail', 'product': prod}
else:
default = self.config.get('ticket', 'default_product')
if req.method == 'POST':
# Add Product
if req.args.get('add') and req.args.get('prefix'):
req.perm.require('PRODUCT_CREATE')
if not owner:
add_warning(req, _('All fields are required!'))
req.redirect(req.href.admin(cat, page))
try:
prod = Product(self.env, keys)
except ResourceNotFound:
prod = Product(self.env)
prod.update_field_dict(keys)
prod.update_field_dict(field_data)
prod.insert()
add_notice(req, _('The product "%(id)s" has been added.',
id=prefix))
req.redirect(req.href.admin(cat, page))
else:
if prod.prefix is None:
raise TracError(_('Invalid product id.'))
raise TracError(_("Product %(id)s already exists.",
id=prefix))
# Remove product
elif req.args.get('remove'):
raise TracError(_('Product removal is not allowed!'))
# Set default product
elif req.args.get('apply'):
prefix = req.args.get('default')
if prefix and prefix != default:
self.log.info("Setting default product to %s",
prefix)
self.config.set('ticket', 'default_product',
prefix)
_save_config(self.config, req, self.log)
req.redirect(req.href.admin(cat, page))
products = Product.select(self.env)
data = {'view': 'list',
'products': products,
'default': default}
if self.config.getbool('ticket', 'restrict_owner'):
perm = PermissionSystem(self.env)
def valid_owner(username):
return perm.get_user_permissions(username).get('TICKET_MODIFY')
data['owners'] = [username for username, name, email
in self.env.get_known_users()
if valid_owner(username)]
data['owners'].insert(0, '')
data['owners'].sort()
else:
data['owners'] = None
return 'admin_products.html', data
def load_product(self, prefix):
products = Product.select(self.env, where={'prefix' : prefix})
if not products:
raise AdminCommandError('Unknown product %s' % (prefix,))
return products[0]
def _complete_product(self, args):
if len(args) == 1:
return get_products(self.env)
def _do_product_add(self, prefix, owner, name):
product = Product(self.env)
product._data.update({'prefix':prefix, 'name':name, 'owner':owner})
try:
product.insert()
except TracError, exc:
raise AdminCommandError(to_unicode(exc))
def _do_product_chown(self, prefix, owner):
product = self.load_product(prefix)
product._data['owner'] = owner
product.update()
def _do_product_list(self):
if not isinstance(self.env, ProductEnvironment):
print_table([(p.prefix, p.owner, p.name)
for p in Product.select(self.env)],
[_('Prefix'), _('Owner'), _('Name')])
def _do_product_remove(self, prefix):
raise AdminCommandError(_("Command 'product remove' not supported yet"))
def _do_product_rename(self, prefix, newname):
product = self.load_product(prefix)
product._data['name'] = newname
product.update()
#--------------------------
# Advanced administration in product context
#--------------------------
class IProductAdminAclContributor(Interface):
"""Interface implemented by components contributing with entries to the
access control white list in order to enable admin panels in product
context.
**Notice** that deny entries configured by users in the blacklist
(i.e. using TracIni `admin_blacklist` option in `multiproduct` section)
will override these entries.
"""
def enable_product_admin_panels():
"""Return a sequence of `(cat_id, panel_id)` tuples that will be
enabled in product context unless specified otherwise in configuration.
If `panel_id` is set to `'*'` then all panels in section `cat_id`
will have green light.
"""
class ProductAdminModule(Component):
"""Leverage administration panels in product context based on the
combination of white list and black list.
"""
implements(IAdminCommandProvider, IRequestFilter, IRequestHandler)
acl_contributors = ExtensionPoint(IProductAdminAclContributor)
raw_blacklist = ListOption('multiproduct', 'admin_blacklist',
doc="""Do not show any product admin panels in this list even if
allowed by white list. Value must be a comma-separated list of
`cat:id` strings respectively identifying the section and identifier
of target admin panel. Empty values of `cat` and `id` will be ignored
and warnings emitted if TracLogging is enabled. If `id` is set
to `*` then all panels in `cat` section will be added to blacklist
while in product context.""")
@lazy
def acl(self):
"""Access control table based on blacklist and white list.
"""
# FIXME : Use an immutable (mapping?) type
acl = {}
if isinstance(self.env, ProductEnvironment):
for acl_c in self.acl_contributors:
for cat_id, panel_id in acl_c.enable_product_admin_panels():
if cat_id and panel_id:
if panel_id == '*':
acl[cat_id] = True
else:
acl[(cat_id, panel_id)] = True
else:
self.log.warning('Invalid panel %s in white list',
panel_id)
# Blacklist entries will override those in white list
warnings = []
for panelref in self.raw_blacklist:
try:
cat_id, panel_id = panelref.split(':')
except ValueError:
cat_id = panel_id = ''
if cat_id and panel_id:
if panel_id == '*':
acl[cat_id] = False
else:
acl[(cat_id, panel_id)] = False
else:
warnings.append(panelref)
if warnings:
self.log.warning("Invalid panel descriptors '%s' in blacklist",
','.join(warnings))
return acl
# IAdminCommandProvider methods
def get_admin_commands(self):
if not isinstance(self.env, ProductEnvironment):
yield ('product admin', '<PREFIX> <admin command>',
'Execute admin (sub-)command upon product resources',
self._complete_product_admin, self._do_product_admin)
def product_admincmd_mgr(self, prefix):
try:
product_env = ProductEnvironment.lookup_env(self.env, prefix)
except LookupError:
raise AdminCommandError('Unknown product %s' % (prefix,))
else:
return AdminCommandManager(product_env)
def _complete_product_admin(self, args):
if len(args) == 1:
return get_products(self.env)
else:
mgr = self.product_admincmd_mgr(args[0])
return mgr.complete_command(args[1:])
GLOBAL_COMMANDS = ('deploy', 'help', 'hotcopy', 'initenv', 'upgrade')
def _do_product_admin(self, prefix, *args):
mgr = self.product_admincmd_mgr(prefix)
if args and args[0] in self.GLOBAL_COMMANDS:
raise AdminCommandError('%s command not supported for products' %
(args[0],))
if args and args[0] == 'help':
help_args = args[1:]
if help_args:
doc = mgr.get_command_help(list(help_args))
if doc:
TracAdmin.print_doc(doc)
else:
printerr(_("No documentation found for '%(cmd)s'."
" Use 'help' to see the list of commands.",
cmd=' '.join(help_args)))
cmds = mgr.get_similar_commands(help_args[0])
if cmds:
printout('')
printout(ngettext("Did you mean this?",
"Did you mean one of these?",
len(cmds)))
for cmd in cmds:
printout(' ' + cmd)
else:
printout(_("trac-admin - The Trac Administration Console "
"%(version)s", version=TRAC_VERSION))
env = mgr.env
TracAdmin.print_doc(TracAdmin.all_docs(env), short=True)
else:
mgr.execute_command(*args)
# IRequestFilter methods
def pre_process_request(self, req, handler):
"""Intercept admin requests in product context if `TRAC_ADMIN`
expectations are not met.
"""
if isinstance(self.env, ProductEnvironment) and \
handler is AdminModule(self.env) and \
not req.perm.has_permission('TRAC_ADMIN') and \
req.perm.has_permission('PRODUCT_ADMIN'):
# Intercept admin request
return self
return handler
def post_process_request(self, req, template, data, content_type):
return template, data, content_type
# IRequestHandler methods
def match_request(self, req):
"""Never match a request"""
def process_request(self, req):
"""Anticipate permission error to hijack admin panel dispatching
process in product context if `TRAC_ADMIN` expectations are not met.
"""
# TODO: Verify `isinstance(self.env, ProductEnvironment)` once again ?
cat_id = req.args.get('cat_id')
panel_id = req.args.get('panel_id')
if self._check_panel(cat_id, panel_id):
with sudo(req):
return self.global_process_request(req)
else:
raise HTTPNotFound(_('Unknown administration panel'))
global_process_request = AdminModule.process_request.im_func
# Internal methods
def _get_panels(self, req):
if isinstance(self.env, ProductEnvironment):
panels, providers = AdminModule(self.env)._get_panels(req)
# Filter based on ACLs
panels = [p for p in panels if self._check_panel(p[0], p[2])]
# providers = dict([k, p] for k, p in providers.iteritems()
# if self._check_panel(*k))
return panels, providers
else:
return [], []
def _check_panel(self, cat_id, panel_id):
cat_allow = self.acl.get(cat_id)
panel_allow = self.acl.get((cat_id, panel_id))
return cat_allow is not False and panel_allow is not False \
and (cat_allow, panel_allow) != (None, None) \
and (cat_id, panel_id) != ('general', 'plugin') # double-check !
def get_products(env):
return [p.prefix for p in Product.select(env)]
class DefaultProductAdminWhitelist(Component):
implements(IProductAdminAclContributor)
# IProductAdminAclContributor methods
def enable_product_admin_panels(self):
yield 'general', 'basics'
yield 'general', 'perm'
yield 'accounts', 'notification'
# FIXME: Include users admin panel ?
#yield 'accounts', 'users'
yield 'ticket', '*'
yield 'versioncontrol', 'repository'
class ProductRepositoryAdminPanel(ReplacementComponent, trac.versioncontrol.admin.RepositoryAdminPanel):
"""Web admin panel for repository administration, product-aware."""
implements(trac.admin.IAdminPanelProvider)
# IAdminPanelProvider methods
def get_admin_panels(self, req):
if 'VERSIONCONTROL_ADMIN' in req.perm:
yield ('versioncontrol', _('Version Control'), 'repository',
_('Repository Links') if isinstance(self.env, ProductEnvironment)
else _('Repositories'))
def render_admin_panel(self, req, category, page, path_info):
if not isinstance(self.env, ProductEnvironment):
return super(ProductRepositoryAdminPanel, self).render_admin_panel(
req, category, page, path_info)
req.perm.require('VERSIONCONTROL_ADMIN')
db_provider = self.env[DbRepositoryProvider]
if req.method == 'POST' and db_provider:
if req.args.get('remove'):
repolist = req.args.get('sel')
if repolist:
if isinstance(repolist, basestring):
repolist = [repolist, ]
for reponame in repolist:
db_provider.unlink_product(reponame)
elif req.args.get('addlink') is not None and db_provider:
reponame = req.args.get('repository')
db_provider.link_product(reponame)
req.redirect(req.href.admin(category, page))
# Retrieve info for all product repositories
rm_product = RepositoryManager(self.env)
rm_product.reload_repositories()
all_product_repos = rm_product.get_all_repositories()
repositories = dict((reponame, self._extend_info(
reponame, info.copy(), True))
for (reponame, info) in
all_product_repos.iteritems())
types = sorted([''] + rm_product.get_supported_types())
# construct a list of all repositores not linked to this product
rm = RepositoryManager(self.env.parent)
all_repos = rm.get_all_repositories()
unlinked_repositories = dict([(k, all_repos[k]) for k in
sorted(set(all_repos) - set(all_product_repos))])
data = {'types': types, 'default_type': rm_product.repository_type,
'repositories': repositories,
'unlinked_repositories': unlinked_repositories}
return 'repository_links.html', data
trac.versioncontrol.admin.RepositoryAdminPanel = ProductRepositoryAdminPanel