|  | #!/usr/bin/env python | 
|  | # | 
|  | # svnshell.py : a Python-based shell interface for cruising 'round in | 
|  | #               the filesystem. | 
|  | # | 
|  | ###################################################################### | 
|  | # | 
|  | # Copyright (c) 2000-2004 CollabNet.  All rights reserved. | 
|  | # | 
|  | # This software is licensed as described in the file COPYING, which | 
|  | # you should have received as part of this distribution.  The terms | 
|  | # are also available at http://subversion.tigris.org/license-1.html. | 
|  | # If newer versions of this license are posted there, you may use a | 
|  | # newer version instead, at your option. | 
|  | # | 
|  | ###################################################################### | 
|  | # | 
|  |  | 
|  | import sys | 
|  | import string | 
|  | import time | 
|  | import re | 
|  | from cmd import Cmd | 
|  | from random import randint | 
|  | from svn import fs, core, repos | 
|  |  | 
|  |  | 
|  | class SVNShell(Cmd): | 
|  | def __init__(self, path): | 
|  | """initialize an SVNShell object""" | 
|  | Cmd.__init__(self) | 
|  | path = core.svn_path_canonicalize(path) | 
|  | self.fs_ptr = repos.fs(repos.open(path)) | 
|  | self.is_rev = 1 | 
|  | self.rev = fs.youngest_rev(self.fs_ptr) | 
|  | self.txn = None | 
|  | self.root = fs.revision_root(self.fs_ptr, self.rev) | 
|  | self.path = "/" | 
|  | self._setup_prompt() | 
|  | self.cmdloop() | 
|  |  | 
|  | def precmd(self, line): | 
|  | if line == "EOF": | 
|  | # Ctrl-D is a command without a newline.  Print a newline, so the next | 
|  | # shell prompt is not on the same line as the last svnshell prompt. | 
|  | print | 
|  | return "exit" | 
|  | return line | 
|  |  | 
|  | def postcmd(self, stop, line): | 
|  | self._setup_prompt() | 
|  |  | 
|  | _errors = ["Huh?", | 
|  | "Whatchoo talkin' 'bout, Willis?", | 
|  | "Say what?", | 
|  | "Nope.  Not gonna do it.", | 
|  | "Ehh...I don't think so, chief."] | 
|  |  | 
|  | def default(self, line): | 
|  | print self._errors[randint(0, len(self._errors) - 1)] | 
|  |  | 
|  | def do_cat(self, arg): | 
|  | """dump the contents of a file""" | 
|  | if not len(arg): | 
|  | print "You must supply a file path." | 
|  | return | 
|  | catpath = self._parse_path(arg) | 
|  | kind = fs.check_path(self.root, catpath) | 
|  | if kind == core.svn_node_none: | 
|  | print "Path '%s' does not exist." % catpath | 
|  | return | 
|  | if kind == core.svn_node_dir: | 
|  | print "Path '%s' is not a file." % catpath | 
|  | return | 
|  | ### be nice to get some paging in here. | 
|  | stream = fs.file_contents(self.root, catpath) | 
|  | while 1: | 
|  | data = core.svn_stream_read(stream, core.SVN_STREAM_CHUNK_SIZE) | 
|  | sys.stdout.write(data) | 
|  | if len(data) < core.SVN_STREAM_CHUNK_SIZE: | 
|  | break | 
|  |  | 
|  | def do_cd(self, arg): | 
|  | """change directory""" | 
|  | newpath = self._parse_path(arg) | 
|  |  | 
|  | # make sure that path actually exists in the filesystem as a directory | 
|  | kind = fs.check_path(self.root, newpath) | 
|  | if kind != core.svn_node_dir: | 
|  | print "Path '%s' is not a valid filesystem directory." % newpath | 
|  | return | 
|  | self.path = newpath | 
|  |  | 
|  | def do_ls(self, arg): | 
|  | """list the contents of the current directory or provided path""" | 
|  | parent = self.path | 
|  | if not len(arg): | 
|  | # no arg -- show a listing for the current directory. | 
|  | entries = fs.dir_entries(self.root, self.path) | 
|  | else: | 
|  | # arg?  show a listing of that path. | 
|  | newpath = self._parse_path(arg) | 
|  | kind = fs.check_path(self.root, newpath) | 
|  | if kind == core.svn_node_dir: | 
|  | parent = newpath | 
|  | entries = fs.dir_entries(self.root, parent) | 
|  | elif kind == core.svn_node_file: | 
|  | parts = self._path_to_parts(newpath) | 
|  | name = parts.pop(-1) | 
|  | parent = self._parts_to_path(parts) | 
|  | print parent + ':' + name | 
|  | tmpentries = fs.dir_entries(self.root, parent) | 
|  | if not tmpentries.get(name, None): | 
|  | return | 
|  | entries = {} | 
|  | entries[name] = tmpentries[name] | 
|  | else: | 
|  | print "Path '%s' not found." % newpath | 
|  | return | 
|  |  | 
|  | keys = entries.keys() | 
|  | keys.sort() | 
|  |  | 
|  | print "   REV   AUTHOR  NODE-REV-ID     SIZE         DATE NAME" | 
|  | print "----------------------------------------------------------------------------" | 
|  |  | 
|  | for entry in keys: | 
|  | fullpath = parent + '/' + entry | 
|  | size = '' | 
|  | is_dir = fs.is_dir(self.root, fullpath) | 
|  | if is_dir: | 
|  | name = entry + '/' | 
|  | else: | 
|  | size = str(fs.file_length(self.root, fullpath)) | 
|  | name = entry | 
|  | node_id = fs.unparse_id(entries[entry].id) | 
|  | created_rev = fs.node_created_rev(self.root, fullpath) | 
|  | author = fs.revision_prop(self.fs_ptr, created_rev, | 
|  | core.SVN_PROP_REVISION_AUTHOR) | 
|  | if not author: | 
|  | author = "" | 
|  | date = fs.revision_prop(self.fs_ptr, created_rev, | 
|  | core.SVN_PROP_REVISION_DATE) | 
|  | if not date: | 
|  | date = "" | 
|  | else: | 
|  | date = self._format_date(date) | 
|  |  | 
|  | print "%6s %8s %12s %8s %12s %s" % (created_rev, author[:8], | 
|  | node_id, size, date, name) | 
|  |  | 
|  | def do_lstxns(self, arg): | 
|  | """list the transactions available for browsing""" | 
|  | txns = fs.list_transactions(self.fs_ptr) | 
|  | txns.sort() | 
|  | counter = 0 | 
|  | for txn in txns: | 
|  | counter = counter + 1 | 
|  | print "%8s  " % txn, | 
|  | if counter == 6: | 
|  | print "" | 
|  | counter = 0 | 
|  | print "" | 
|  |  | 
|  | def do_pcat(self, arg): | 
|  | """list the properties of a path""" | 
|  | catpath = self.path | 
|  | if len(arg): | 
|  | catpath = self._parse_path(arg) | 
|  | kind = fs.check_path(self.root, catpath) | 
|  | if kind == core.svn_node_none: | 
|  | print "Path '%s' does not exist." % catpath | 
|  | return | 
|  | plist = fs.node_proplist(self.root, catpath) | 
|  | if not plist: | 
|  | return | 
|  | for pkey, pval in plist.items(): | 
|  | print 'K ' + str(len(pkey)) | 
|  | print pkey | 
|  | print 'P ' + str(len(pval)) | 
|  | print pval | 
|  | print 'PROPS-END' | 
|  |  | 
|  | def do_setrev(self, arg): | 
|  | """set the current revision to view""" | 
|  | try: | 
|  | if arg.lower() == 'head': | 
|  | rev = fs.youngest_rev(self.fs_ptr) | 
|  | else: | 
|  | rev = int(arg) | 
|  | newroot = fs.revision_root(self.fs_ptr, rev) | 
|  | except: | 
|  | print "Error setting the revision to '" + arg + "'." | 
|  | return | 
|  | fs.close_root(self.root) | 
|  | self.root = newroot | 
|  | self.rev = rev | 
|  | self.is_rev = 1 | 
|  | self._do_path_landing() | 
|  |  | 
|  | def do_settxn(self, arg): | 
|  | """set the current transaction to view""" | 
|  | try: | 
|  | txnobj = fs.open_txn(self.fs_ptr, arg) | 
|  | newroot = fs.txn_root(txnobj) | 
|  | except: | 
|  | print "Error setting the transaction to '" + arg + "'." | 
|  | return | 
|  | fs.close_root(self.root) | 
|  | self.root = newroot | 
|  | self.txn = arg | 
|  | self.is_rev = 0 | 
|  | self._do_path_landing() | 
|  |  | 
|  | def do_youngest(self, arg): | 
|  | """list the youngest revision available for browsing""" | 
|  | rev = fs.youngest_rev(self.fs_ptr) | 
|  | print rev | 
|  |  | 
|  | def do_exit(self, arg): | 
|  | sys.exit(0) | 
|  |  | 
|  | def _path_to_parts(self, path): | 
|  | return filter(None, string.split(path, '/')) | 
|  |  | 
|  | def _parts_to_path(self, parts): | 
|  | return '/' + string.join(parts, '/') | 
|  |  | 
|  | def _parse_path(self, path): | 
|  | # cleanup leading, trailing, and duplicate '/' characters | 
|  | newpath = self._parts_to_path(self._path_to_parts(path)) | 
|  |  | 
|  | # if PATH is absolute, use it, else append it to the existing path. | 
|  | if path.startswith('/') or self.path == '/': | 
|  | newpath = '/' + newpath | 
|  | else: | 
|  | newpath = self.path + '/' + newpath | 
|  |  | 
|  | # cleanup '.' and '..' | 
|  | parts = self._path_to_parts(newpath) | 
|  | finalparts = [] | 
|  | for part in parts: | 
|  | if part == '.': | 
|  | pass | 
|  | elif part == '..': | 
|  | if len(finalparts) != 0: | 
|  | finalparts.pop(-1) | 
|  | else: | 
|  | finalparts.append(part) | 
|  |  | 
|  | # finally, return the calculated path | 
|  | return self._parts_to_path(finalparts) | 
|  |  | 
|  | def _format_date(self, date): | 
|  | date = core.svn_time_from_cstring(date) | 
|  | date = time.asctime(time.localtime(date / 1000000)) | 
|  | return date[4:-8] | 
|  |  | 
|  | def _do_path_landing(self): | 
|  | """try to land on self.path as a directory in root, failing up to '/'""" | 
|  | not_found = 1 | 
|  | newpath = self.path | 
|  | while not_found: | 
|  | kind = fs.check_path(self.root, newpath) | 
|  | if kind == core.svn_node_dir: | 
|  | not_found = 0 | 
|  | else: | 
|  | parts = self._path_to_parts(newpath) | 
|  | parts.pop(-1) | 
|  | newpath = self._parts_to_path(parts) | 
|  | self.path = newpath | 
|  |  | 
|  | def _setup_prompt(self): | 
|  | """present the prompt and handle the user's input""" | 
|  | if self.is_rev: | 
|  | self.prompt = "<rev: " + str(self.rev) | 
|  | else: | 
|  | self.prompt = "<txn: " + self.txn | 
|  | self.prompt += " " + self.path + ">$ " | 
|  |  | 
|  | def _complete(self, text, line, begidx, endidx, limit_node_kind=None): | 
|  | """Generic tab completer.  Takes the 4 standard parameters passed to a | 
|  | cmd.Cmd completer function, plus LIMIT_NODE_KIND, which should be a | 
|  | svn.core.svn_node_foo constant to restrict the returned completions to, or | 
|  | None for no limit.  Catches and displays exceptions, because otherwise | 
|  | they are silently ignored - which is quite frustrating when debugging!""" | 
|  | try: | 
|  | args = line.split() | 
|  | if len(args) > 1: | 
|  | arg = args[1] | 
|  | else: | 
|  | arg = "" | 
|  | dirs = arg.split('/') | 
|  | user_elem = dirs[-1] | 
|  | user_dir = "/".join(dirs[:-1] + ['']) | 
|  |  | 
|  | canon_dir = self._parse_path(user_dir) | 
|  |  | 
|  | entries = fs.dir_entries(self.root, canon_dir) | 
|  | acceptable_completions = [] | 
|  | for name, dirent_t in entries.items(): | 
|  | if not name.startswith(user_elem): | 
|  | continue | 
|  | if limit_node_kind and dirent_t.kind != limit_node_kind: | 
|  | continue | 
|  | if dirent_t.kind == core.svn_node_dir: | 
|  | name += '/' | 
|  | acceptable_completions.append(name) | 
|  | if limit_node_kind == core.svn_node_dir or not limit_node_kind: | 
|  | if user_elem in ('.', '..'): | 
|  | for extraname in ('.', '..'): | 
|  | if extraname.startswith(user_elem): | 
|  | acceptable_completions.append(extraname + '/') | 
|  | return acceptable_completions | 
|  | except: | 
|  | ei = sys.exc_info() | 
|  | sys.stderr.write("EXCEPTION WHILST COMPLETING\n") | 
|  | import traceback | 
|  | traceback.print_tb(ei[2]) | 
|  | sys.stderr.write("%s: %s\n" % (ei[0], ei[1])) | 
|  | raise | 
|  |  | 
|  | def complete_cd(self, text, line, begidx, endidx): | 
|  | return self._complete(text, line, begidx, endidx, core.svn_node_dir) | 
|  |  | 
|  | def complete_cat(self, text, line, begidx, endidx): | 
|  | return self._complete(text, line, begidx, endidx, core.svn_node_file) | 
|  |  | 
|  | def complete_ls(self, text, line, begidx, endidx): | 
|  | return self._complete(text, line, begidx, endidx) | 
|  |  | 
|  | def complete_pcat(self, text, line, begidx, endidx): | 
|  | return self._complete(text, line, begidx, endidx) | 
|  |  | 
|  |  | 
|  | def _basename(path): | 
|  | "Return the basename for a '/'-separated path." | 
|  | idx = string.rfind(path, '/') | 
|  | if idx == -1: | 
|  | return path | 
|  | return path[idx+1:] | 
|  |  | 
|  |  | 
|  | def usage(exit): | 
|  | if exit: | 
|  | output = sys.stderr | 
|  | else: | 
|  | output = sys.stdout | 
|  | output.write( | 
|  | "usage: %s REPOS_PATH\n" | 
|  | "\n" | 
|  | "Once the program has started, type 'help' at the prompt for hints on\n" | 
|  | "using the shell.\n" % sys.argv[0]) | 
|  | sys.exit(exit) | 
|  |  | 
|  | def main(): | 
|  | if len(sys.argv) != 2: | 
|  | usage(1) | 
|  |  | 
|  | SVNShell(sys.argv[1]) | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | main() |