#!/usr/bin/env python
#
# svnshell.py : a Python-based shell interface for cruising 'round in
#               the filesystem.
#
######################################################################
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.  The ASF licenses this file
#    to you under the Apache License, Version 2.0 (the
#    "License"); you may not use this file except in compliance
#    with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing,
#    software distributed under the License is distributed on an
#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#    KIND, either express or implied.  See the License for the
#    specific language governing permissions and limitations
#    under the License.
######################################################################
#

import re
import sys
import time
import traceback
from cmd import Cmd
from random import randint

from svn import fs, core, repos


class SVNShell(Cmd):
    """Interactive shell for browsing a Subversion repository filesystem.

    The shell tracks a current root (a revision root or a transaction
    root) and a current directory inside that root.  Each ``do_*`` method
    implements one command available at the prompt; its docstring is the
    text shown by the built-in ``help`` command.
    """

    def __init__(self, path):
        """initialize an SVNShell object

        Opens the repository at PATH, positions the shell at '/' in the
        youngest revision and then runs the command loop, so the
        constructor only returns once that loop has finished.
        """
        Cmd.__init__(self)
        path = core.svn_path_canonicalize(path)
        self.fs_ptr = repos.fs(repos.open(path))
        self.is_rev = 1
        self.rev = fs.youngest_rev(self.fs_ptr)
        self.txn = None
        self.root = fs.revision_root(self.fs_ptr, self.rev)
        self.path = "/"
        self._setup_prompt()
        self.cmdloop()

    def precmd(self, line):
        """Translate end-of-file (Ctrl-D) into the 'exit' command."""
        if line == "EOF":
            # Ctrl-D is a command without a newline.  Print a newline, so the
            # next shell prompt is not on the same line as the last svnshell
            # prompt.
            print("")
            return "exit"
        return line

    def postcmd(self, stop, line):
        """Refresh the prompt after a command and hand STOP back to Cmd.

        Returning STOP (instead of implicitly returning None) keeps the
        stop request of a command intact.
        """
        self._setup_prompt()
        return stop

    # Replies chosen at random by default() for unrecognized commands.
    _errors = ["Huh?",
               "Whatchoo talkin' 'bout, Willis?",
               "Say what?",
               "Nope. Not gonna do it.",
               "Ehh...I don't think so, chief."]

    def default(self, line):
        """Answer an unrecognized command with a random canned reply."""
        print(self._errors[randint(0, len(self._errors) - 1)])

    def do_cat(self, arg):
        """dump the contents of a file"""
        if not arg:
            print("You must supply a file path.")
            return
        catpath = self._parse_path(arg)
        kind = fs.check_path(self.root, catpath)
        if kind == core.svn_node_none:
            print("Path '%s' does not exist." % catpath)
            return
        if kind == core.svn_node_dir:
            print("Path '%s' is not a file." % catpath)
            return
        ### be nice to get some paging in here.
        stream = fs.file_contents(self.root, catpath)
        # Stop after the first read that comes back shorter than the
        # requested chunk size.
        while True:
            data = core.svn_stream_read(stream, core.SVN_STREAM_CHUNK_SIZE)
            sys.stdout.write(data)
            if len(data) < core.SVN_STREAM_CHUNK_SIZE:
                break

    def do_cd(self, arg):
        """change directory"""
        newpath = self._parse_path(arg)

        # make sure that path actually exists in the filesystem as a directory
        kind = fs.check_path(self.root, newpath)
        if kind != core.svn_node_dir:
            print("Path '%s' is not a valid filesystem directory." % newpath)
            return
        self.path = newpath

    def do_ls(self, arg):
        """list the contents of the current directory or provided path"""
        parent = self.path
        if not arg:
            # no arg -- show a listing for the current directory.
            entries = fs.dir_entries(self.root, self.path)
        else:
            # arg?  show a listing of that path.
            newpath = self._parse_path(arg)
            kind = fs.check_path(self.root, newpath)
            if kind == core.svn_node_dir:
                parent = newpath
                entries = fs.dir_entries(self.root, parent)
            elif kind == core.svn_node_file:
                # A file is listed through its parent directory, restricted
                # to the single entry carrying the file's name.
                parts = self._path_to_parts(newpath)
                name = parts.pop(-1)
                parent = self._parts_to_path(parts)
                print(parent + ':' + name)
                dirent = fs.dir_entries(self.root, parent).get(name)
                if not dirent:
                    return
                entries = {name: dirent}
            else:
                print("Path '%s' not found." % newpath)
                return

        # The header is rendered with the row format so that the column
        # titles line up with the values printed beneath them.
        row_format = "%6s %8s %12s %8s %12s %s"
        print(row_format % ("REV", "AUTHOR", "NODE-REV-ID", "SIZE", "DATE",
                            "NAME"))
        print("----------------------------------------------------------------------------")

        # Drop the trailing slash of the root so children become '/name'
        # rather than '//name'.
        prefix = parent.rstrip('/') + '/'
        for entry in sorted(entries):
            fullpath = prefix + entry
            size = ''
            if fs.is_dir(self.root, fullpath):
                name = entry + '/'
            else:
                size = str(fs.file_length(self.root, fullpath))
                name = entry
            node_id = fs.unparse_id(entries[entry].id)
            created_rev = fs.node_created_rev(self.root, fullpath)
            author = fs.revision_prop(self.fs_ptr, created_rev,
                                      core.SVN_PROP_REVISION_AUTHOR)
            if not author:
                author = ""
            date = fs.revision_prop(self.fs_ptr, created_rev,
                                    core.SVN_PROP_REVISION_DATE)
            if not date:
                date = ""
            else:
                date = self._format_date(date)

            print(row_format % (created_rev, author[:8], node_id, size, date,
                                name))

    def do_lstxns(self, arg):
        """list the transactions available for browsing"""
        txns = sorted(fs.list_transactions(self.fs_ptr))
        # Six transaction names per output line.
        for counter, txn in enumerate(txns, 1):
            sys.stdout.write("%8s " % txn)
            if counter % 6 == 0:
                print("")
        print("")

    def do_pcat(self, arg):
        """list the properties of a path"""
        catpath = self.path
        if arg:
            catpath = self._parse_path(arg)
        kind = fs.check_path(self.root, catpath)
        if kind == core.svn_node_none:
            print("Path '%s' does not exist." % catpath)
            return
        plist = fs.node_proplist(self.root, catpath)
        if not plist:
            return
        for pkey, pval in plist.items():
            print('K ' + str(len(pkey)))
            print(pkey)
            print('P ' + str(len(pval)))
            print(pval)
        print('PROPS-END')

    def do_setrev(self, arg):
        """set the current revision to view"""
        try:
            if arg.lower() == 'head':
                rev = fs.youngest_rev(self.fs_ptr)
            else:
                rev = int(arg)
            newroot = fs.revision_root(self.fs_ptr, rev)
        except Exception:
            # Covers a non-numeric ARG as well as errors raised by the
            # bindings, without swallowing KeyboardInterrupt/SystemExit.
            print("Error setting the revision to '" + arg + "'.")
            return
        fs.close_root(self.root)
        self.root = newroot
        self.rev = rev
        self.is_rev = 1
        self._do_path_landing()

    def do_settxn(self, arg):
        """set the current transaction to view"""
        try:
            txnobj = fs.open_txn(self.fs_ptr, arg)
            newroot = fs.txn_root(txnobj)
        except Exception:
            # Keep KeyboardInterrupt/SystemExit propagating.
            print("Error setting the transaction to '" + arg + "'.")
            return
        fs.close_root(self.root)
        self.root = newroot
        self.txn = arg
        self.is_rev = 0
        self._do_path_landing()

    def do_youngest(self, arg):
        """list the youngest revision available for browsing"""
        rev = fs.youngest_rev(self.fs_ptr)
        print(rev)

    def do_exit(self, arg):
        """exit the shell"""
        sys.exit(0)

    def _path_to_parts(self, path):
        """Split PATH on '/' and return its non-empty components."""
        return [_f for _f in path.split('/') if _f]

    def _parts_to_path(self, parts):
        """Join PARTS into an absolute path; an empty list yields '/'."""
        return '/' + '/'.join(parts)

    def _parse_path(self, path):
        """Return PATH as a normalized absolute path.

        A PATH starting with '/' is taken as absolute; anything else is
        resolved against the current directory (self.path).  Repeated and
        trailing slashes are dropped, '.' components are ignored and '..'
        removes the preceding component, never climbing above '/'.
        """
        # cleanup leading, trailing, and duplicate '/' characters
        newpath = self._parts_to_path(self._path_to_parts(path))

        # if PATH is absolute, use it, else append it to the existing path.
        # (Any doubled slash created here is removed by the split below.)
        if path.startswith('/') or self.path == '/':
            newpath = '/' + newpath
        else:
            newpath = self.path + '/' + newpath

        # cleanup '.' and '..'
        finalparts = []
        for part in self._path_to_parts(newpath):
            if part == '.':
                continue
            if part == '..':
                if finalparts:
                    finalparts.pop()
            else:
                finalparts.append(part)

        # finally, return the calculated path
        return self._parts_to_path(finalparts)

    def _format_date(self, date):
        """Convert a revision date property into a short local-time string.

        The time.asctime() text is trimmed of its first four and last
        eight characters, i.e. the weekday prefix and the seconds/year
        suffix.
        """
        date = core.svn_time_from_cstring(date)
        # presumably microseconds since the epoch -- confirm against the
        # svn_time_from_cstring documentation
        date = time.asctime(time.localtime(date / 1000000))
        return date[4:-8]

    def _do_path_landing(self):
        """try to land on self.path as a directory in root, failing up to '/'"""
        newpath = self.path
        while fs.check_path(self.root, newpath) != core.svn_node_dir:
            parts = self._path_to_parts(newpath)
            if not parts:
                # Already at '/': there is no parent left to fall back to,
                # so stop instead of popping from an empty list.
                break
            parts.pop()
            newpath = self._parts_to_path(parts)
        self.path = newpath

    def _setup_prompt(self):
        """Rebuild self.prompt from the current revision/txn and path."""
        if self.is_rev:
            self.prompt = "<rev: " + str(self.rev)
        else:
            self.prompt = "<txn: " + self.txn
        self.prompt += " " + self.path + ">$ "

    def _complete(self, text, line, begidx, endidx, limit_node_kind=None):
        """Generic tab completer.  Takes the 4 standard parameters passed to a
        cmd.Cmd completer function, plus LIMIT_NODE_KIND, which should be a
        svn.core.svn_node_foo constant to restrict the returned completions
        to, or None for no limit.  Reports exceptions on stderr before
        re-raising them, because otherwise they are silently ignored - which
        is quite frustrating when debugging!"""
        try:
            # Only the first argument after the command name is completed.
            args = line.split()
            if len(args) > 1:
                arg = args[1]
            else:
                arg = ""
            # Split into the directory typed so far (with its trailing '/')
            # and the partial entry name that is being completed.
            dirs = arg.split('/')
            user_elem = dirs[-1]
            user_dir = "/".join(dirs[:-1] + [''])

            canon_dir = self._parse_path(user_dir)

            entries = fs.dir_entries(self.root, canon_dir)
            acceptable_completions = []
            for name, dirent_t in entries.items():
                if not name.startswith(user_elem):
                    continue
                if limit_node_kind and dirent_t.kind != limit_node_kind:
                    continue
                if dirent_t.kind == core.svn_node_dir:
                    name += '/'
                acceptable_completions.append(name)
            # Offer './' and '../' when directories are acceptable and the
            # user has started typing one of them.
            if limit_node_kind == core.svn_node_dir or not limit_node_kind:
                if user_elem in ('.', '..'):
                    for extraname in ('.', '..'):
                        if extraname.startswith(user_elem):
                            acceptable_completions.append(extraname + '/')
            return acceptable_completions
        except Exception:
            sys.stderr.write("EXCEPTION WHILST COMPLETING\n")
            traceback.print_exc()
            raise

    def complete_cd(self, text, line, begidx, endidx):
        """Complete 'cd' arguments with directories only."""
        return self._complete(text, line, begidx, endidx, core.svn_node_dir)

    def complete_cat(self, text, line, begidx, endidx):
        """Complete 'cat' arguments with files only."""
        return self._complete(text, line, begidx, endidx, core.svn_node_file)

    def complete_ls(self, text, line, begidx, endidx):
        """Complete 'ls' arguments with entries of any kind."""
        return self._complete(text, line, begidx, endidx)

    def complete_pcat(self, text, line, begidx, endidx):
        """Complete 'pcat' arguments with entries of any kind."""
        return self._complete(text, line, begidx, endidx)


def _basename(path):
    """Return the final component of a '/'-separated PATH.

    A PATH containing no '/' is returned unchanged; a PATH that ends in
    '/' yields the empty string.
    """
    _head, _sep, tail = path.rpartition('/')
    return tail


def usage(exit):
    """Write the usage message and terminate with status EXIT.

    The message is written to stderr when EXIT is true (non-zero) and to
    stdout otherwise; sys.exit(EXIT) is then called.
    """
    message = (
        "usage: %s REPOS_PATH\n"
        "\n"
        "Once the program has started, type 'help' at the prompt for hints on\n"
        "using the shell.\n" % sys.argv[0])
    stream = sys.stderr if exit else sys.stdout
    stream.write(message)
    sys.exit(exit)

def main():
    """Start the shell on the repository named by the single argument.

    Any other number of command-line arguments prints the usage message
    through usage(1).
    """
    arguments = sys.argv[1:]
    if len(arguments) != 1:
        usage(1)

    SVNShell(arguments[0])


# Run the shell only when this file is executed as a script.
if __name__ == '__main__':
    main()