blob: 0f9a98933d06bbc0e3fff39e6d25f53576faa2df [file] [log] [blame]
#!/usr/bin/env python
# : alternative svnlook in Python with library API
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# $HeadURL$
# $LastChangedDate$
# $LastChangedRevision$
""" can also be used as a Python module::
>>> import svnlook
>>> svnlook = svnlook.SVNLook("/testrepo")
>>> svnlook.get_author()
Accessible API::
[x] author
[x] changed
[x] date
[ ] diff
[x] dirs-changed
[ ] ids
[x] info
[x] log
[ ] tree
[ ] generator API to avoid passing lists
import sys
import time
import os
from svn import core, fs, delta, repos
class SVNLook(object):
def __init__(self, path, rev=None, txn=None, cmd=None):
path - path to repository
rev - revision number
txn - name of transaction (usually the one about to be committed)
cmd - if set, specifies cmd_* method to execute
txn takes precedence over rev; if both are None, inspect the head revision
path = core.svn_path_canonicalize(path)
repos_ptr =
self.fs_ptr = repos.fs(repos_ptr)
# if set, txn takes precendence
if txn:
self.txn_ptr = fs.open_txn(self.fs_ptr, txn)
self.txn_ptr = None
if rev is None:
rev = fs.youngest_rev(self.fs_ptr)
rev = int(rev)
self.rev = rev
if cmd != None:
getattr(self, 'cmd_' + cmd)()
def cmd_default(self):
def cmd_author(self):
print(self.get_author() or '')
def cmd_changed(self):
for status, path in self.get_changed():
print("%-3s %s" % (status, path))
def cmd_date(self):
# duplicate original svnlook format, which is
# 2010-02-08 21:53:15 +0200 (Mon, 08 Feb 2010)
secs = self.get_date(unixtime=True)
if secs is None:
# convert to tuple, detect time zone and format
stamp = time.localtime(secs)
isdst = stamp.tm_isdst
utcoffset = -(time.altzone if (time.daylight and isdst) else time.timezone) // 60
suffix = "%+03d%02d" % (utcoffset // 60, abs(utcoffset) % 60)
outstr = time.strftime('%Y-%m-%d %H:%M:%S ', stamp) + suffix
outstr += time.strftime(' (%a, %d %b %Y)', stamp)
def cmd_diff(self):
self._print_tree(DiffEditor, pass_root=1)
def cmd_dirs_changed(self):
for dir in self.get_changed_dirs():
def cmd_ids(self):
self._print_tree(Editor, base_rev=0, pass_root=1)
def cmd_info(self):
"""print the author, data, log_size, and log message"""
log = self.get_log() or ''
def cmd_log(self):
print(self.get_log() or '')
def cmd_tree(self):
self._print_tree(Editor, base_rev=0)
# --- API getters
def get_author(self):
"""return string with the author name or None"""
return self._get_property(core.SVN_PROP_REVISION_AUTHOR)
def get_changed(self):
"""return list of tuples (status, path)"""
ret = []
def list_callback(status, path):
ret.append( (status, path) )
self._walk_tree(ChangedEditor, pass_root=1, callback=list_callback)
return ret
def get_date(self, unixtime=False):
"""return commit timestamp in RFC 3339 format (2010-02-08T20:37:25.195000Z)
if unixtime is True, return unix timestamp
return None for a txn, or if date property is not set
if self.txn_ptr:
return None
date = self._get_property(core.SVN_PROP_REVISION_DATE)
if not unixtime or date == None:
return date
# convert to unix time
aprtime = core.svn_time_from_cstring(date)
# ### convert to a time_t; this requires intimate knowledge of
# ### the apr_time_t type
secs = aprtime / 1000000 # aprtime is microseconds; make seconds
return secs
def get_changed_dirs(self):
"""return list of changed dirs
dir names end with trailing forward slash even on windows
dirlist = []
def list_callback(item):
self._walk_tree(DirsChangedEditor, callback=list_callback)
return dirlist
def get_log(self):
"""return log message string or None if not present"""
return self._get_property(core.SVN_PROP_REVISION_LOG)
# --- Internal helpers
def _get_property(self, name):
if self.txn_ptr:
return fs.txn_prop(self.txn_ptr, name)
return fs.revision_prop(self.fs_ptr, self.rev, name)
def _print_tree(self, e_factory, base_rev=None, pass_root=0):
def print_callback(msg):
self._walk_tree(e_factory, base_rev, pass_root, callback=print_callback)
# svn fs, delta, repos calls needs review according to DeltaEditor documentation
def _walk_tree(self, e_factory, base_rev=None, pass_root=0, callback=None):
if base_rev is None:
# a specific base rev was not provided. use the transaction base,
# or the previous revision
if self.txn_ptr:
base_rev = fs.txn_base_revision(self.txn_ptr)
elif self.rev == 0:
base_rev = 0
base_rev = self.rev - 1
# get the current root
if self.txn_ptr:
root = fs.txn_root(self.txn_ptr)
root = fs.revision_root(self.fs_ptr, self.rev)
# the base of the comparison
base_root = fs.revision_root(self.fs_ptr, base_rev)
if callback == None:
callback = lambda msg: None
if pass_root:
editor = e_factory(root, base_root, callback)
editor = e_factory(callback=callback)
# construct the editor for printing these things out
e_ptr, e_baton = delta.make_editor(editor)
# compute the delta, printing as we go
def authz_cb(root, path, pool):
return 1
repos.dir_delta(base_root, '', '', root, '',
e_ptr, e_baton, authz_cb, 0, 1, 0, 0)
# ---------------------------------------------------------
# Delta Editors. For documentation see:
# this one doesn't process delete_entry, change_dir_prop, apply_text_delta,
# change_file_prop, close_file, close_edit, abort_edit
# ?set_target_revision
# need review
class Editor(delta.Editor):
def __init__(self, root=None, base_root=None, callback=None):
"""callback argument is unused for this editor"""
self.root = root
# base_root ignored
self.indent = ''
def open_root(self, base_revision, dir_pool):
print('/' + self._get_id('/'))
self.indent = self.indent + ' ' # indent one space
def add_directory(self, path, *args):
id = self._get_id(path)
print(self.indent + _basename(path) + '/' + id)
self.indent = self.indent + ' ' # indent one space
# we cheat. one method implementation for two entry points.
open_directory = add_directory
def close_directory(self, baton):
# note: if indents are being performed, this slice just returns
# another empty string.
self.indent = self.indent[:-1]
def add_file(self, path, *args):
id = self._get_id(path)
print(self.indent + _basename(path) + id)
# we cheat. one method implementation for two entry points.
open_file = add_file
def _get_id(self, path):
if self.root:
id = fs.node_id(self.root, path)
return ' <%s>' % fs.unparse_id(id)
return ''
# doesn't process close_directory, apply_text_delta,
# change_file_prop, close_file, close_edit, abort_edit
# ?set_target_revision
class DirsChangedEditor(delta.Editor):
"""print names of changed dirs, callback(dir) is a printer function"""
def __init__(self, callback):
self.callback = callback
def open_root(self, base_revision, dir_pool):
return [ 1, '' ]
def delete_entry(self, path, revision, parent_baton, pool):
def add_directory(self, path, parent_baton,
copyfrom_path, copyfrom_revision, dir_pool):
return [ 1, path ]
def open_directory(self, path, parent_baton, base_revision, dir_pool):
return [ 1, path ]
def change_dir_prop(self, dir_baton, name, value, pool):
def add_file(self, path, parent_baton,
copyfrom_path, copyfrom_revision, file_pool):
def open_file(self, path, parent_baton, base_revision, file_pool):
# some kind of change is going to happen
def _dir_changed(self, baton):
if baton[0]:
# the directory hasn't been printed yet. do it.
self.callback(baton[1] + '/')
baton[0] = 0
class ChangedEditor(delta.Editor):
def __init__(self, root, base_root, callback):
"""callback(status, path) is a printer function"""
self.root = root
self.base_root = base_root
self.callback = callback
def open_root(self, base_revision, dir_pool):
return [ 1, '' ]
def delete_entry(self, path, revision, parent_baton, pool):
### need more logic to detect 'replace'
if fs.is_dir(self.base_root, '/' + path):
self.callback('D', path + '/')
self.callback('D', path)
def add_directory(self, path, parent_baton,
copyfrom_path, copyfrom_revision, dir_pool):
self.callback('A', path + '/')
return [ 0, path ]
def open_directory(self, path, parent_baton, base_revision, dir_pool):
return [ 1, path ]
def change_dir_prop(self, dir_baton, name, value, pool):
if dir_baton[0]:
# the directory hasn't been printed yet. do it.
self.callback('_U', dir_baton[1] + '/')
dir_baton[0] = 0
def add_file(self, path, parent_baton,
copyfrom_path, copyfrom_revision, file_pool):
self.callback('A', path)
return [ '_', ' ', None ]
def open_file(self, path, parent_baton, base_revision, file_pool):
return [ '_', ' ', path ]
def apply_textdelta(self, file_baton, base_checksum):
file_baton[0] = 'U'
# no handler
return None
def change_file_prop(self, file_baton, name, value, pool):
file_baton[1] = 'U'
def close_file(self, file_baton, text_checksum):
text_mod, prop_mod, path = file_baton
# test the path. it will be None if we added this file.
if path:
status = text_mod + prop_mod
# was there some kind of change?
if status != '_ ':
self.callback(status.rstrip(), path)
class DiffEditor(delta.Editor):
def __init__(self, root, base_root, callback=None):
"""callback argument is unused for this editor"""
self.root = root
self.base_root = base_root
self.target_revision = 0
def _do_diff(self, base_path, path):
if base_path is None:
print("Added: " + path)
label = path
elif path is None:
print("Removed: " + base_path)
label = base_path
print("Modified: " + path)
label = path
print("===============================================================" + \
args = []
args.append(label + "\t(original)")
args.append(label + "\t(new)")
differ = fs.FileDiff(self.base_root, base_path, self.root,
path, diffoptions=args)
pobj = differ.get_pipe()
while True:
line = pobj.readline()
if not line:
sys.stdout.write("%s " % line)
def _do_prop_diff(self, path, prop_name, prop_val, pool):
print("Property changes on: " + path)
print("_______________________________________________________________" + \
old_prop_val = None
old_prop_val = fs.node_prop(self.base_root, path, prop_name, pool)
except core.SubversionException:
pass # Must be a new path
if old_prop_val:
if prop_val:
print("Modified: " + prop_name)
print(" - " + str(old_prop_val))
print(" + " + str(prop_val))
print("Deleted: " + prop_name)
print(" - " + str(old_prop_val))
print("Added: " + prop_name)
print(" + " + str(prop_val))
def delete_entry(self, path, revision, parent_baton, pool):
### need more logic to detect 'replace'
if not fs.is_dir(self.base_root, '/' + path):
self._do_diff(path, None)
def add_directory(self, path, parent_baton, copyfrom_path,
copyfrom_revision, dir_pool):
return [ 1, path ]
def add_file(self, path, parent_baton,
copyfrom_path, copyfrom_revision, file_pool):
self._do_diff(None, path)
return [ '_', ' ', None ]
def open_root(self, base_revision, dir_pool):
return [ 1, '' ]
def open_directory(self, path, parent_baton, base_revision, dir_pool):
return [ 1, path ]
def open_file(self, path, parent_baton, base_revision, file_pool):
return [ '_', ' ', path ]
def apply_textdelta(self, file_baton, base_checksum):
if file_baton[2] is not None:
self._do_diff(file_baton[2], file_baton[2])
return None
def change_file_prop(self, file_baton, name, value, pool):
if file_baton[2] is not None:
self._do_prop_diff(file_baton[2], name, value, pool)
return None
def change_dir_prop(self, dir_baton, name, value, pool):
if dir_baton[1] is not None:
self._do_prop_diff(dir_baton[1], name, value, pool)
return None
def set_target_revision(self, target_revision):
self.target_revision = target_revision
def _basename(path):
"Return the basename for a '/'-separated path."
idx = path.rfind('/')
if idx == -1:
return path
return path[idx+1:]
def usage(exit):
if exit:
output = sys.stderr
output = sys.stdout
"usage: %s REPOS_PATH rev REV [COMMAND] - inspect revision REV\n"
" %s REPOS_PATH txn TXN [COMMAND] - inspect transaction TXN\n"
" %s REPOS_PATH [COMMAND] - inspect the youngest revision\n"
"REV is a revision number > 0.\n"
"TXN is a transaction name.\n"
"If no command is given, the default output (which is the same as\n"
"running the subcommands `info' then `tree') will be printed.\n"
"COMMAND can be one of: \n"
" author: print author.\n"
" changed: print full change summary: all dirs & files changed.\n"
" date: print the timestamp (revisions only).\n"
" diff: print GNU-style diffs of changed files and props.\n"
" dirs-changed: print changed directories.\n"
" ids: print the tree, with nodes ids.\n"
" info: print the author, data, log_size, and log message.\n"
" log: print log message.\n"
" tree: print the tree.\n"
% (sys.argv[0], sys.argv[0], sys.argv[0]))
def main():
if len(sys.argv) < 2:
rev = txn = None
args = sys.argv[2:]
if args:
cmd = args[0]
if cmd == 'rev':
if len(args) == 1:
rev = int(args[1])
except ValueError:
del args[:2]
elif cmd == 'txn':
if len(args) == 1:
txn = args[1]
del args[:2]
if args:
if len(args) > 1:
cmd = args[0].replace('-', '_')
cmd = 'default'
if not hasattr(SVNLook, 'cmd_' + cmd):
SVNLook(sys.argv[1], rev, txn, cmd)
if __name__ == '__main__':