|  | #!/usr/bin/env python | 
|  | # -*- coding: utf-8 -*- | 
|  | # | 
|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements.  See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership.  The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied.  See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  | # | 
|  | # | 
|  |  | 
|  | # $HeadURL$ | 
|  | # $LastChangedDate$ | 
|  | # $LastChangedBy$ | 
|  | # $LastChangedRevision$ | 
|  |  | 
|  | import sys, os | 
|  | import getopt | 
|  | import shlex | 
|  |  | 
|  | try: | 
|  | # Python >=3.0 | 
|  | from subprocess import getstatusoutput as subprocess_getstatusoutput | 
|  | except ImportError: | 
|  | # Python <3.0 | 
|  | from commands import getstatusoutput as subprocess_getstatusoutput | 
|  | try: | 
|  | my_getopt = getopt.gnu_getopt | 
|  | except AttributeError: | 
|  | my_getopt = getopt.getopt | 
|  | import re | 
|  |  | 
|  | __author__ = "Gustavo Niemeyer <gustavo@niemeyer.net>" | 
|  |  | 
|  | class Error(Exception): pass | 
|  |  | 
|  | SECTION = re.compile(r'\[([^]]+?)(?:\s+extends\s+([^]]+))?\]') | 
|  | OPTION = re.compile(r'(\S+)\s*=\s*(.*)$') | 
|  |  | 
|  | class Config: | 
|  | def __init__(self, filename): | 
|  | # Options are stored in __sections_list like this: | 
|  | # [(sectname, [(optname, optval), ...]), ...] | 
|  | self._sections_list = [] | 
|  | self._sections_dict = {} | 
|  | self._read(filename) | 
|  |  | 
|  | def _read(self, filename): | 
|  | # Use the same logic as in ConfigParser.__read() | 
|  | file = open(filename) | 
|  | cursectdict = None | 
|  | optname = None | 
|  | lineno = 0 | 
|  | for line in file: | 
|  | lineno = lineno + 1 | 
|  | if line.isspace() or line[0] == '#': | 
|  | continue | 
|  | if line[0].isspace() and cursectdict is not None and optname: | 
|  | value = line.strip() | 
|  | cursectdict[optname] = "%s %s" % (cursectdict[optname], value) | 
|  | cursectlist[-1][1] = "%s %s" % (cursectlist[-1][1], value) | 
|  | else: | 
|  | m = SECTION.match(line) | 
|  | if m: | 
|  | sectname = m.group(1) | 
|  | parentsectname = m.group(2) | 
|  | if parentsectname is None: | 
|  | # No parent section defined, so start a new section | 
|  | cursectdict = self._sections_dict.setdefault \ | 
|  | (sectname, {}) | 
|  | cursectlist = [] | 
|  | else: | 
|  | # Copy the parent section into the new section | 
|  | parentsectdict = self._sections_dict.get \ | 
|  | (parentsectname, {}) | 
|  | cursectdict = self._sections_dict.setdefault \ | 
|  | (sectname, parentsectdict.copy()) | 
|  | cursectlist = self.walk(parentsectname) | 
|  | self._sections_list.append((sectname, cursectlist)) | 
|  | optname = None | 
|  | elif cursectdict is None: | 
|  | raise Error("%s:%d: no section header" % \ | 
|  | (filename, lineno)) | 
|  | else: | 
|  | m = OPTION.match(line) | 
|  | if m: | 
|  | optname, optval = m.groups() | 
|  | optval = optval.strip() | 
|  | cursectdict[optname] = optval | 
|  | cursectlist.append([optname, optval]) | 
|  | else: | 
|  | raise Error("%s:%d: parsing error" % \ | 
|  | (filename, lineno)) | 
|  |  | 
|  | def sections(self): | 
|  | return list(self._sections_dict.keys()) | 
|  |  | 
|  | def options(self, section): | 
|  | return list(self._sections_dict.get(section, {}).keys()) | 
|  |  | 
|  | def get(self, section, option, default=None): | 
|  | return self._sections_dict.get(option, default) | 
|  |  | 
|  | def walk(self, section, option=None): | 
|  | ret = [] | 
|  | for sectname, options in self._sections_list: | 
|  | if sectname == section: | 
|  | for optname, value in options: | 
|  | if not option or optname == option: | 
|  | ret.append((optname, value)) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | class Permission: | 
|  | def __init__(self): | 
|  | self._group = {} | 
|  | self._permlist = [] | 
|  |  | 
|  | def parse_groups(self, groupsiter): | 
|  | for option, value in groupsiter: | 
|  | groupusers = [] | 
|  | for token in shlex.split(value): | 
|  | # expand nested groups in place; no forward decls | 
|  | if token[0] == "@": | 
|  | try: | 
|  | groupusers.extend(self._group[token[1:]]) | 
|  | except KeyError: | 
|  | raise Error, "group '%s' not found" % token[1:] | 
|  | else: | 
|  | groupusers.append(token) | 
|  | self._group[option] = groupusers | 
|  |  | 
|  | def parse_perms(self, permsiter): | 
|  | for option, value in permsiter: | 
|  | # Paths never start with /, so remove it if provided | 
|  | if option[0] == "/": | 
|  | option = option[1:] | 
|  | pattern = re.compile("^%s$" % option) | 
|  | for entry in value.split(): | 
|  | openpar, closepar = entry.find("("), entry.find(")") | 
|  | groupsusers = entry[:openpar].split(",") | 
|  | perms = entry[openpar+1:closepar].split(",") | 
|  | users = [] | 
|  | for groupuser in groupsusers: | 
|  | if groupuser[0] == "@": | 
|  | try: | 
|  | users.extend(self._group[groupuser[1:]]) | 
|  | except KeyError: | 
|  | raise Error("group '%s' not found" % \ | 
|  | groupuser[1:]) | 
|  | else: | 
|  | users.append(groupuser) | 
|  | self._permlist.append((pattern, users, perms)) | 
|  |  | 
|  | def get(self, user, path): | 
|  | ret = [] | 
|  | for pattern, users, perms in self._permlist: | 
|  | if pattern.match(path) and (user in users or "*" in users): | 
|  | ret = perms | 
|  | return ret | 
|  |  | 
|  | class SVNLook: | 
|  | def __init__(self, repospath, txn=None, rev=None): | 
|  | self.repospath = repospath | 
|  | self.txn = txn | 
|  | self.rev = rev | 
|  |  | 
|  | def _execcmd(self, *cmd, **kwargs): | 
|  | cmdstr = " ".join(cmd) | 
|  | status, output = subprocess_getstatusoutput(cmdstr) | 
|  | if status != 0: | 
|  | sys.stderr.write(cmdstr) | 
|  | sys.stderr.write("\n") | 
|  | sys.stderr.write(output) | 
|  | raise Error("command failed: %s\n%s" % (cmdstr, output)) | 
|  | return status, output | 
|  |  | 
|  | def _execsvnlook(self, cmd, *args, **kwargs): | 
|  | execcmd_args = ["svnlook", cmd, self.repospath] | 
|  | self._add_txnrev(execcmd_args, kwargs) | 
|  | execcmd_args += args | 
|  | execcmd_kwargs = {} | 
|  | keywords = ["show", "noerror"] | 
|  | for key in keywords: | 
|  | if key in kwargs: | 
|  | execcmd_kwargs[key] = kwargs[key] | 
|  | return self._execcmd(*execcmd_args, **execcmd_kwargs) | 
|  |  | 
|  | def _add_txnrev(self, cmd_args, received_kwargs): | 
|  | if "txn" in received_kwargs: | 
|  | txn = received_kwargs.get("txn") | 
|  | if txn is not None: | 
|  | cmd_args += ["-t", txn] | 
|  | elif self.txn is not None: | 
|  | cmd_args += ["-t", self.txn] | 
|  | if "rev" in received_kwargs: | 
|  | rev = received_kwargs.get("rev") | 
|  | if rev is not None: | 
|  | cmd_args += ["-r", rev] | 
|  | elif self.rev is not None: | 
|  | cmd_args += ["-r", self.rev] | 
|  |  | 
|  | def changed(self, **kwargs): | 
|  | status, output = self._execsvnlook("changed", **kwargs) | 
|  | if status != 0: | 
|  | return None | 
|  | changes = [] | 
|  | for line in output.splitlines(): | 
|  | line = line.rstrip() | 
|  | if not line: continue | 
|  | entry = [None, None, None] | 
|  | changedata, changeprop, path = None, None, None | 
|  | if line[0] != "_": | 
|  | changedata = line[0] | 
|  | if line[1] != " ": | 
|  | changeprop = line[1] | 
|  | path = line[4:] | 
|  | changes.append((changedata, changeprop, path)) | 
|  | return changes | 
|  |  | 
|  | def author(self, **kwargs): | 
|  | status, output = self._execsvnlook("author", **kwargs) | 
|  | if status != 0: | 
|  | return None | 
|  | return output.strip() | 
|  |  | 
|  |  | 
|  | def check_perms(filename, section, repos, txn=None, rev=None, author=None): | 
|  | svnlook = SVNLook(repos, txn=txn, rev=rev) | 
|  | if author is None: | 
|  | author = svnlook.author() | 
|  | changes = svnlook.changed() | 
|  | try: | 
|  | config = Config(filename) | 
|  | except IOError: | 
|  | raise Error("can't read config file "+filename) | 
|  | if not section in config.sections(): | 
|  | raise Error("section '%s' not found in config file" % section) | 
|  | perm = Permission() | 
|  | perm.parse_groups(config.walk("groups")) | 
|  | perm.parse_groups(config.walk(section+" groups")) | 
|  | perm.parse_perms(config.walk(section)) | 
|  | permerrors = [] | 
|  | for changedata, changeprop, path in changes: | 
|  | pathperms = perm.get(author, path) | 
|  | if changedata == "A" and "add" not in pathperms: | 
|  | permerrors.append("you can't add "+path) | 
|  | elif changedata == "U" and "update" not in pathperms: | 
|  | permerrors.append("you can't update "+path) | 
|  | elif changedata == "D" and "remove" not in pathperms: | 
|  | permerrors.append("you can't remove "+path) | 
|  | elif changeprop == "U" and "update" not in pathperms: | 
|  | permerrors.append("you can't update properties of "+path) | 
|  | #else: | 
|  | #    print "cdata=%s cprop=%s path=%s perms=%s" % \ | 
|  | #          (str(changedata), str(changeprop), path, str(pathperms)) | 
|  | if permerrors: | 
|  | permerrors.insert(0, "you don't have enough permissions for " | 
|  | "this transaction:") | 
|  | raise Error("\n".join(permerrors)) | 
|  |  | 
|  |  | 
|  | # Command: | 
|  |  | 
|  | USAGE = """\ | 
|  | Usage: svnperms.py OPTIONS | 
|  |  | 
|  | Options: | 
|  | -r PATH    Use repository at PATH to check transactions | 
|  | -t TXN     Query transaction TXN for commit information | 
|  | -f PATH    Use PATH as configuration file (default is repository | 
|  | path + /conf/svnperms.conf) | 
|  | -s NAME    Use section NAME as permission section (default is | 
|  | repository name, extracted from repository path) | 
|  | -R REV     Query revision REV for commit information (for tests) | 
|  | -A AUTHOR  Check commit as if AUTHOR had committed it (for tests) | 
|  | -h         Show this message | 
|  | """ | 
|  |  | 
|  | class MissingArgumentsException(Exception): | 
|  | "Thrown when required arguments are missing." | 
|  | pass | 
|  |  | 
|  | def parse_options(): | 
|  | try: | 
|  | opts, args = my_getopt(sys.argv[1:], "f:s:r:t:R:A:h", ["help"]) | 
|  | except getopt.GetoptError as e: | 
|  | raise Error(e.msg) | 
|  | class Options: pass | 
|  | obj = Options() | 
|  | obj.filename = None | 
|  | obj.section = None | 
|  | obj.repository = None | 
|  | obj.transaction = None | 
|  | obj.revision = None | 
|  | obj.author = None | 
|  | for opt, val in opts: | 
|  | if opt == "-f": | 
|  | obj.filename = val | 
|  | elif opt == "-s": | 
|  | obj.section = val | 
|  | elif opt == "-r": | 
|  | obj.repository = val | 
|  | elif opt == "-t": | 
|  | obj.transaction = val | 
|  | elif opt == "-R": | 
|  | obj.revision = val | 
|  | elif opt == "-A": | 
|  | obj.author = val | 
|  | elif opt in ["-h", "--help"]: | 
|  | sys.stdout.write(USAGE) | 
|  | sys.exit(0) | 
|  | missingopts = [] | 
|  | if not obj.repository: | 
|  | missingopts.append("repository") | 
|  | if not (obj.transaction or obj.revision): | 
|  | missingopts.append("either transaction or a revision") | 
|  | if missingopts: | 
|  | raise MissingArgumentsException("missing required option(s): " + ", ".join(missingopts)) | 
|  | obj.repository = os.path.abspath(obj.repository) | 
|  | if obj.filename is None: | 
|  | obj.filename = os.path.join(obj.repository, "conf", "svnperms.conf") | 
|  | if obj.section is None: | 
|  | obj.section = os.path.basename(obj.repository) | 
|  | if not (os.path.isdir(obj.repository) and | 
|  | os.path.isdir(os.path.join(obj.repository, "db")) and | 
|  | os.path.isdir(os.path.join(obj.repository, "hooks")) and | 
|  | os.path.isfile(os.path.join(obj.repository, "format"))): | 
|  | raise Error("path '%s' doesn't look like a repository" % \ | 
|  | obj.repository) | 
|  |  | 
|  | return obj | 
|  |  | 
|  | def main(): | 
|  | try: | 
|  | opts = parse_options() | 
|  | check_perms(opts.filename, opts.section, | 
|  | opts.repository, opts.transaction, opts.revision, | 
|  | opts.author) | 
|  | except MissingArgumentsException as e: | 
|  | sys.stderr.write("%s\n" % str(e)) | 
|  | sys.stderr.write(USAGE) | 
|  | sys.exit(1) | 
|  | except Error as e: | 
|  | sys.stderr.write("error: %s\n" % str(e)) | 
|  | sys.exit(1) | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | main() | 
|  |  | 
|  | # vim:et:ts=4:sw=4 |