blob: 9c67af4664c470c421e272ef4a9ef850fcd80271 [file] [log] [blame]
#!/usr/bin/env python
#
# svnshell.py : a Python-based shell interface for cruising 'round in
# the filesystem.
#
######################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
######################################################################
#
import sys
import time
import re
from cmd import Cmd
from random import randint
from svn import fs, core, repos
class SVNShell(Cmd):
def __init__(self, path):
"""initialize an SVNShell object"""
Cmd.__init__(self)
path = core.svn_path_canonicalize(path)
self.fs_ptr = repos.fs(repos.open(path))
self.is_rev = 1
self.rev = fs.youngest_rev(self.fs_ptr)
self.txn = None
self.root = fs.revision_root(self.fs_ptr, self.rev)
self.path = "/"
self._setup_prompt()
self.cmdloop()
def precmd(self, line):
if line == "EOF":
# Ctrl-D is a command without a newline. Print a newline, so the next
# shell prompt is not on the same line as the last svnshell prompt.
print("")
return "exit"
return line
def postcmd(self, stop, line):
self._setup_prompt()
_errors = ["Huh?",
"Whatchoo talkin' 'bout, Willis?",
"Say what?",
"Nope. Not gonna do it.",
"Ehh...I don't think so, chief."]
def default(self, line):
print(self._errors[randint(0, len(self._errors) - 1)])
def do_cat(self, arg):
"""dump the contents of a file"""
if not len(arg):
print("You must supply a file path.")
return
catpath = self._parse_path(arg)
kind = fs.check_path(self.root, catpath)
if kind == core.svn_node_none:
print("Path '%s' does not exist." % catpath)
return
if kind == core.svn_node_dir:
print("Path '%s' is not a file." % catpath)
return
### be nice to get some paging in here.
stream = fs.file_contents(self.root, catpath)
while True:
data = core.svn_stream_read(stream, core.SVN_STREAM_CHUNK_SIZE)
sys.stdout.write(data)
if len(data) < core.SVN_STREAM_CHUNK_SIZE:
break
def do_cd(self, arg):
"""change directory"""
newpath = self._parse_path(arg)
# make sure that path actually exists in the filesystem as a directory
kind = fs.check_path(self.root, newpath)
if kind != core.svn_node_dir:
print("Path '%s' is not a valid filesystem directory." % newpath)
return
self.path = newpath
def do_ls(self, arg):
"""list the contents of the current directory or provided path"""
parent = self.path
if not len(arg):
# no arg -- show a listing for the current directory.
entries = fs.dir_entries(self.root, self.path)
else:
# arg? show a listing of that path.
newpath = self._parse_path(arg)
kind = fs.check_path(self.root, newpath)
if kind == core.svn_node_dir:
parent = newpath
entries = fs.dir_entries(self.root, parent)
elif kind == core.svn_node_file:
parts = self._path_to_parts(newpath)
name = parts.pop(-1)
parent = self._parts_to_path(parts)
print(parent + ':' + name)
tmpentries = fs.dir_entries(self.root, parent)
if not tmpentries.get(name, None):
return
entries = {}
entries[name] = tmpentries[name]
else:
print("Path '%s' not found." % newpath)
return
keys = sorted(entries.keys())
print(" REV AUTHOR NODE-REV-ID SIZE DATE NAME")
print("----------------------------------------------------------------------------")
for entry in keys:
fullpath = parent + '/' + entry
size = ''
is_dir = fs.is_dir(self.root, fullpath)
if is_dir:
name = entry + '/'
else:
size = str(fs.file_length(self.root, fullpath))
name = entry
node_id = fs.unparse_id(entries[entry].id)
created_rev = fs.node_created_rev(self.root, fullpath)
author = fs.revision_prop(self.fs_ptr, created_rev,
core.SVN_PROP_REVISION_AUTHOR)
if not author:
author = ""
date = fs.revision_prop(self.fs_ptr, created_rev,
core.SVN_PROP_REVISION_DATE)
if not date:
date = ""
else:
date = self._format_date(date)
print("%6s %8s %12s %8s %12s %s" % (created_rev, author[:8],
node_id, size, date, name))
def do_lstxns(self, arg):
"""list the transactions available for browsing"""
txns = sorted(fs.list_transactions(self.fs_ptr))
counter = 0
for txn in txns:
counter = counter + 1
sys.stdout.write("%8s " % txn)
if counter == 6:
print("")
counter = 0
print("")
def do_pcat(self, arg):
"""list the properties of a path"""
catpath = self.path
if len(arg):
catpath = self._parse_path(arg)
kind = fs.check_path(self.root, catpath)
if kind == core.svn_node_none:
print("Path '%s' does not exist." % catpath)
return
plist = fs.node_proplist(self.root, catpath)
if not plist:
return
for pkey, pval in plist.items():
print('K ' + str(len(pkey)))
print(pkey)
print('P ' + str(len(pval)))
print(pval)
print('PROPS-END')
def do_setrev(self, arg):
"""set the current revision to view"""
try:
if arg.lower() == 'head':
rev = fs.youngest_rev(self.fs_ptr)
else:
rev = int(arg)
newroot = fs.revision_root(self.fs_ptr, rev)
except:
print("Error setting the revision to '" + arg + "'.")
return
fs.close_root(self.root)
self.root = newroot
self.rev = rev
self.is_rev = 1
self._do_path_landing()
def do_settxn(self, arg):
"""set the current transaction to view"""
try:
txnobj = fs.open_txn(self.fs_ptr, arg)
newroot = fs.txn_root(txnobj)
except:
print("Error setting the transaction to '" + arg + "'.")
return
fs.close_root(self.root)
self.root = newroot
self.txn = arg
self.is_rev = 0
self._do_path_landing()
def do_youngest(self, arg):
"""list the youngest revision available for browsing"""
rev = fs.youngest_rev(self.fs_ptr)
print(rev)
def do_exit(self, arg):
sys.exit(0)
def _path_to_parts(self, path):
return [_f for _f in path.split('/') if _f]
def _parts_to_path(self, parts):
return '/' + '/'.join(parts)
def _parse_path(self, path):
# cleanup leading, trailing, and duplicate '/' characters
newpath = self._parts_to_path(self._path_to_parts(path))
# if PATH is absolute, use it, else append it to the existing path.
if path.startswith('/') or self.path == '/':
newpath = '/' + newpath
else:
newpath = self.path + '/' + newpath
# cleanup '.' and '..'
parts = self._path_to_parts(newpath)
finalparts = []
for part in parts:
if part == '.':
pass
elif part == '..':
if len(finalparts) != 0:
finalparts.pop(-1)
else:
finalparts.append(part)
# finally, return the calculated path
return self._parts_to_path(finalparts)
def _format_date(self, date):
date = core.svn_time_from_cstring(date)
date = time.asctime(time.localtime(date / 1000000))
return date[4:-8]
def _do_path_landing(self):
"""try to land on self.path as a directory in root, failing up to '/'"""
not_found = 1
newpath = self.path
while not_found:
kind = fs.check_path(self.root, newpath)
if kind == core.svn_node_dir:
not_found = 0
else:
parts = self._path_to_parts(newpath)
parts.pop(-1)
newpath = self._parts_to_path(parts)
self.path = newpath
def _setup_prompt(self):
"""present the prompt and handle the user's input"""
if self.is_rev:
self.prompt = "<rev: " + str(self.rev)
else:
self.prompt = "<txn: " + self.txn
self.prompt += " " + self.path + ">$ "
def _complete(self, text, line, begidx, endidx, limit_node_kind=None):
"""Generic tab completer. Takes the 4 standard parameters passed to a
cmd.Cmd completer function, plus LIMIT_NODE_KIND, which should be a
svn.core.svn_node_foo constant to restrict the returned completions to, or
None for no limit. Catches and displays exceptions, because otherwise
they are silently ignored - which is quite frustrating when debugging!"""
try:
args = line.split()
if len(args) > 1:
arg = args[1]
else:
arg = ""
dirs = arg.split('/')
user_elem = dirs[-1]
user_dir = "/".join(dirs[:-1] + [''])
canon_dir = self._parse_path(user_dir)
entries = fs.dir_entries(self.root, canon_dir)
acceptable_completions = []
for name, dirent_t in entries.items():
if not name.startswith(user_elem):
continue
if limit_node_kind and dirent_t.kind != limit_node_kind:
continue
if dirent_t.kind == core.svn_node_dir:
name += '/'
acceptable_completions.append(name)
if limit_node_kind == core.svn_node_dir or not limit_node_kind:
if user_elem in ('.', '..'):
for extraname in ('.', '..'):
if extraname.startswith(user_elem):
acceptable_completions.append(extraname + '/')
return acceptable_completions
except:
ei = sys.exc_info()
sys.stderr.write("EXCEPTION WHILST COMPLETING\n")
import traceback
traceback.print_tb(ei[2])
sys.stderr.write("%s: %s\n" % (ei[0], ei[1]))
raise
def complete_cd(self, text, line, begidx, endidx):
return self._complete(text, line, begidx, endidx, core.svn_node_dir)
def complete_cat(self, text, line, begidx, endidx):
return self._complete(text, line, begidx, endidx, core.svn_node_file)
def complete_ls(self, text, line, begidx, endidx):
return self._complete(text, line, begidx, endidx)
def complete_pcat(self, text, line, begidx, endidx):
return self._complete(text, line, begidx, endidx)
def _basename(path):
"Return the basename for a '/'-separated path."
idx = path.rfind('/')
if idx == -1:
return path
return path[idx+1:]
def usage(exit):
if exit:
output = sys.stderr
else:
output = sys.stdout
output.write(
"usage: %s REPOS_PATH\n"
"\n"
"Once the program has started, type 'help' at the prompt for hints on\n"
"using the shell.\n" % sys.argv[0])
sys.exit(exit)
def main():
if len(sys.argv) != 2:
usage(1)
SVNShell(sys.argv[1])
if __name__ == '__main__':
main()