blob: 659a9d23bb5edf778411f46889020e4722f44d6d [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Setup your environment so that `which svn` shows the svn you want to test.
# Just run this file, no parameters. Test files are created in /tmp/...
# To adjust which tests are run, look at 'p = Permutations(' way below...
#
# This runs an insane amount of tests of add-vs.-add situations during update
# and switch. The output scrolls by, and a summary for all tests with simple
# greps and wc.db SELECT results is printed in the very end.
#
# There is no automatic validation. You have to read the results.
#
# To run a gdb in any given place, replace a 'svn()' with 'gdbsvn()',
# presumably in either up() or sw().
from subprocess import Popen, PIPE, call
from types import FunctionType, ListType, TupleType
import tempfile, os
from itertools import product
def run_cmd(cmd, verbose=True, shell=False):
if verbose:
if shell:
print('\n---', cmd)
else:
print('\n---', ' '.join(cmd))
p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=shell)
stdout,stderr = p.communicate()[0:2]
if verbose and stdout:
print stdout,
if verbose and stderr:
print stderr,
return stdout,stderr
def qsvn(*args):
return run_cmd(['svn'] + list(args), False)
def svn(*args):
return run_cmd(['svn'] + list(args))
def gdbsvn(*args):
call(['gdb', '--args', 'svn'] + list(args))
return 'gdb', 'gdb'
def shell(script):
return run_cmd(script, shell=True)
def rewrite_file(path, contents, auto_newline='\n\n'):
dirname = os.path.dirname(path)
if dirname and not os.path.lexists(dirname):
os.makedirs(dirname)
f = open(path, "w")
f.write(contents + (auto_newline or ''))
f.close()
def read_file(path):
try:
f = open(path, "r")
contents = f.read()
f.close()
return contents
except IOError:
return None
def append_file(path, contents):
dirname = os.path.dirname(path)
if not os.path.lexists(dirname):
os.makedirs(dirname)
f = open(path, "a")
f.write(contents)
f.close()
def remove_file(path):
if os.path.isfile(path):
os.remove(path)
def tempdir():
return tempfile.mkdtemp(prefix='tc_add-')
j = os.path.join
f = 'f' # file
d = 'd' # dir
l = 'l' # symbolic link
class TestContext:
def __init__(self):
self.base = tempdir()
self.repos = j(self.base, 'repos')
self.URL = 'file://' + self.repos
shell('svnadmin create "' + self.repos + '"')
self.WC = j(self.base, 'wc')
svn('checkout', self.URL, self.WC)
def create_wc2(self):
self.WC2 = j(self.base, 'wc2')
svn('checkout', self.URL, self.WC2)
def wc(self, *relpath):
if not relpath:
return self.WC
return j(self.WC, *relpath)
def wc2(self, *relpath):
if not relpath:
return self.WC2
return j(self.WC2, *relpath)
def url(self, relpath):
if not relpath:
return self.URL
return self.URL + '/' + relpath
def head(self):
out, err = qsvn('info', self.URL)
revstr = 'Revision: '
for line in out.split('\n'):
if line.startswith(revstr):
return int(line.strip()[len(revstr):])
def unver(ctx, target, kind, content=None):
if not content:
content = 'content of ' + os.path.basename(target)
if kind == f:
rewrite_file(target, content)
shell('cat ' + target)
elif kind == l:
os.symlink('symlink', target)
else:
os.mkdir(target)
def add(ctx, target, kind, content):
unver(ctx, target, kind, content)
svn('add', target)
svn('ps', 'PROP_add_' + content + '_' + kind,
'content_add_' + content + '_' + kind,
target)
def cp(ctx, suffix, target, kind, content=None):
if not content:
content = 'modified ' + os.path.basename(target)
if kind == f:
src = ctx.url('file' + suffix)
elif kind == l:
src = ctx.url('symlink' + suffix)
else:
src = ctx.url('dir' + suffix)
src = src + '@1'
svn('copy', src, target)
svn('ps', 'PROP_copied_' + content + '_' + kind + suffix,
'content_' + content + '_' + kind + suffix,
target)
svn('status', target)
def cp1(ctx, target, kind, content=None):
return cp(ctx, '1', target, kind, content)
def cp2(ctx, target, kind, content=None):
return cp(ctx, '2', target, kind, content)
def prepare_cp(ctx, kind, suffix):
if kind == f:
target = ctx.wc('file' + suffix)
if not os.path.lexists(target):
rewrite_file(target, 'copy source ' + suffix)
svn('add', target)
svn('ps', 'PROP_copy_source_' + kind + suffix,
'content_copy_source_' + kind + suffix,
target)
elif kind == l:
target = ctx.wc('symlink' + suffix)
if not os.path.lexists(target):
os.symlink('copy_source' + suffix, target)
svn('add', target)
svn('ps', 'PROP_copy_source_' + kind + suffix,
'content_copy_source_' + kind + suffix,
target)
else:
target = ctx.wc('dir' + suffix)
svn('mkdir', target)
svn('ps', 'PROP_copy_source_' + kind + suffix,
'content_copy_source_' + kind + suffix,
target)
def postpare_cp(ctx, kind, suffix):
if kind == f:
target = ctx.wc('file' + suffix)
rewrite_file(target, 'local mod on copy source ' + suffix)
svn('ps', 'PROP_local_copy_src_mod_' + kind + suffix,
'content_local_copy_src_mod_' + kind + suffix,
target)
elif kind == l:
target = ctx.wc('symlink' + suffix)
svn('ps', 'PROP_local_copy_src_mod_' + kind + suffix,
'content_local_copy_src_mod_' + kind + suffix,
target)
else:
target = ctx.wc('dir' + suffix)
svn('ps', 'PROP_local_copy_src_mod_' + kind + suffix,
'content_local_copy_src_mod_' + kind + suffix,
target)
def prepare(ctx, action, kind):
if action == cp1:
prepare_cp(ctx, kind, '1')
elif action == cp2:
prepare_cp(ctx, kind, '2')
def postpare(ctx, action, kind):
if action == cp1:
postpare_cp(ctx, kind, '1')
elif action == cp2:
postpare_cp(ctx, kind, '2')
def co(name, local_action, local_kind, incoming_action, incoming_kind):
ctx = TestContext()
prepare(ctx, local_action, local_kind)
prepare(ctx, incoming_action, incoming_kind)
svn('commit', '-mm', ctx.WC)
svn('up', ctx.WC)
head = ctx.head()
print(head)
ctx.create_wc2()
target = ctx.wc2(name)
incoming_action(ctx, target, incoming_kind, 'incoming')
svn('commit', '-mm', ctx.WC2)
target = ctx.wc(name)
local_action(ctx, target, local_kind, 'local')
postpare(ctx, local_action, local_kind)
postpare(ctx, incoming_action, incoming_kind)
# get conflicts
o1,e1 = shell('yes p | svn checkout "' + ctx.URL + '" ' +
'"' + ctx.WC + '"')
o2,e2 = svn('status', ctx.WC)
o3,e3 = run_cmd(['sqlite3', ctx.wc('.svn', 'wc.db'),
'select local_relpath,properties from base_node; '
+'select local_relpath,properties from working_node; '
+'select local_relpath,properties from actual_node; '
])
return o1, e1, o2, e2, o3, e3
def up(name, local_action, local_kind, incoming_action, incoming_kind):
ctx = TestContext()
prepare(ctx, local_action, local_kind)
prepare(ctx, incoming_action, incoming_kind)
svn('commit', '-mm', ctx.WC)
svn('up', ctx.WC)
head = ctx.head()
print(head)
target = ctx.wc(name)
incoming_action(ctx, target, incoming_kind, 'incoming')
svn('commit', '-mm', ctx.WC)
# time warp
svn('update', '-r', str(head), ctx.WC)
local_action(ctx, target, local_kind, 'local')
postpare(ctx, local_action, local_kind)
postpare(ctx, incoming_action, incoming_kind)
# get conflicts
o1,e1 = svn('update', '--accept=postpone', ctx.WC)
o2,e2 = svn('status', ctx.WC)
o3,e3 = run_cmd(['sqlite3', ctx.wc('.svn', 'wc.db'),
'select local_relpath,properties from base_node; '
+'select local_relpath,properties from working_node; '
+'select local_relpath,properties from actual_node; '
])
return o1, e1, o2, e2, o3, e3
def sw(name, local_action, local_kind, incoming_action, incoming_kind):
ctx = TestContext()
prepare(ctx, local_action, local_kind)
prepare(ctx, incoming_action, incoming_kind)
svn('commit', '-mm', ctx.WC)
svn('mkdir', '-mm', ctx.url('trunk'))
svn('copy', '-mm', ctx.url('trunk'), ctx.url('branch'))
svn('up', ctx.WC)
target = ctx.wc('branch', name)
incoming_action(ctx, target, incoming_kind, 'incoming')
svn('commit', '-mm', ctx.WC)
svn('up', ctx.WC)
target = ctx.wc('trunk', name)
local_action(ctx, target, local_kind, 'local')
postpare(ctx, local_action, local_kind)
postpare(ctx, incoming_action, incoming_kind)
# get conflicts
o1,e1 = svn('switch', '--accept=postpone', ctx.url('branch'), ctx.wc('trunk'))
o2,e2 = svn('status', ctx.WC)
o3,e3 = run_cmd(['sqlite3', ctx.wc('trunk', '.svn', 'wc.db'),
'select local_relpath,properties from base_node; select'
+ ' local_relpath,properties from working_node;'])
# This is a bit stupid. Someone rewire this.
return o1, e1, o2, e2, o3, e3
# This controls which tests are run. All possible combinations are tested.
# The elements are functions for up,sw and add,cp1,cp2,unver, and they are
# simple strings for f (file), l (symlink), d (directory).
#
# cmd local action and kind incoming action and kind
p = product((co,up,sw), (add,cp1,unver), (f,l,d), (add,cp2,cp1), (f,l,d))
# Incoming cp1 is meant to match up only with local cp1. Also, cp1-cp1 is
# supposed to perform identical copies in both incoming and local, so they
# only make sense with matching kinds. Skip all rows that don't match this:
skip = lambda row: (row[3] == cp1 and (row[4] != row[2] or row[1] != cp1)
#Select subsets if desired
#or (row[0] != up or row[3] not in [cp1, cp2])
#or (row[2] != l and row[4] != l)
#
#or row not in (
##[up, cp1, l, add, l],
##[up, cp1, f, cp2, l],
#[up, cp1, f, cp2, f],
##[up, cp1, f, add, f],
##[up, cp1, f, add, l],
##[up, add, l, add, l],
#)
)
def nameof(thing):
if isinstance(thing, FunctionType):
return thing.__name__
if isinstance(thing, ListType) or isinstance(thing, TupleType):
return '_'.join([ nameof(thang) for thang in thing])
return str(thing)
def analyze(name, outs):
stats = []
for o in outs:
for line in o.split('\n'):
if (line.startswith('svn: ') or line.startswith(' > ')
or line.find(name) > -1):
stats.append(line)
return stats
#os.environ['SVN_I_LOVE_CORRUPTED_WORKING_COPIES_SO_DISABLE_SLEEP_FOR_TIMESTAMPS'] = 'yes'
results = []
name = None
try:
# there is probably a better way than this:
for row in list(p):
if skip(row):
continue
name = nameof(row)
print(name)
test_func = row[0]
results.append( (name, analyze( name, test_func( name, *row[1:] ) )) )
except:
if name:
print('Error during', name)
raise
finally:
lines = []
for result in results:
name = result[0]
if result[1]:
lines.append('----- ' + name)
for stat in result[1]:
lines.append(stat)
else:
lines.append('----- ' + name + ': nothing.')
dump = '\n'.join(lines)
print(dump)
rewrite_file('tree-conflicts-add-vs-add.py.results', dump)