blob: 4275b1b15b28ff3078fd0216a1ad8ca50a72d49c [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
# $HeadURL$
# $LastChangedDate$
# $LastChangedBy$
# $LastChangedRevision$
import sys, os
import getopt
import shlex
try:
# Python >=3.0
from subprocess import getstatusoutput as subprocess_getstatusoutput
except ImportError:
# Python <3.0
from commands import getstatusoutput as subprocess_getstatusoutput
try:
my_getopt = getopt.gnu_getopt
except AttributeError:
my_getopt = getopt.getopt
import re
__author__ = "Gustavo Niemeyer <gustavo@niemeyer.net>"
class Error(Exception): pass
SECTION = re.compile(r'\[([^]]+?)(?:\s+extends\s+([^]]+))?\]')
OPTION = re.compile(r'(\S+)\s*=\s*(.*)$')
class Config:
def __init__(self, filename):
# Options are stored in __sections_list like this:
# [(sectname, [(optname, optval), ...]), ...]
self._sections_list = []
self._sections_dict = {}
self._read(filename)
def _read(self, filename):
# Use the same logic as in ConfigParser.__read()
file = open(filename)
cursectdict = None
optname = None
lineno = 0
for line in file:
lineno = lineno + 1
if line.isspace() or line[0] == '#':
continue
if line[0].isspace() and cursectdict is not None and optname:
value = line.strip()
cursectdict[optname] = "%s %s" % (cursectdict[optname], value)
cursectlist[-1][1] = "%s %s" % (cursectlist[-1][1], value)
else:
m = SECTION.match(line)
if m:
sectname = m.group(1)
parentsectname = m.group(2)
if parentsectname is None:
# No parent section defined, so start a new section
cursectdict = self._sections_dict.setdefault \
(sectname, {})
cursectlist = []
else:
# Copy the parent section into the new section
parentsectdict = self._sections_dict.get \
(parentsectname, {})
cursectdict = self._sections_dict.setdefault \
(sectname, parentsectdict.copy())
cursectlist = self.walk(parentsectname)
self._sections_list.append((sectname, cursectlist))
optname = None
elif cursectdict is None:
raise Error("%s:%d: no section header" % \
(filename, lineno))
else:
m = OPTION.match(line)
if m:
optname, optval = m.groups()
optval = optval.strip()
cursectdict[optname] = optval
cursectlist.append([optname, optval])
else:
raise Error("%s:%d: parsing error" % \
(filename, lineno))
def sections(self):
return list(self._sections_dict.keys())
def options(self, section):
return list(self._sections_dict.get(section, {}).keys())
def get(self, section, option, default=None):
return self._sections_dict.get(option, default)
def walk(self, section, option=None):
ret = []
for sectname, options in self._sections_list:
if sectname == section:
for optname, value in options:
if not option or optname == option:
ret.append((optname, value))
return ret
class Permission:
def __init__(self):
self._group = {}
self._permlist = []
def parse_groups(self, groupsiter):
for option, value in groupsiter:
groupusers = []
for token in shlex.split(value):
# expand nested groups in place; no forward decls
if token[0] == "@":
try:
groupusers.extend(self._group[token[1:]])
except KeyError:
raise Error, "group '%s' not found" % token[1:]
else:
groupusers.append(token)
self._group[option] = groupusers
def parse_perms(self, permsiter):
for option, value in permsiter:
# Paths never start with /, so remove it if provided
if option[0] == "/":
option = option[1:]
pattern = re.compile("^%s$" % option)
for entry in value.split():
openpar, closepar = entry.find("("), entry.find(")")
groupsusers = entry[:openpar].split(",")
perms = entry[openpar+1:closepar].split(",")
users = []
for groupuser in groupsusers:
if groupuser[0] == "@":
try:
users.extend(self._group[groupuser[1:]])
except KeyError:
raise Error("group '%s' not found" % \
groupuser[1:])
else:
users.append(groupuser)
self._permlist.append((pattern, users, perms))
def get(self, user, path):
ret = []
for pattern, users, perms in self._permlist:
if pattern.match(path) and (user in users or "*" in users):
ret = perms
return ret
class SVNLook:
def __init__(self, repospath, txn=None, rev=None):
self.repospath = repospath
self.txn = txn
self.rev = rev
def _execcmd(self, *cmd, **kwargs):
cmdstr = " ".join(cmd)
status, output = subprocess_getstatusoutput(cmdstr)
if status != 0:
sys.stderr.write(cmdstr)
sys.stderr.write("\n")
sys.stderr.write(output)
raise Error("command failed: %s\n%s" % (cmdstr, output))
return status, output
def _execsvnlook(self, cmd, *args, **kwargs):
execcmd_args = ["svnlook", cmd, self.repospath]
self._add_txnrev(execcmd_args, kwargs)
execcmd_args += args
execcmd_kwargs = {}
keywords = ["show", "noerror"]
for key in keywords:
if key in kwargs:
execcmd_kwargs[key] = kwargs[key]
return self._execcmd(*execcmd_args, **execcmd_kwargs)
def _add_txnrev(self, cmd_args, received_kwargs):
if "txn" in received_kwargs:
txn = received_kwargs.get("txn")
if txn is not None:
cmd_args += ["-t", txn]
elif self.txn is not None:
cmd_args += ["-t", self.txn]
if "rev" in received_kwargs:
rev = received_kwargs.get("rev")
if rev is not None:
cmd_args += ["-r", rev]
elif self.rev is not None:
cmd_args += ["-r", self.rev]
def changed(self, **kwargs):
status, output = self._execsvnlook("changed", **kwargs)
if status != 0:
return None
changes = []
for line in output.splitlines():
line = line.rstrip()
if not line: continue
entry = [None, None, None]
changedata, changeprop, path = None, None, None
if line[0] != "_":
changedata = line[0]
if line[1] != " ":
changeprop = line[1]
path = line[4:]
changes.append((changedata, changeprop, path))
return changes
def author(self, **kwargs):
status, output = self._execsvnlook("author", **kwargs)
if status != 0:
return None
return output.strip()
def check_perms(filename, section, repos, txn=None, rev=None, author=None):
svnlook = SVNLook(repos, txn=txn, rev=rev)
if author is None:
author = svnlook.author()
changes = svnlook.changed()
try:
config = Config(filename)
except IOError:
raise Error("can't read config file "+filename)
if not section in config.sections():
raise Error("section '%s' not found in config file" % section)
perm = Permission()
perm.parse_groups(config.walk("groups"))
perm.parse_groups(config.walk(section+" groups"))
perm.parse_perms(config.walk(section))
permerrors = []
for changedata, changeprop, path in changes:
pathperms = perm.get(author, path)
if changedata == "A" and "add" not in pathperms:
permerrors.append("you can't add "+path)
elif changedata == "U" and "update" not in pathperms:
permerrors.append("you can't update "+path)
elif changedata == "D" and "remove" not in pathperms:
permerrors.append("you can't remove "+path)
elif changeprop == "U" and "update" not in pathperms:
permerrors.append("you can't update properties of "+path)
#else:
# print "cdata=%s cprop=%s path=%s perms=%s" % \
# (str(changedata), str(changeprop), path, str(pathperms))
if permerrors:
permerrors.insert(0, "you don't have enough permissions for "
"this transaction:")
raise Error("\n".join(permerrors))
# Command:
USAGE = """\
Usage: svnperms.py OPTIONS
Options:
-r PATH Use repository at PATH to check transactions
-t TXN Query transaction TXN for commit information
-f PATH Use PATH as configuration file (default is repository
path + /conf/svnperms.conf)
-s NAME Use section NAME as permission section (default is
repository name, extracted from repository path)
-R REV Query revision REV for commit information (for tests)
-A AUTHOR Check commit as if AUTHOR had committed it (for tests)
-h Show this message
"""
class MissingArgumentsException(Exception):
"Thrown when required arguments are missing."
pass
def parse_options():
try:
opts, args = my_getopt(sys.argv[1:], "f:s:r:t:R:A:h", ["help"])
except getopt.GetoptError as e:
raise Error(e.msg)
class Options: pass
obj = Options()
obj.filename = None
obj.section = None
obj.repository = None
obj.transaction = None
obj.revision = None
obj.author = None
for opt, val in opts:
if opt == "-f":
obj.filename = val
elif opt == "-s":
obj.section = val
elif opt == "-r":
obj.repository = val
elif opt == "-t":
obj.transaction = val
elif opt == "-R":
obj.revision = val
elif opt == "-A":
obj.author = val
elif opt in ["-h", "--help"]:
sys.stdout.write(USAGE)
sys.exit(0)
missingopts = []
if not obj.repository:
missingopts.append("repository")
if not (obj.transaction or obj.revision):
missingopts.append("either transaction or a revision")
if missingopts:
raise MissingArgumentsException("missing required option(s): " + ", ".join(missingopts))
obj.repository = os.path.abspath(obj.repository)
if obj.filename is None:
obj.filename = os.path.join(obj.repository, "conf", "svnperms.conf")
if obj.section is None:
obj.section = os.path.basename(obj.repository)
if not (os.path.isdir(obj.repository) and
os.path.isdir(os.path.join(obj.repository, "db")) and
os.path.isdir(os.path.join(obj.repository, "hooks")) and
os.path.isfile(os.path.join(obj.repository, "format"))):
raise Error("path '%s' doesn't look like a repository" % \
obj.repository)
return obj
def main():
try:
opts = parse_options()
check_perms(opts.filename, opts.section,
opts.repository, opts.transaction, opts.revision,
opts.author)
except MissingArgumentsException as e:
sys.stderr.write("%s\n" % str(e))
sys.stderr.write(USAGE)
sys.exit(1)
except Error as e:
sys.stderr.write("error: %s\n" % str(e))
sys.exit(1)
if __name__ == "__main__":
main()
# vim:et:ts=4:sw=4