blob: bf5faba4879ee5ba6a9834d63319fcd9202bd61a [file] [log] [blame]
# -*-python-*-
#
# Copyright (C) 1999-2021 The ViewCVS Group. All Rights Reserved.
#
# By using this file, you agree to the terms and conditions set forth in
# the LICENSE.html file which can be found at the top level of the ViewVC
# distribution or at http://viewvc.org/license-1.html.
#
# For more information, visit http://viewvc.org/
#
# -----------------------------------------------------------------------
#
# viewvc: View CVS/SVN repositories via a web browser
#
# -----------------------------------------------------------------------
from __future__ import print_function
__version__ = "1.3.0-dev"
# Standard modules that we know are in the path or builtin.
import sys
import os
import calendar
import copy
import fnmatch
import gzip
import mimetypes
import re
import email.utils
import stat
import struct
import tempfile
import time
from operator import attrgetter
import io
from urllib.parse import urlencode as _urlencode, quote as _quote
# These modules come from our library (the stub has set up the path)
from common import (
ViewVCException,
get_exception_data,
print_exception_data,
_RCSDIFF_NO_CHANGES,
_RCSDIFF_IS_BINARY,
_RCSDIFF_ERROR,
TemplateData,
_item,
get_repos_encodings,
)
import accept
import config
import ezt
import sapi
import vcauth
import vclib
import vclib.ccvs
import vclib.svn
try:
import idiff
except (SyntaxError, ImportError):
idiff = None
# Initialize the system tracebacklimit value to 0, meaning stack
# traces will carry only the top-level exception string. This can be
# overridden via configuration.
sys.tracebacklimit = 0
#########################################################################
docroot_magic_path = "*docroot*"
viewcvs_mime_type = "text/vnd.viewcvs-markup"
alt_mime_type = "text/x-cvsweb-markup"
view_roots_magic = "*viewroots*"
# Put here the variables we need in order to hold our state - they
# will be added (with their current value) to (almost) any link/query
# string you construct.
_sticky_vars = [
"hideattic",
"sortby",
"sortdir",
"logsort",
"diff_format",
"search",
"limit_changes",
]
# for reading/writing between a couple descriptors
CHUNK_SIZE = 8192
# special characters that don't need to be URL encoded
_URL_SAFE_CHARS = "/*~"
class TextIOWrapper_noclose(io.TextIOWrapper):
"""Custom TextIOWrapper class which doesn't close underlaying IO object when
close() is called or this object is destroyed."""
def close(self):
if not self.closed:
self.closed = True
self.flush()
self.detach()
class Request:
def __init__(self, server, cfg):
self.server = server
self.cfg = cfg
self.script_name = _normalize_path(server.getenv("SCRIPT_NAME", ""))
self.browser = server.getenv("HTTP_USER_AGENT", "unknown")
# process the Accept-Language: header, and load the key/value
# files, given the selected language
hal = server.getenv("HTTP_ACCEPT_LANGUAGE", "")
try:
self.lang_selector = accept.language(hal)
except accept.AcceptLanguageParseError:
self.lang_selector = accept.language("en")
self.language = self.lang_selector.select_from(cfg.general.languages)
self.kv = cfg.load_kv_files(self.language)
# check for an authenticated username
self.username = server.getenv("REMOTE_USER")
# if we allow compressed output, see if the client does too
self.gzip_compress_level = 0
if cfg.options.allow_compress:
http_accept_encoding = os.environ.get("HTTP_ACCEPT_ENCODING", "")
if "gzip" in [x.strip() for x in http_accept_encoding.split(",")]:
self.gzip_compress_level = 9 # make this configurable?
def run_viewvc(self):
cfg = self.cfg
# This function first parses the query string and sets the following
# variables. Then it executes the request.
self.view_func = None # function to call to process the request
self.repos = None # object representing current repository
self.rootname = None # name of current root (as used in viewvc.conf)
self.roottype = None # current root type ('svn' or 'cvs')
self.rootpath = None # physical path to current root
self.pathtype = None # type of path, either vclib.FILE or vclib.DIR
self.where = None # path to file or directory in current root
self.query_dict = {} # validated and cleaned up query options
self.path_parts = None # for convenience, equals where.split('/')
self.pathrev = None # current path revision or tag
self.auth = None # authorizer module in use
# redirect if we're loading from a valid but irregular URL
# These redirects aren't neccessary to make ViewVC work, it functions
# just fine without them, but they make it easier for server admins to
# implement access restrictions based on URL
needs_redirect = 0
# Process the query params
for name, values in self.server.params().items():
# we only care about the first value
value = values[0]
# patch up old queries that use 'cvsroot' to look like they used 'root'
if name == "cvsroot":
name = "root"
needs_redirect = 1
# same for 'only_with_tag' and 'pathrev'
if name == "only_with_tag":
name = "pathrev"
needs_redirect = 1
# redirect view=rev to view=revision, too
if name == "view" and value == "rev":
value = "revision"
needs_redirect = 1
# validate the parameter
_validate_param(name, value)
# if we're here, then the parameter is okay
self.query_dict[name] = value
# Resolve the view parameter into a handler function.
self.view_func = _views.get(self.query_dict.get("view", None), self.view_func)
# Process PATH_INFO component of query string
path_info = self.server.getenv("PATH_INFO", "")
# clean it up. this removes duplicate '/' characters and any that may
# exist at the front or end of the path.
#
# TODO: we might want to redirect to the cleaned up URL
path_parts = _path_parts(path_info)
if path_parts:
# handle docroot magic path prefixes
if path_parts[0] == docroot_magic_path:
# if this is just a simple hunk of doc, then serve it up
self.where = _path_join(path_parts[1:])
return view_doc(self)
# handle tarball magic suffixes
if self.view_func is download_tarball:
if self.query_dict.get("parent"):
del path_parts[-1]
elif path_parts[-1][-7:] == ".tar.gz":
path_parts[-1] = path_parts[-1][:-7]
# Figure out root name
self.rootname = self.query_dict.get("root")
if self.rootname == view_roots_magic:
del self.query_dict["root"]
self.rootname = ""
needs_redirect = 1
elif self.rootname is None:
if cfg.options.root_as_url_component:
if path_parts:
roottype, rootpath, self.rootname, new_path_parts = locate_root_from_path(
cfg, path_parts
)
if roottype is None:
# Perhaps the root name is candidate for renaming...
# Take care of old-new roots mapping
for old_root, new_root in cfg.general.renamed_roots.items():
pp = _path_parts(old_root)
if _path_starts_with(path_parts, pp):
path_parts = path_parts[len(pp) :]
self.rootname = new_root
needs_redirect = 1
if self.rootname is None:
# Not found; interpret whole path as root, to show as error
self.rootname = _path_join(path_parts)
path_parts = []
else:
path_parts = new_path_parts
else:
self.rootname = ""
elif self.view_func != view_roots:
self.rootname = cfg.general.default_root
elif cfg.options.root_as_url_component:
needs_redirect = 1
# Take care of old-new roots mapping
for old_root, new_root in cfg.general.renamed_roots.items():
if self.rootname == old_root:
self.rootname = new_root
needs_redirect = 1
self.where = _path_join(path_parts)
self.path_parts = path_parts
if self.rootname:
roottype, rootpath = locate_root(cfg, self.rootname)
if roottype:
# Overlay root-specific options.
cfg.overlay_root_options(self.rootname)
# Setup an Authorizer for this rootname and username
self.auth = setup_authorizer(cfg, self.username)
# get the encoding for the path and contents
path_encoding, content_encoding = get_repos_encodings(cfg, self.rootname)
# Create the repository object
try:
if roottype == "cvs":
self.rootpath = vclib.ccvs.canonicalize_rootpath(rootpath)
self.repos = vclib.ccvs.CVSRepository(
self.rootname,
self.rootpath,
self.auth,
cfg.utilities,
cfg.options.use_rcsparse,
content_encoding,
path_encoding,
)
# required so that spawned rcs programs correctly expand
# $CVSHeader$
if sys.platform == 'win32':
os.environ["CVSROOT"] = self.rootpath
else:
os.environb[b"CVSROOT"] = self.rootpath.encode(path_encoding)
elif roottype == "svn":
self.rootpath = vclib.svn.canonicalize_rootpath(rootpath)
self.repos = vclib.svn.SubversionRepository(
self.rootname,
self.rootpath,
self.auth,
cfg.utilities,
cfg.options.svn_config_dir,
content_encoding,
path_encoding,
)
else:
raise vclib.ReposNotFound()
except vclib.ReposNotFound:
pass
if self.repos is None:
raise ViewVCException(
'The root "%s" is unknown. If you believe the value is '
"correct, then please double-check your configuration." % self.rootname,
"404 Not Found",
)
if self.repos:
self.repos.open()
vctype = self.repos.roottype()
if vctype == vclib.SVN:
self.roottype = "svn"
elif vctype == vclib.CVS:
self.roottype = "cvs"
else:
raise ViewVCException(
'The root "%s" has an unknown type ("%s"). Expected "cvs" or "svn".'
% (self.rootname, vctype),
"500 Internal Server Error",
)
# If this is using an old-style 'rev' parameter, redirect to new hotness.
# Subversion URLs will now use 'pathrev'; CVS ones use 'revision'.
if self.repos and "rev" in self.query_dict:
if (
self.roottype == "svn"
and "pathrev" not in self.query_dict
and not self.view_func == view_revision
):
self.query_dict["pathrev"] = self.query_dict["rev"]
del self.query_dict["rev"]
else: # elif 'revision' not in self.query_dict: ?
self.query_dict["revision"] = self.query_dict["rev"]
del self.query_dict["rev"]
needs_redirect = 1
if self.repos and self.view_func is not redirect_pathrev:
# If this is an intended-to-be-hidden CVSROOT path, complain.
if cfg.options.hide_cvsroot and is_cvsroot_path(self.roottype, path_parts):
raise ViewVCException("Unknown location: /%s" % self.where, "404 Not Found")
# Make sure path exists
self.pathrev = pathrev = self.query_dict.get("pathrev")
self.pathtype = _repos_pathtype(self.repos, path_parts, pathrev)
if self.pathtype is None:
# Path doesn't exist, see if it could be an old-style ViewVC URL
# with a fake suffix.
result = (
_strip_suffix(".diff", path_parts, pathrev, vclib.FILE, self.repos, view_diff)
or _strip_suffix(
".tar.gz", path_parts, pathrev, vclib.DIR, self.repos, download_tarball
)
or _strip_suffix(
"root.tar.gz", path_parts, pathrev, vclib.DIR, self.repos, download_tarball
)
or _strip_suffix(
self.rootname + "-root.tar.gz",
path_parts,
pathrev,
vclib.DIR,
self.repos,
download_tarball,
)
or _strip_suffix(
"root", path_parts, pathrev, vclib.DIR, self.repos, download_tarball
)
or _strip_suffix(
self.rootname + "-root",
path_parts,
pathrev,
vclib.DIR,
self.repos,
download_tarball,
)
)
if result:
self.path_parts, self.pathtype, self.view_func = result
self.where = _path_join(self.path_parts)
needs_redirect = 1
else:
raise ViewVCException("Unknown location: /%s" % self.where, "404 Not Found")
# If we have an old ViewCVS Attic URL which is still valid, redirect
if self.roottype == "cvs":
attic_parts = None
if (
self.pathtype == vclib.FILE
and len(self.path_parts) > 1
and self.path_parts[-2] == "Attic"
):
attic_parts = self.path_parts[:-2] + self.path_parts[-1:]
elif (
self.pathtype == vclib.DIR
and len(self.path_parts) > 0
and self.path_parts[-1] == "Attic"
):
attic_parts = self.path_parts[:-1]
if attic_parts:
self.path_parts = attic_parts
self.where = _path_join(attic_parts)
needs_redirect = 1
if self.view_func is None:
# view parameter is not set, try looking at pathtype and the
# other parameters
if not self.rootname:
self.view_func = view_roots
elif self.pathtype == vclib.DIR:
# ViewCVS 0.9.2 used to put ?tarball=1 at the end of tarball urls
if "tarball" in self.query_dict:
self.view_func = download_tarball
elif "r1" in self.query_dict and "r2" in self.query_dict:
self.view_func = view_diff
else:
self.view_func = view_directory
elif self.pathtype == vclib.FILE:
if "r1" in self.query_dict and "r2" in self.query_dict:
self.view_func = view_diff
elif "annotate" in self.query_dict:
self.view_func = view_annotate
elif "graph" in self.query_dict:
if "makeimage" not in self.query_dict:
self.view_func = view_cvsgraph
else:
self.view_func = view_cvsgraph_image
elif "revision" in self.query_dict or cfg.options.default_file_view != "log":
if cfg.options.default_file_view == "markup" or (
self.query_dict.get("content-type", None)
in (viewcvs_mime_type, alt_mime_type)
):
self.view_func = view_markup
else:
self.view_func = view_checkout
else:
self.view_func = view_log
# If we've chosen the roots or revision view, our effective
# location is not really "inside" the repository, so we have no
# path and therefore no path parts or type, either.
if self.view_func is view_revision or self.view_func is view_roots:
self.where = ""
self.path_parts = []
self.pathtype = None
# if we have a directory and the request didn't end in "/", then redirect
# so that it does.
if (
self.pathtype == vclib.DIR
and path_info[-1:] != "/"
and self.view_func is not download_tarball
and self.view_func is not redirect_pathrev
):
needs_redirect = 1
# startup is done now.
# If we need to redirect, do so. Otherwise, handle our requested view.
if needs_redirect:
self.server.redirect(self.get_url())
else:
self.view_func(self)
def get_url(self, escape=0, partial=0, prefix=0, **args):
"""Constructs a link to another ViewVC page just like the get_link
function except that it returns a single URL instead of a URL
split into components. If PREFIX is set, include the protocol and
server name portions of the URL."""
url, params = self.get_link(*(), **args)
qs = _urlencode(params)
if qs:
result = _quote(url, _URL_SAFE_CHARS, "utf-8", "surrogateescape") + "?" + qs
else:
result = _quote(url, _URL_SAFE_CHARS, "utf-8", "surrogateescape")
if partial:
result = result + (qs and "&" or "?")
if escape:
result = self.server.escape(result)
if prefix:
result = "%s://%s%s" % (
self.server.getenv("HTTPS") == "on" and "https" or "http",
self.server.getenv("HTTP_HOST"),
result,
)
return result
def get_form(self, **args):
"""Constructs a link to another ViewVC page just like the get_link
function except that it returns a base URL suitable for use as an
HTML form action, and an iterable object with .name and .value
attributes representing stuff that should be in <input
type=hidden> tags with the link parameters."""
url, params = self.get_link(*(), **args)
action = self.server.escape(_quote(url, _URL_SAFE_CHARS, "utf-8", "surrogateescape"))
hidden_values = []
for name, value in params.items():
hidden_values.append(
_item(name=self.server.escape(name), value=self.server.escape(value))
)
return action, hidden_values
def get_link(self, view_func=None, where=None, pathtype=None, params=None):
"""Constructs a link pointing to another ViewVC page. All arguments
correspond to members of the Request object. If they are set to
None they take values from the current page. Return value is a base
URL and a dictionary of parameters"""
cfg = self.cfg
if view_func is None:
view_func = self.view_func
if params is None:
params = self.query_dict.copy()
else:
params = params.copy()
# must specify both where and pathtype or neither
assert (where is None) == (pathtype is None)
# if we are asking for the revision info view, we don't need any
# path information
if view_func is view_revision or view_func is view_roots or view_func is redirect_pathrev:
where = pathtype = None
elif where is None:
where = self.where
pathtype = self.pathtype
# no need to add sticky variables for views with no links
sticky_vars = not (view_func is view_checkout or view_func is download_tarball)
# The logic used to construct the URL is an inverse of the
# logic used to interpret URLs in Request.run_viewvc
url = self.script_name
# add root to url
rootname = None
if view_func is not view_roots:
if cfg.options.root_as_url_component:
# remove root from parameter list if present
try:
rootname = params["root"]
except KeyError:
rootname = self.rootname
else:
del params["root"]
# add root path component
if rootname is not None:
url = url + "/" + rootname
else:
# add root to parameter list
try:
rootname = params["root"]
except KeyError:
rootname = params["root"] = self.rootname
# no need to specify default root
if rootname == cfg.general.default_root:
del params["root"]
# add 'pathrev' value to parameter list
if (
self.pathrev is not None
and "pathrev" not in params
and view_func is not view_revision
and rootname == self.rootname
):
params["pathrev"] = self.pathrev
# add path
if where:
url = url + "/" + where
# add trailing slash for a directory
if pathtype == vclib.DIR:
url = url + "/"
# normalize top level URLs for use in Location headers and A tags
elif not url:
url = "/"
# no need to explicitly specify directory view for a directory
if view_func is view_directory and pathtype == vclib.DIR:
view_func = None
# no need to explicitly specify roots view when in root_as_url
# mode or there's no default root
if view_func is view_roots and (
cfg.options.root_as_url_component or not cfg.general.default_root
):
view_func = None
# no need to explicitly specify annotate view when
# there's an annotate parameter
if view_func is view_annotate and params.get("annotate") is not None:
view_func = None
# no need to explicitly specify diff view when
# there's r1 and r2 parameters
if view_func is view_diff and params.get("r1") is not None and params.get("r2") is not None:
view_func = None
# no need to explicitly specify checkout view when it's the default view
if view_func is view_checkout:
if cfg.options.default_file_view == "co" and pathtype == vclib.FILE:
view_func = None
# no need to explicitly specify markup view when it's the default view
if view_func is view_markup:
if cfg.options.default_file_view == "markup" and pathtype == vclib.FILE:
view_func = None
# set the view parameter
view_code = _view_codes.get(view_func)
if view_code and not ("view" in params and params["view"] is None):
params["view"] = view_code
# add sticky values to parameter list
if sticky_vars:
for name in _sticky_vars:
value = self.query_dict.get(name)
if value is not None and name not in params:
params[name] = value
# remove null values from parameter list
for name, value in list(params.items()):
if value is None:
del params[name]
return url, params
def _path_parts(path):
"""Split up a repository path into a list of path components"""
# clean it up. this removes duplicate '/' characters and any that may
# exist at the front or end of the path.
return [pp for pp in path.split("/") if pp]
def _normalize_path(path):
"""Collapse leading slashes in the script name
You only get multiple slashes in the script name when users accidentally
type urls like http://abc.com//viewvc.cgi/, but we correct for it
because we output the script name in links and web browsers
interpret //viewvc.cgi/ as http://viewvc.cgi/
"""
i = 0
for c in path:
if c != "/":
break
i = i + 1
if i:
return path[(i - 1) :]
return path
def _validate_param(name, value):
"""Validate whether the given value is acceptable for the param name.
If the value is not allowed, then an error response is generated, and
this function throws an exception. Otherwise, it simply returns None.
"""
# First things first -- check that we have a legal parameter name.
try:
validator = _legal_params[name]
except KeyError:
raise ViewVCException("An illegal parameter name was provided.", "400 Bad Request")
# Is there a validator? Is it a regex or a function? Validate if
# we can, returning without incident on valid input.
if validator is None:
return
elif hasattr(validator, "match"):
if validator.match(value):
return
else:
if validator(value):
return
# If we get here, the input value isn't valid.
raise ViewVCException(
'An illegal value was provided for the "%s" parameter.' % (name), "400 Bad Request"
)
def _validate_regex(value):
# We need to watch the flow of these parameters through the system
# to ensure they don't hit the page unescaped. otherwise, these
# parameters could constitute a CSS attack.
try:
re.compile(value)
return True
except Exception:
return None
def _validate_view(value):
# Return true iff VALUE is one of our allowed views.
return value in _views
def _validate_mimetype(value):
# For security purposes, we only allow mimetypes from a predefined set
# thereof.
return value in (viewcvs_mime_type, alt_mime_type, "text/plain")
# obvious things here. note that we don't need uppercase for alpha.
_re_validate_alpha = re.compile("^[a-z]+$")
_re_validate_number = re.compile("^[0-9]+$")
_re_validate_boolint = re.compile("^[01]$")
# when comparing two revs, we sometimes construct REV:SYMBOL, so ':' is needed
_re_validate_revnum = re.compile("^[-_.a-zA-Z0-9:~\\[\\]/]*$")
# date time values
_re_validate_datetime = re.compile(r"^(\d\d\d\d-\d\d-\d\d(\s+\d\d:\d\d" r"(:\d\d)?)?)?$")
# the legal query parameters and their validation functions
_legal_params = {
"root": None,
"view": _validate_view,
"search": _validate_regex,
"p1": None,
"p2": None,
"hideattic": _re_validate_boolint,
"limit_changes": _re_validate_number,
"sortby": _re_validate_alpha,
"sortdir": _re_validate_alpha,
"logsort": _re_validate_alpha,
"diff_format": _re_validate_alpha,
"pathrev": _re_validate_revnum,
"dir_pagestart": _re_validate_number,
"log_pagestart": _re_validate_number,
"annotate": _re_validate_revnum,
"graph": _re_validate_revnum,
"makeimage": _re_validate_boolint,
"r1": _re_validate_revnum,
"tr1": _re_validate_revnum,
"r2": _re_validate_revnum,
"tr2": _re_validate_revnum,
"revision": _re_validate_revnum,
"content-type": _validate_mimetype,
# for cvsgraph
"gflip": _re_validate_boolint,
"gbbox": _re_validate_boolint,
"gshow": _re_validate_alpha,
"gleft": _re_validate_boolint,
"gmaxtag": _re_validate_number,
# for query
"file_match": _re_validate_alpha,
"branch_match": _re_validate_alpha,
"who_match": _re_validate_alpha,
"comment_match": _re_validate_alpha,
"dir": None,
"file": None,
"branch": None,
"who": None,
"comment": None,
"querysort": _re_validate_alpha,
"date": _re_validate_alpha,
"hours": _re_validate_number,
"mindate": _re_validate_datetime,
"maxdate": _re_validate_datetime,
"format": _re_validate_alpha,
# for redirect_pathrev
"orig_path": None,
"orig_pathtype": None,
"orig_pathrev": None,
"orig_view": None,
# DEPRECATED - these are no longer used, but kept around so that
# bookmarked URLs still "work" (for some definition thereof) after a
# ViewVC upgrade.
"parent": _re_validate_boolint,
"rev": _re_validate_revnum,
"tarball": _re_validate_boolint,
"hidecvsroot": _re_validate_boolint,
"limit": _re_validate_number,
}
def _path_join(path_parts):
return "/".join(path_parts)
def _path_starts_with(path_parts, first_path_parts):
if not path_parts:
return False
if len(path_parts) < len(first_path_parts):
return False
return path_parts[0 : len(first_path_parts)] == first_path_parts
def _strip_suffix(suffix, path_parts, rev, pathtype, repos, view_func):
"""strip the suffix from a repository path if the resulting path
is of the specified type, otherwise return None"""
if not path_parts:
return None
suffix_len = len(suffix)
if path_parts[-1][-suffix_len:] == suffix:
path_parts = path_parts[:]
if len(path_parts[-1]) == suffix_len:
del path_parts[-1]
else:
path_parts[-1] = path_parts[-1][:-suffix_len]
t = _repos_pathtype(repos, path_parts, rev)
if pathtype == t:
return path_parts, t, view_func
return None
def _repos_pathtype(repos, path_parts, rev):
"""Return the type of a repository path, or None if the path doesn't
exist"""
try:
return repos.itemtype(path_parts, rev)
except vclib.ItemNotFound:
return None
def _orig_path(request, rev_param="revision", path_param=None):
"Get original path of requested file at old revision before copies or moves"
# The 'pathrev' variable is interpreted by nearly all ViewVC views to
# provide a browsable snapshot of a repository at some point in its history.
# 'pathrev' is a tag name for CVS repositories and a revision number for
# Subversion repositories. It's automatically propagated between pages by
# logic in the Request.get_link() function which adds it to links like a
# sticky variable. When 'pathrev' is set, directory listings only include
# entries that exist in the specified revision or tag. Similarly, log pages
# will only show revisions preceding the point in history specified by
# 'pathrev.' Markup, checkout, and annotate pages show the 'pathrev'
# revision of files by default when no other revision is specified.
#
# In Subversion repositories, paths are always considered to refer to the
# pathrev revision. For example, if there is a "circle.jpg" in revision 3,
# which is renamed and modified as "square.jpg" in revision 4, the original
# circle image is visible at the following URLs:
#
# .../circle.jpg?pathrev=3
# .../square.jpg?revision=3
# .../square.jpg?revision=3&pathrev=4
#
# Note that the following:
#
# .../circle.jpg?rev=3
#
# now gets redirected to one of the following URLs:
#
# .../circle.jpg?pathrev=3 (for Subversion)
# .../circle.jpg?revision=3 (for CVS)
#
rev = request.query_dict.get(rev_param, request.pathrev)
path = request.query_dict.get(path_param, request.where)
if rev is not None and hasattr(request.repos, "_getrev"):
try:
pathrev = request.repos._getrev(request.pathrev)
rev = request.repos._getrev(rev)
except vclib.InvalidRevision:
raise ViewVCException("Invalid revision", "404 Not Found")
return _path_parts(request.repos.get_location(path, pathrev, rev)), rev
return _path_parts(path), rev
def setup_authorizer(cfg, username, rootname=None):
"""Setup the authorizer. If ROOTNAME is provided, assume that
per-root options have not been overlayed. Otherwise, assume they
have (and fetch the authorizer for the configured root)."""
if rootname is None:
authorizer = cfg.options.authorizer
params = cfg.get_authorizer_params()
else:
authorizer, params = cfg.get_authorizer_and_params_hack(rootname)
# No configured authorizer? No problem.
if not authorizer:
return None
# First, try to load a module with the configured name.
import imp
fp = None
try:
try:
fp, path, desc = imp.find_module("%s" % (authorizer), vcauth.__path__)
my_auth = imp.load_module("viewvc", fp, path, desc)
except ImportError:
raise ViewVCException(
'Invalid authorizer (%s) specified for root "%s"' % (authorizer, rootname),
"500 Internal Server Error",
)
finally:
if fp:
fp.close()
# Add a rootname mapping callback function to the parameters.
def _root_lookup_func(cb_rootname):
return locate_root(cfg, cb_rootname)
# Finally, instantiate our Authorizer.
return my_auth.ViewVCAuthorizer(_root_lookup_func, username, params)
def check_freshness(request, mtime=None, etag=None, weak=0):
cfg = request.cfg
# See if we are supposed to disable etags (for debugging, usually)
if not cfg.options.generate_etags:
return 0
request_etag = request_mtime = None
if etag is not None:
if weak:
etag = 'W/"%s"' % etag
else:
etag = '"%s"' % etag
request_etag = request.server.getenv("HTTP_IF_NONE_MATCH")
if mtime is not None:
try:
request_mtime = request.server.getenv("HTTP_IF_MODIFIED_SINCE")
request_mtime = email.utils.mktime_tz(email.utils.parsedate_tz(request_mtime))
except Exception:
request_mtime = None
# if we have an etag, use that for freshness checking.
# if not available, then we use the last-modified time.
# if not available, then the document isn't fresh.
if etag is not None:
isfresh = request_etag == etag
elif mtime is not None and request_mtime is not None:
isfresh = request_mtime >= mtime
else:
isfresh = 0
# require revalidation after the configured amount of time
if cfg and cfg.options.http_expiration_time >= 0:
expiration = email.utils.formatdate(time.time() + cfg.options.http_expiration_time)
request.server.add_header("Expires", expiration)
request.server.add_header("Cache-Control", "max-age=%d" % cfg.options.http_expiration_time)
if isfresh:
request.server.start_response(status="304 Not Modified")
else:
if etag is not None:
request.server.add_header("ETag", etag)
if mtime is not None:
request.server.add_header("Last-Modified", email.utils.formatdate(mtime))
return isfresh
def get_view_template(cfg, view_name, language="en"):
# See if the configuration specifies a template for this view. If
# not, use the default template path for this view.
tname = vars(cfg.templates).get(view_name) or view_name + ".ezt"
# Template paths are relative to the configurated template_dir (if
# any, "templates" otherwise), so build the template path as such.
tname = os.path.join(cfg.options.template_dir or "templates", tname)
# Allow per-language template selection.
tname = tname.replace("%lang%", language)
# Finally, construct the whole template path and return the Template.
return ezt.Template(cfg.path(tname))
def get_writeready_server_file(
request,
content_type=None,
encoding=None,
content_length=None,
allow_compress=True,
is_text=False,
):
"""Return a file handle to a response body stream, after outputting
any queued special headers (on REQUEST.server) and (optionally) a
'Content-Type' header whose value is CONTENT_TYPE and character set
is ENCODING.
If CONTENT_LENGTH is provided and compression is not in use, also
generate a 'Content-Length' header for this response.
Callers my use ALLOW_COMPRESS to disable compression where it would
otherwise be allowed. (Such as when transmitting an
already-compressed response.)
After this function is called, it is too late to add new headers to
the response."""
if allow_compress and request.gzip_compress_level:
request.server.add_header("Content-Encoding", "gzip")
elif content_length is not None:
request.server.add_header("Content-Length", content_length)
if content_type and encoding:
request.server.start_response("%s; charset=%s" % (content_type, encoding))
elif content_type:
request.server.start_response(content_type)
else:
request.server.start_response()
if allow_compress and request.gzip_compress_level:
fp = gzip.GzipFile("", "wb", request.gzip_compress_level, request.server.file())
else:
fp = request.server.file()
if is_text:
fp = TextIOWrapper_noclose(fp, "utf-8", "surrogateescape", write_through=True)
return fp
def generate_page(request, view_name, data, content_type=None):
server_fp = get_writeready_server_file(request, content_type, "utf-8", is_text=True)
template = get_view_template(request.cfg, view_name, request.language)
template.generate(server_fp, data)
def nav_path(request):
"""Return current path as list of items with "name" and "href" members
The href members are view_directory links for directories and view_log
links for files, but are set to None when the link would point to
the current view"""
if not request.repos:
return []
is_dir = request.pathtype == vclib.DIR
# add root item
items = []
root_item = _item(name=request.server.escape(request.repos.name), href=None)
if request.path_parts or request.view_func is not view_directory:
root_item.href = request.get_url(
view_func=view_directory, where="", pathtype=vclib.DIR, params={}, escape=1
)
items.append(root_item)
# add path part items
path_parts = []
for part in request.path_parts:
path_parts.append(part)
is_last = len(path_parts) == len(request.path_parts)
item = _item(name=request.server.escape(part), href=None)
if not is_last or (is_dir and request.view_func is not view_directory):
item.href = request.get_url(
view_func=view_directory,
where=_path_join(path_parts),
pathtype=vclib.DIR,
params={},
escape=1,
)
elif not is_dir and request.view_func is not view_log:
item.href = request.get_url(
view_func=view_log,
where=_path_join(path_parts),
pathtype=vclib.FILE,
params={},
escape=1,
)
items.append(item)
return items
def prep_tags(request, tags):
url, params = request.get_link(params={"pathrev": None})
params = _urlencode(params)
if params:
url = _quote(url, _URL_SAFE_CHARS, "utf-8", "surrogateescape") + "?" + params + "&pathrev="
else:
url = _quote(url, _URL_SAFE_CHARS, "utf-8", "surrogateescape") + "?pathrev="
url = request.server.escape(url)
links = []
for tag in tags:
href = url + tag.name
links.append(_item(name=tag.name, href=href))
links.sort(key=attrgetter('name'))
return links
def guess_mime(filename):
return mimetypes.guess_type(filename)[0]
def is_viewable_image(mime_type):
return mime_type and mime_type in ("image/gif", "image/jpeg", "image/png")
def is_text(mime_type):
return not mime_type or mime_type[:5] == "text/"
def is_cvsroot_path(roottype, path_parts):
return roottype == "cvs" and path_parts and path_parts[0] == "CVSROOT"
def is_plain_text(mime_type):
return not mime_type or mime_type == "text/plain"
def default_view(mime_type, cfg):
"Determine whether file should be viewed through markup page or sent raw"
# If the mime type is text/anything or a supported image format we view
# through the markup page. If the mime type is something else, we send
# it directly to the browser. That way users can see things like flash
# animations, pdfs, word documents, multimedia, etc, which wouldn't be
# very useful marked up. If the mime type is totally unknown (happens when
# we encounter an unrecognized file extension) we also view it through
# the markup page since that's better than sending it text/plain.
if ("markup" in cfg.options.allowed_views) and (
is_viewable_image(mime_type) or is_text(mime_type)
):
return view_markup
return view_checkout
def is_binary_file_mime_type(mime_type, cfg):
"""Return True iff MIME_TYPE is set and matches one of the binary
file mime type patterns in CFG."""
if mime_type:
# We require explicit handling of the web-friendly images.
# For all other types, pattern-matching is used.
if is_viewable_image(mime_type):
return mime_type in cfg.options.binary_mime_types
for pattern in cfg.options.binary_mime_types:
if fnmatch.fnmatch(mime_type, pattern):
return True
return False
def is_dir_ignored_file(file_name, cfg):
"""Return True if FILE_NAME is set and matches one of the file names
or extensions to be ignored in directory listing per CFG."""
if file_name:
for pattern in cfg.options.dir_ignored_files:
if fnmatch.fnmatch(file_name, pattern):
return True
return False
def get_file_view_info(request, where, rev=None, mime_type=None, pathrev=-1):
"""Return an object holding common hrefs and a viewability flag used
for various views of FILENAME at revision REV whose MIME type is
MIME_TYPE.
The object's members include:
view_href
download_href
download_text_href
annotate_href
revision_href
prefer_markup
is_viewable_image
is_binary
"""
rev = rev and str(rev) or None
mime_type = mime_type or guess_mime(where)
if pathrev == -1: # cheesy default value, since we need to preserve None
pathrev = request.pathrev
view_href = None
download_href = None
download_text_href = None
annotate_href = None
revision_href = None
if "markup" in request.cfg.options.allowed_views:
view_href = request.get_url(
view_func=view_markup,
where=where,
pathtype=vclib.FILE,
params={"revision": rev, "pathrev": pathrev},
escape=1,
)
if "co" in request.cfg.options.allowed_views:
download_href = request.get_url(
view_func=view_checkout,
where=where,
pathtype=vclib.FILE,
params={"revision": rev, "pathrev": pathrev},
escape=1,
)
if not is_plain_text(mime_type):
download_text_href = request.get_url(
view_func=view_checkout,
where=where,
pathtype=vclib.FILE,
params={"content-type": "text/plain", "revision": rev, "pathrev": pathrev},
escape=1,
)
if "annotate" in request.cfg.options.allowed_views:
annotate_href = request.get_url(
view_func=view_annotate,
where=where,
pathtype=vclib.FILE,
params={"annotate": rev, "pathrev": pathrev},
escape=1,
)
if request.roottype == "svn":
revision_href = request.get_url(view_func=view_revision, params={"revision": rev}, escape=1)
is_binary_file = is_binary_file_mime_type(mime_type, request.cfg)
prefer_markup = default_view(mime_type, request.cfg) == view_markup
return _item(
view_href=view_href,
download_href=download_href,
download_text_href=download_text_href,
annotate_href=annotate_href,
revision_href=revision_href,
prefer_markup=ezt.boolean(prefer_markup),
is_viewable_image=ezt.boolean(is_viewable_image(mime_type)),
is_binary=ezt.boolean(is_binary_file),
)
# Matches URLs
_re_rewrite_url = re.compile(
r"((http|https|ftp|file|svn|svn\+ssh)"
r"(://[-a-zA-Z0-9%.~:_/]+)((\?|\&)"
r"([-a-zA-Z0-9%.~:_]+)=([-a-zA-Z0-9%.~:_])+)*"
r"(#([-a-zA-Z0-9%.~:_]+)?)?)"
)
# Matches email addresses
_re_rewrite_email = re.compile(r"([-a-zA-Z0-9_.\+]+)@" r"(([-a-zA-Z0-9]+\.)+[A-Za-z]{2,4})")
# Matches revision references
_re_rewrite_svnrevref = re.compile(r"\b(r|rev #?|revision #?)([0-9]+)\b")
class ViewVCHtmlFormatterTokens:
def __init__(self, tokens):
self.tokens = tokens
def get_result(self, maxlen=0):
"""Format the tokens per the registered set of formatters, and
limited to MAXLEN visible characters (or unlimited if MAXLEN is
0). Return a 3-tuple containing the formatted result string, the
number of visible characters in the result string, and a boolean
flag indicating whether or not S was truncated."""
out = ""
out_len = 0
for token in self.tokens:
chunk, chunk_len = token.converter(token.match, token.userdata, maxlen)
out = out + chunk
out_len = out_len + chunk_len
if maxlen:
maxlen = maxlen - chunk_len
if maxlen <= 0:
return out, out_len, 1
return out, out_len, 0
class ViewVCHtmlFormatter:
"""Format a string as HTML-encoded output with customizable markup
rules, for example turning strings that look like URLs into anchor links.
NOTE: While there might appear to be some unused portions of this
interface, there is a good chance that there are consumers outside
of ViewVC itself that make use of these things.
"""
def __init__(self):
self._formatters = []
def format_url(self, mobj, userdata, maxlen=0):
"""Return a 2-tuple containing:
- the text represented by MatchObject MOBJ, formatted as
linkified URL, with no more than MAXLEN characters in the
non-HTML-tag bits. If MAXLEN is 0, there is no maximum.
- the number of non-HTML-tag characters returned.
"""
s = mobj.group(0)
trunc_s = maxlen and s[:maxlen] or s
return '<a href="%s">%s</a>' % (sapi.escape(s), sapi.escape(trunc_s)), len(trunc_s)
def format_email(self, mobj, userdata, maxlen=0):
"""Return a 2-tuple containing:
- the text represented by MatchObject MOBJ, formatted as
linkified email address, with no more than MAXLEN characters
in the non-HTML-tag bits. If MAXLEN is 0, there is no maximum.
- the number of non-HTML-tag characters returned.
"""
s = mobj.group(0)
trunc_s = maxlen and s[:maxlen] or s
return '<a href="mailto:%s">%s</a>' % (_quote(s), self._entity_encode(trunc_s)), len(
trunc_s
)
def format_email_obfuscated(self, mobj, userdata, maxlen=0):
"""Return a 2-tuple containing:
- the text represented by MatchObject MOBJ, formatted as an
entity-encoded email address, with no more than MAXLEN characters
in the non-HTML-tag bits. If MAXLEN is 0, there is no maximum.
- the number of non-HTML-tag characters returned.
"""
s = mobj.group(0)
trunc_s = maxlen and s[:maxlen] or s
return self._entity_encode(trunc_s), len(trunc_s)
def format_email_truncated(self, mobj, userdata, maxlen=0):
"""Return a 2-tuple containing:
- the text represented by MatchObject MOBJ, formatted as an
HTML-escaped truncated email address of no more than MAXLEN
characters. If MAXLEN is 0, there is no maximum.
- the number of characters returned.
"""
s = mobj.group(1)
s_len = len(s)
if (maxlen == 0) or (s_len < (maxlen - 1)):
return self._entity_encode(s) + "&#64;&hellip;", s_len + 2
elif s_len < maxlen:
return self._entity_encode(s) + "&#64;", s_len + 1
else:
trunc_s = mobj.group(1)[:maxlen]
return self._entity_encode(trunc_s), len(trunc_s)
def format_svnrevref(self, mobj, userdata, maxlen=0):
"""Return a 2-tuple containing:
- the text represented by MatchObject MOBJ, formatted as an
linkified URL to a ViewVC Subversion revision view, with no
more than MAXLEN characters in the non-HTML-tag portions.
If MAXLEN is 0, there is no maximum.
- the number of characters returned.
USERDATA is a function that accepts a revision reference
and returns a URL to that revision.
"""
s = mobj.group(0)
revref = mobj.group(2)
trunc_s = maxlen and s[:maxlen] or s
revref_url = userdata(revref)
return '<a href="%s">%s</a>' % (sapi.escape(revref_url), sapi.escape(trunc_s)), len(trunc_s)
def format_custom_url(self, mobj, userdata, maxlen=0):
"""Return a 2-tuple containing:
- the text represented by MatchObject MOBJ, formatted as an
linkified URL created by substituting match groups 0-9 into
USERDATA (which is a format string that uses \\N to
represent the substitution locations) and with no more than
MAXLEN characters in the non-HTML-tag portions. If MAXLEN
is 0, there is no maximum.
- the number of characters returned.
"""
format = userdata
text = mobj.group(0)
url = format
for i in range(9):
try:
repl = mobj.group(i)
except Exception:
repl = ""
url = url.replace(r"\%d" % (i), repl)
trunc_s = maxlen and text[:maxlen] or text
return '<a href="%s">%s</a>' % (sapi.escape(url), sapi.escape(trunc_s)), len(trunc_s)
def format_text(self, s, unused, maxlen=0):
"""Return a 2-tuple containing:
- the text S, HTML-escaped, containing no more than MAXLEN
characters. If MAXLEN is 0, there is no maximum.
- the number of characters returned.
"""
trunc_s = maxlen and s[:maxlen] or s
return sapi.escape(trunc_s), len(trunc_s)
def add_formatter(self, regexp, conv, userdata=None):
"""Register a formatter which finds instances of strings matching
REGEXP, and using the function CONV and USERDATA to format them.
CONV is a function which accepts three parameters:
- the MatchObject which holds the string portion to be formatted,
- the USERDATA object,
- the maximum number of characters from that string to use for
human-readable output (or 0 to indicate no maximum).
"""
if isinstance(regexp, str):
regexp = re.compile(regexp)
self._formatters.append([regexp, conv, userdata])
def get_result(self, s, maxlen=0):
"""Format S per the set of formatters registered with this object,
and limited to MAXLEN visible characters (or unlimited if MAXLEN
is 0). Return a 3-tuple containing the formatted result string,
the number of visible characters in the result string, and a
boolean flag indicating whether or not S was truncated.
"""
return self.tokenize_text(s).get_result(maxlen)
def tokenize_text(self, s):
"""Return a ViewVCHtmlFormatterTokens object containing the tokens
created when parsing the string S. Callers can use that object's
get_result() function to retrieve HTML-formatted text.
"""
tokens = []
# We could just have a "while s:" here instead of "for line: while
# line:", but for really large log messages with heavy
# tokenization, the cost in both performance and memory
# consumption of the approach taken was atrocious.
for line in s.replace("\r\n", "\n").split("\n"):
line = line + "\n"
while line:
best_match = best_conv = best_userdata = None
for test in self._formatters:
match = test[0].search(line)
# If we find and match and (a) its our first one, or (b) it
# matches text earlier than our previous best match, or (c) it
# matches text at the same location as our previous best match
# but extends to cover more text than that match, then this is
# our new best match.
#
# Implied here is that when multiple formatters match exactly
# the same text, the first formatter in the registration list wins.
if match and (
(best_match is None)
or (match.start() < best_match.start())
or (
(match.start() == best_match.start())
and (match.end() > best_match.end())
)
):
best_match = match
best_conv = test[1]
best_userdata = test[2]
# If we found a match...
if best_match:
# ... add any non-matching stuff first, then the matching bit.
start = best_match.start()
end = best_match.end()
if start > 0:
tokens.append(
_item(match=line[:start], converter=self.format_text, userdata=None)
)
tokens.append(
_item(match=best_match, converter=best_conv, userdata=best_userdata)
)
line = line[end:]
else:
# Otherwise, just add the rest of the string.
tokens.append(_item(match=line, converter=self.format_text, userdata=None))
line = ""
return ViewVCHtmlFormatterTokens(tokens)
def _entity_encode(self, s):
return "".join(["&#%d;" % (ord(x)) for x in s])
class LogFormatter:
def __init__(self, request, log):
self.request = request
self.log = log or ""
self.tokens = None
self.cache = {} # (maxlen, htmlize) => resulting_log
def get(self, maxlen=0, htmlize=1):
cfg = self.request.cfg
# Prefer the cache.
if (maxlen, htmlize) in self.cache:
return self.cache[(maxlen, htmlize)]
# If we are HTML-izing...
if htmlize:
# ...and we don't yet have ViewVCHtmlFormatter() object tokens...
if not self.tokens:
# ... then get them.
lf = ViewVCHtmlFormatter()
# Rewrite URLs.
lf.add_formatter(_re_rewrite_url, lf.format_url)
# Rewrite Subversion revision references.
if self.request.roottype == "svn":
def revision_to_url(rev):
return self.request.get_url(
view_func=view_revision, params={"revision": rev}, escape=0
)
lf.add_formatter(_re_rewrite_svnrevref, lf.format_svnrevref, revision_to_url)
# Rewrite email addresses.
if cfg.options.mangle_email_addresses == 2:
lf.add_formatter(_re_rewrite_email, lf.format_email_truncated)
elif cfg.options.mangle_email_addresses == 1:
lf.add_formatter(_re_rewrite_email, lf.format_email_obfuscated)
else:
lf.add_formatter(_re_rewrite_email, lf.format_email)
# Add custom rewrite handling per configuration.
for rule in cfg.options.custom_log_formatting:
rule = rule.replace("\\:", "\x01")
regexp, format = [x.strip() for x in rule.split(":", 1)]
regexp = regexp.replace("\x01", ":")
format = format.replace("\x01", ":")
lf.add_formatter(re.compile(regexp), lf.format_custom_url, format)
# Tokenize the log message.
self.tokens = lf.tokenize_text(self.log)
# Use our formatter to ... you know ... format.
log, log_len, truncated = self.tokens.get_result(maxlen)
result_log = log + (truncated and "&hellip;" or "")
# But if we're not HTML-izing...
else:
# ...then do much more simplistic transformations as necessary.
log = self.log
if cfg.options.mangle_email_addresses == 2:
log = re.sub(_re_rewrite_email, r"\1@...", log)
result_log = maxlen and log[:maxlen] or log
# In either case, populate the cache and return the results.
self.cache[(maxlen, htmlize)] = result_log
return result_log
_time_desc = {
1: "second",
60: "minute",
3600: "hour",
86400: "day",
604800: "week",
2628000: "month",
31536000: "year",
}
def get_time_text(request, interval, num):
"Get some time text, possibly internationalized."
# Some languages have even harder pluralization rules. We'll have
# to deal with those on demand.
if num == 0:
return ""
text = _time_desc[interval]
if num == 1:
attr = text + "_singular"
fmt = "%d " + text
else:
attr = text + "_plural"
fmt = "%d " + text + "s"
try:
fmt = getattr(request.kv.i18n.time, attr)
except AttributeError:
pass
return fmt % num
def little_time(request):
try:
return request.kv.i18n.time.little_time
except AttributeError:
return "very little time"
def html_time(request, secs, extended=0):
secs = int(time.time()) - secs
if secs < 2:
return little_time(request)
breaks = sorted(_time_desc.keys())
i = 0
while i < len(breaks):
if secs < 2 * breaks[i]:
break
i = i + 1
value = breaks[i - 1]
s = get_time_text(request, value, secs // value)
if extended and i > 1:
secs = secs % value
value = breaks[i - 2]
ext = get_time_text(request, value, secs // value)
if ext:
# FIXME: This is not i18n compatible. Pass on it for now.
s = s + ", " + ext
return s
def common_template_data(request, revision=None, mime_type=None):
"""Return a TemplateData instance with data dictionary items
common to most ViewVC views."""
cfg = request.cfg
# Initialize data dictionary members (sorted alphanumerically)
data = TemplateData(
{
"annotate_href": None,
"cfg": cfg,
"docroot": (
cfg.options.docroot is None
and request.script_name + "/" + docroot_magic_path
or cfg.options.docroot
),
"download_href": None,
"download_text_href": None,
"graph_href": None,
"home_href": request.script_name or "/",
"kv": request.kv,
"lockinfo": None,
"log_href": None,
"nav_path": nav_path(request),
"pathtype": None,
"prefer_markup": ezt.boolean(0),
"queryform_href": None,
"rev": None,
"revision_href": None,
"rootname": (request.rootname and request.server.escape(request.rootname) or None),
"rootpath": request.rootpath,
"roots_href": None,
"roottype": request.roottype,
"rss_href": None,
"tarball_href": None,
"up_href": None,
"username": request.username,
"view": _view_codes[request.view_func],
"view_href": None,
"vsn": __version__,
"where": request.server.escape(request.where),
}
)
rev = revision
if not rev:
rev = request.query_dict.get("annotate")
if not rev:
rev = request.query_dict.get("revision")
if not rev and request.roottype == "svn":
rev = request.query_dict.get("pathrev")
try:
data["rev"] = hasattr(request.repos, "_getrev") and request.repos._getrev(rev) or rev
except vclib.InvalidRevision:
raise ViewVCException("Invalid revision", "404 Not Found")
if request.pathtype == vclib.DIR:
data["pathtype"] = "dir"
elif request.pathtype == vclib.FILE:
data["pathtype"] = "file"
if request.path_parts:
dir = _path_join(request.path_parts[:-1])
data["up_href"] = request.get_url(
view_func=view_directory, where=dir, pathtype=vclib.DIR, params={}, escape=1
)
if "roots" in cfg.options.allowed_views:
data["roots_href"] = request.get_url(view_func=view_roots, escape=1, params={})
if request.pathtype == vclib.FILE:
fvi = get_file_view_info(request, request.where, data["rev"], mime_type)
data["view_href"] = fvi.view_href
data["download_href"] = fvi.download_href
data["download_text_href"] = fvi.download_text_href
data["annotate_href"] = fvi.annotate_href
data["revision_href"] = fvi.revision_href
data["prefer_markup"] = fvi.prefer_markup
data["log_href"] = request.get_url(view_func=view_log, params={}, escape=1)
if request.roottype == "cvs" and cfg.options.use_cvsgraph:
data["graph_href"] = request.get_url(view_func=view_cvsgraph, params={}, escape=1)
file_data = request.repos.listdir(request.path_parts[:-1], request.pathrev, {})
entries = [item for item in file_data if item.name == request.path_parts[-1]]
if len(entries) == 1:
request.repos.dirlogs(request.path_parts[:-1], request.pathrev, entries, {})
data["lockinfo"] = entries[0].lockinfo
elif request.pathtype == vclib.DIR:
data["view_href"] = request.get_url(view_func=view_directory, params={}, escape=1)
if "tar" in cfg.options.allowed_views:
data["tarball_href"] = request.get_url(view_func=download_tarball, params={}, escape=1)
if request.roottype == "svn":
data["revision_href"] = request.get_url(
view_func=view_revision, params={"revision": data["rev"]}, escape=1
)
data["log_href"] = request.get_url(view_func=view_log, params={}, escape=1)
if is_querydb_nonempty_for_root(request):
if request.pathtype == vclib.DIR:
params = {}
if request.roottype == "cvs" and request.pathrev:
params["branch"] = request.pathrev
data["queryform_href"] = request.get_url(
view_func=view_queryform, params=params, escape=1
)
data["rss_href"] = request.get_url(
view_func=view_query, params={"date": "month", "format": "rss"}, escape=1
)
elif request.pathtype == vclib.FILE:
parts = _path_parts(request.where)
where = _path_join(parts[:-1])
data["rss_href"] = request.get_url(
view_func=view_query,
where=where,
pathtype=request.pathtype,
params={"date": "month", "format": "rss", "file": parts[-1], "file_match": "exact"},
escape=1,
)
return data
def retry_read(src, reqlen=CHUNK_SIZE):
while 1:
chunk = src.read(CHUNK_SIZE)
if not chunk:
# need to check for eof methods because the cStringIO file objects
# returned by ccvs don't provide them
if hasattr(src, "eof") and src.eof() is None:
time.sleep(1)
continue
return chunk
def copy_stream(src, dst, htmlize=0):
while 1:
chunk = retry_read(src)
if not chunk:
break
if htmlize:
chunk = sapi.escape(chunk)
dst.write(chunk)
class MarkupPipeWrapper:
"""An EZT callback that outputs a filepointer, plus some optional
pre- and post- text."""
def __init__(self, fp, pretext=None, posttext=None, htmlize=0):
self.fp = fp
self.pretext = pretext
self.posttext = posttext
self.htmlize = htmlize
def __call__(self, fp, ctx, filename, line_number):
if self.pretext:
fp.write(self.pretext)
copy_stream(self.fp, fp, self.htmlize)
self.fp.close()
if self.posttext:
fp.write(self.posttext)
_re_rewrite_escaped_url = re.compile(
r"((http|https|ftp|file|svn|svn\+ssh)"
r"(://[-a-zA-Z0-9%.~:_/]+)"
r"((\?|\&amp;amp;|\&amp;|\&)"
r"([-a-zA-Z0-9%.~:_]+)="
r"([-a-zA-Z0-9%.~:_])+)*"
r"(#([-a-zA-Z0-9%.~:_]+)?)?)"
)
def markup_escaped_urls(s):
# Return a copy of S with all URL references -- which are expected
# to be already HTML-escaped -- wrapped in <a href=""></a>.
def _url_repl(match_obj):
url = match_obj.group(0)
unescaped_url = url.replace("&amp;amp;", "&amp;")
return '<a href="%s">%s</a>' % (unescaped_url, url)
return re.sub(_re_rewrite_escaped_url, _url_repl, s)
def detect_encoding(text_block):
"""Return the encoding used by TEXT_BLOCK as detected by the chardet
Python module. (Currently, this is used only when syntax
highlighting is not enabled/available; otherwise, Pygments does this
work for us.)"""
# Does the TEXT_BLOCK start with a BOM?
for bom, encoding in [
(b"\xef\xbb\xbf", "utf-8"),
(b"\xff\xfe", "utf-16"),
(b"\xfe\xff", "utf-16be"),
(b"\xff\xfe\0\0", "utf-32"),
(b"\0\0\xfe\xff", "utf-32be"),
]:
if text_block.startswith(bom):
return encoding
# If no recognized BOM, see if chardet can help us.
try:
try:
import cchardet as chardet
except ImportError:
import chardet
# If chardet can confidently claimed a match, we'll use its
# findings. (And if that match is 'ascii' -- which is a subset of
# utf-8 -- we'll just call it 'utf-8' and score a zero transform.)
resp = chardet.detect(text_block)
if resp.get("confidence") == 1.0:
encoding = resp.get("encoding")
if encoding == "ascii":
encoding = "utf-8"
return encoding
except Exception:
pass
# By default ... we have no idea.
return None
# FIXME: This implementation expects (without ample Pygments API
# promises to guarantee it) that its .write() will be called exactly
# once per line. So far, it's worked out okay...
class CustomPygmentsSink:
def __init__(self):
self.colorized_file_lines = []
def write(self, buf):
self.colorized_file_lines.append(markup_escaped_urls(buf.rstrip("\n\r")))
def markup_file_contents(request, cfg, file_lines, filename, mime_type, encoding, colorize):
"""Perform syntax coloration via Pygments (where allowed and
possible; a lesser bit of HTML-ification otherwise) on FILE_LINES,
which is a list of bytestrings believed to be using character
ENCODING. Return those same lines, converted to Unicode strings and
colorized."""
# Nothing to mark up? So be it.
if not file_lines:
return []
# Determine if we should (and can) use Pygments to highlight our
# output. Reasons not to include a) being told not to by the
# configuration, b) not being able to import the Pygments modules,
# and c) Pygments not having a lexer for our file's format.
pygments_lexer = None
if colorize:
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import (
ClassNotFound,
get_lexer_for_mimetype,
get_lexer_for_filename,
guess_lexer,
)
# First, see if there's a Pygments lexer associated with MIME_TYPE.
if mime_type:
try:
pygments_lexer = get_lexer_for_mimetype(
mime_type, tabsize=cfg.options.tabsize, stripnl=False
)
except ClassNotFound:
pygments_lexer = None
# If we've no lexer thus far, try to find one based on the FILENAME.
if not pygments_lexer:
try:
pygments_lexer = get_lexer_for_filename(
filename, tabsize=cfg.options.tabsize, stripnl=False
)
except ClassNotFound:
pygments_lexer = None
# Still no lexer? If we've reason to believe this is a text
# file, try to guess the lexer based on the file's content.
if not pygments_lexer and is_text(mime_type) and file_lines:
try:
pygments_lexer = guess_lexer(
file_lines[0], tabsize=cfg.options.tabsize, stripnl=False
)
except (ClassNotFound, UnicodeDecodeError):
pygments_lexer = None
# If we aren't highlighting, just return FILE_LINES with URLs
# manually marked up and tabs manually expanded.
if not pygments_lexer:
def _poor_mans_markup(line):
line = line.expandtabs(cfg.options.tabsize)
return markup_escaped_urls(sapi.escape(line))
return [_poor_mans_markup(line) for line in file_lines]
# If we get here, we're letting Pygments highlight syntax.
ps = CustomPygmentsSink()
highlight(
"".join(file_lines),
pygments_lexer,
HtmlFormatter(nowrap=True, classprefix="pygments-", encoding=None),
ps,
)
return ps.colorized_file_lines
def empty_blame_item(line, line_no):
blame_item = vclib.Annotation(line, line_no, None, None, None, None)
blame_item.diff_href = None
return blame_item
def merge_blame_data(file_lines, blame_data):
errorful = 0
if blame_data and (len(file_lines) != len(blame_data)):
errorful = 1
blame_data = None
if not blame_data:
new_blame_data = []
for i in range(len(file_lines)):
line = file_lines[i]
if blame_data:
blame_data[i].text = line
else:
new_blame_data.append(empty_blame_item(line, i + 1))
return blame_data or new_blame_data, errorful
def make_time_string(date, cfg):
"""Returns formatted date string in either local time or UTC.
The passed in 'date' variable is seconds since epoch.
"""
if date is None:
return None
if cfg.options.use_localtime:
tm = time.localtime(date)
else:
tm = time.gmtime(date)
if cfg.options.iso8601_timestamps:
if cfg.options.use_localtime:
if tm[8] and time.daylight:
tz = -time.altzone
else:
tz = -time.timezone
if tz < 0:
tz = "-%02d:%02d" % (-tz // 3600, (-tz % 3600) // 60)
else:
tz = "+%02d:%02d" % (tz // 3600, (tz % 3600) // 60)
else:
tz = "Z"
return time.strftime("%Y-%m-%dT%H:%M:%S", tm) + tz
else:
tz_string = cfg.options.use_localtime and time.tzname[tm[8]] or "UTC"
return time.asctime(tm) + " " + tz_string
def make_rss_time_string(date, cfg):
"""Returns formatted date string in UTC, formatted for RSS.
The passed in 'date' variable is seconds since epoch.
"""
if date is None:
return None
return time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime(date)) + " UTC"
def make_comma_sep_list_string(items):
return ", ".join([x.name for x in items])
def is_undisplayable(val, encoding="utf-8"):
# FIXME: must revise usage later (hopefully, we can display val
# with encodings other than utf-8 ...)
if not isinstance(val, bytes):
val = val.encode("utf-8", "surrogateescape")
try:
val.decode(encoding, "strict")
return 0
except (UnicodeDecodeError, TypeError):
return 1
def get_itemprops(request, path_parts, rev):
itemprops = request.repos.itemprops(path_parts, rev)
propnames = sorted(itemprops.keys())
props = []
for name in propnames:
undisplayable = not isinstance(itemprops[name], str)
if not undisplayable:
lf = LogFormatter(request, itemprops[name])
value = lf.get(maxlen=0, htmlize=1)
else:
value = None
props.append(_item(name=name, value=value, undisplayable=ezt.boolean(undisplayable)))
return props
def parse_mime_type(mime_type):
mime_parts = [x.strip() for x in mime_type.split(";")]
type_subtype = mime_parts[0].lower()
parameters = {}
for part in mime_parts[1:]:
name, value = part.split("=", 1)
parameters[name] = value
return type_subtype, parameters
def calculate_mime_type(request, path_parts, rev):
"""Return a 2-tuple carrying the MIME content type and character
encoding for the file represented by PATH_PARTS in REV. Use REQUEST
for repository access as necessary."""
if not path_parts:
return None, None
mime_type = None
if request.roottype == "svn" and (not request.cfg.options.svn_ignore_mimetype):
try:
itemprops = request.repos.itemprops(path_parts, rev)
mime_type = itemprops.get("svn:mime-type")
if mime_type:
mime_type, parameters = parse_mime_type(mime_type)
return mime_type, parameters.get("charset")
except Exception:
pass
return guess_mime(path_parts[-1]), None
def assert_viewable_filesize(cfg, filesize):
if (
cfg.options.max_filesize_kbytes
and filesize != -1
and filesize > (1024 * cfg.options.max_filesize_kbytes)
):
raise ViewVCException(
"Display of files larger than %d KB "
"disallowed by configuration" % (cfg.options.max_filesize_kbytes),
"403 Forbidden",
)
def markup_or_annotate(request, is_annotate):
cfg = request.cfg
path, rev = _orig_path(request, is_annotate and "annotate" or "revision")
is_binary = False
lines = fp = image_src_href = None
annotation = "none"
revision = None
mime_type, encoding = calculate_mime_type(request, path, rev)
# Is this display blocked by 'binary_mime_types' configuration?
if is_binary_file_mime_type(mime_type, cfg):
fp, revision = request.repos.openfile(path, rev, {})
fp.close()
if check_freshness(request, None, revision, weak=1):
return
is_binary = True
if is_annotate:
annotation = "binary"
# If this is viewable image that we're allowed to show embedded, we
# need only resolve its revision and generate an image src=
# attribute URL for it.
elif is_viewable_image(mime_type) and "co" in cfg.options.allowed_views:
fp, revision = request.repos.openfile(path, rev, {})
fp.close()
if check_freshness(request, None, revision, weak=1):
return
if is_annotate:
annotation = "binary"
image_src_href = request.get_url(
view_func=view_checkout, params={"revision": rev}, escape=1
)
# If we get here, the request is not for an image that we can
# display embedded.
else:
# If configuration disallows display of large files, try to honor
# that request.
filesize = request.repos.filesize(path, rev)
assert_viewable_filesize(cfg, filesize)
# If this was an annotation request, try to annotate this file.
# If something goes wrong, that's okay -- we'll gracefully revert
# to a plain markup display.
blame_data = None
if is_annotate:
try:
blame_source, revision = request.repos.annotate(path, rev, False)
if check_freshness(request, None, revision, weak=1):
return
# Create BLAME_DATA list from BLAME_SOURCE, adding diff_href
# items to each relevant "line".
blame_data = []
for item in blame_source:
item.diff_href = None
if item.prev_rev:
item.diff_href = request.get_url(
view_func=view_diff,
params={"r1": item.prev_rev, "r2": item.rev},
escape=1,
partial=1,
)
blame_data.append(item)
annotation = "annotated"
except vclib.NonTextualFileContents:
annotation = "binary"
except Exception:
annotation = "error"
# Grab the file contents.
fp, revision = request.repos.openfile(path, rev, {"cvs_oldkeywords": 1})
if check_freshness(request, None, revision, weak=1):
fp.close()
return
# If we're limiting by filesize but couldn't pull off the cheap
# check above, we'll try to do so line by line here (while
# building our file_lines array).
if cfg.options.max_filesize_kbytes and filesize == -1:
file_lines = []
filesize = 0
while 1:
line = fp.readline()
if not line:
break
filesize = filesize + len(line)
assert_viewable_filesize(cfg, filesize)
file_lines.append(line)
else:
file_lines = fp.readlines()
fp.close()
# If allowed by configuration, try to detect the source encoding
# for this file. We'll assemble a block of data from the file
# contents to do so... 1024 bytes should be enough.
if not encoding and cfg.options.detect_encoding:
text_block = b""
for i in range(len(file_lines)):
text_block = text_block + file_lines[i]
if len(text_block) >= 2048:
break
encoding = detect_encoding(text_block)
if not encoding:
encoding = request.repos.content_encoding
# Decode the file's lines from the detected encoding to Unicode.
try:
for i in range(len(file_lines)):
line = file_lines[i]
try:
line = line.decode(encoding)
except UnicodeDecodeError:
if not cfg.options.allow_mojibake:
raise
line = line.decode(encoding, "surrogateescape")
file_lines[i] = line
except Exception:
is_binary = True
# Unless we've determined that the file is binary, try to colorize
# the file contents. If that fails, we'll give it another shot
# with colorization disabled.
if not is_binary:
colorize = cfg.options.enable_syntax_coloration
try:
lines = markup_file_contents(
request, cfg, file_lines, path[-1], mime_type, encoding, colorize
)
except Exception:
if colorize:
lines = markup_file_contents(
request, cfg, file_lines, path[-1], mime_type, encoding, False
)
else:
raise ViewVCException(
"Error displaying file contents", "500 Internal Server Error"
)
# Now, try to match up the annotation data (if any) with the file
# lines.
lines, errorful = merge_blame_data(lines, blame_data)
if errorful:
annotation = "error"
data = common_template_data(request, revision, mime_type)
data.merge(
TemplateData(
{
"mime_type": mime_type,
"log": None,
"date": None,
"ago": None,
"author": None,
"branches": None,
"tags": None,
"branch_points": None,
"changed": None,
"size": None,
"state": None,
"vendor_branch": None,
"prev": None,
"orig_path": None,
"orig_href": None,
"image_src_href": image_src_href,
"lines": lines,
"properties": get_itemprops(request, path, rev),
"is_binary": ezt.boolean(is_binary),
"annotation": annotation,
}
)
)
if cfg.options.show_log_in_markup:
options = {
"svn_latest_log": 1, # FIXME: Use of this magical value is uncool.
"svn_cross_copies": 1,
}
revs = request.repos.itemlog(path, revision, vclib.SORTBY_REV, 0, 1, options)
entry = revs[-1]
lf = LogFormatter(request, entry.log)
data["date"] = make_time_string(entry.date, cfg)
data["author"] = entry.author
data["changed"] = entry.changed
data["log"] = lf.get(maxlen=0, htmlize=1)
data["size"] = entry.size
if entry.date is not None:
data["ago"] = html_time(request, entry.date, 1)
if request.roottype == "cvs":
branch = entry.branch_number
prev = entry.prev or entry.parent
data["state"] = entry.dead and "dead"
data["prev"] = prev and prev.string
data["vendor_branch"] = ezt.boolean(branch and branch[2] % 2 == 1)
# TODO: Should this be using prep_tags() instead?
data["branches"] = make_comma_sep_list_string(entry.branches)
data["tags"] = make_comma_sep_list_string(entry.tags)
data["branch_points"] = make_comma_sep_list_string(entry.branch_points)
if path != request.path_parts:
orig_path = _path_join(path)
data["orig_path"] = orig_path
data["orig_href"] = request.get_url(
view_func=view_log,
where=orig_path,
pathtype=vclib.FILE,
params={"pathrev": revision},
escape=1,
)
generate_page(request, "file", data)
def view_markup(request):
if "markup" not in request.cfg.options.allowed_views:
raise ViewVCException("Markup view is disabled", "403 Forbidden")
if request.pathtype != vclib.FILE:
raise ViewVCException("Unsupported feature: markup view on directory", "400 Bad Request")
markup_or_annotate(request, 0)
def view_annotate(request):
if "annotate" not in request.cfg.options.allowed_views:
raise ViewVCException("Annotation view is disabled", "403 Forbidden")
if request.pathtype != vclib.FILE:
raise ViewVCException("Unsupported feature: annotate view on directory", "400 Bad Request")
markup_or_annotate(request, 1)
def revcmpkey(rev):
"""Normalize rev into list of integer, for compare"""
return list(map(int, rev.split(".")))
def sort_file_data(file_data, roottype, sortdir, sortby, group_dirs):
# convert sortdir into reverse parameter in sort() method
reverse = (sortdir == "down")
# in cvs, revision numbers can't be compared meaningfully between
# files, so try to do the right thing and compare dates instead
if roottype == "cvs" and sortby == "rev":
sortby = "date"
if group_dirs:
# if we're grouping directories together, sorting is pretty
# simple. a directory sorts "higher" than a non-directory, and
# two directories are sorted as normal.
# Note: True > False
latter = vclib.DIR if reverse else vclib.FILE
if sortby == 'rev':
# roottype is "svn" only, and the file has always rev value.
def key(x):
return (x.kind == latter, x.rev)
elif sortby == 'date':
def key(x):
# latest date is first
return (x.kind == latter, - x.sortkey('date', -1))
elif sortby == 'file':
# the key attibute name != sortby
def key(x):
return (x.kind == latter, x.name)
else:
# the key attibute name == sortby
def key(x):
return (x.kind == latter, x.sortkey(sortby))
else:
# If the file does not have revision data, which can be only in
# "cvs" roottype, the file should been placed after those files
# which have revision data, and it is sorted by its name.
if roottype == 'cvs':
if sortby == 'rev':
def key(x):
return ((x.rev is None) ^ reverse,
x.name if x.rev is None else x.rev)
elif sortby == 'date':
def key(x):
# latest date is first
return ((x.rev is None) ^ reverse,
x.name if x.rev is None
else - x.sortkey('date', -1))
elif sortby == 'file':
# the key attibute name != sortby
def key(x):
return ((x.rev is None) ^ reverse, x.name)
else:
# the key attibute name == sortby
def key(x):
return ((x.rev is None) ^ reverse,
x.name if x.rev is None else x.sortkey(sortby))
else:
# In case roottype is 'svn', we simply use sotrby key.
if sortby == 'date':
def key(x):
# latest date is first
return - x.sortkey('date', -1)
elif sortby == 'file':
# the key attibute name != sortby
key = attrgetter('name')
else:
# the key attibute name == sortby and can be None
def key(x):
return x.sortkey(sortby)
file_data.sort(key=key, reverse=reverse)
def view_roots(request):
if "roots" not in request.cfg.options.allowed_views:
raise ViewVCException("Root listing view is disabled", "403 Forbidden")
# add in the roots for the selection
roots = []
expand_root_parents(request.cfg)
allroots = list_roots(request)
if len(allroots):
rootnames = sorted(allroots.keys(), key=str.lower)
for rootname in rootnames:
root_path, root_type, lastmod = allroots[rootname]
href = request.get_url(
view_func=view_directory,
where="",
pathtype=vclib.DIR,
params={"root": rootname},
escape=1,
)
if root_type == vclib.SVN:
log_href = request.get_url(
view_func=view_log,
where="",
pathtype=vclib.DIR,
params={"root": rootname},
escape=1,
)
else:
log_href = None
roots.append(
_item(
name=request.server.escape(rootname),
type=root_type,
path=root_path,
author=lastmod and lastmod.author or None,
ago=lastmod and lastmod.ago or None,
date=lastmod and lastmod.date or None,
log=lastmod and lastmod.log or None,
short_log=lastmod and lastmod.short_log or None,
rev=lastmod and lastmod.rev or None,
href=href,
log_href=log_href,
)
)
data = common_template_data(request)
data.merge(
TemplateData(
{
"roots": roots,
"roots_shown": len(roots),
}
)
)
generate_page(request, "roots", data)
def view_directory(request):
cfg = request.cfg
# For Subversion repositories, the revision acts as a weak validator for
# the directory listing (to take into account template changes or
# revision property changes).
if request.roottype == "svn":
try:
rev = request.repos._getrev(request.pathrev)
except vclib.InvalidRevision:
raise ViewVCException("Invalid revision", "404 Not Found")
tree_rev = request.repos.created_rev(request.where, rev)
if check_freshness(request, None, str(tree_rev), weak=1):
return
# List current directory
options = {}
if request.roottype == "cvs":
hideattic = int(request.query_dict.get("hideattic", cfg.options.hide_attic))
options["cvs_subdirs"] = cfg.options.show_subdir_lastmod and cfg.options.show_logs
file_data = request.repos.listdir(request.path_parts, request.pathrev, options)
# sort with directories first, and using the "sortby" criteria
sortby = request.query_dict.get("sortby", cfg.options.sort_by) or "file"
sortdir = request.query_dict.get("sortdir", "up")
# when paging and sorting by filename, we can greatly improve
# performance by "cheating" -- first, we sort (we already have the
# names), then we just fetch dirlogs for the needed entries.
# however, when sorting by other properties or not paging, we've no
# choice but to fetch dirlogs for everything.
if cfg.options.dir_pagesize and sortby == "file":
dirlogs_first = int(request.query_dict.get("dir_pagestart", 0))
if dirlogs_first > len(file_data):
dirlogs_first = 0
dirlogs_last = dirlogs_first + cfg.options.dir_pagesize
for file in file_data:
file.rev = None
file.date = None
file.log = None
file.author = None
file.size = None
file.lockinfo = None
file.dead = None
sort_file_data(file_data, request.roottype, sortdir, sortby, cfg.options.sort_group_dirs)
# request dirlogs only for the slice of files in "this page"
request.repos.dirlogs(
request.path_parts, request.pathrev, file_data[dirlogs_first:dirlogs_last], options
)
else:
request.repos.dirlogs(request.path_parts, request.pathrev, file_data, options)
sort_file_data(file_data, request.roottype, sortdir, sortby, cfg.options.sort_group_dirs)
# If a regex is specified, build a compiled form thereof for filtering
searchstr = None
search_re = request.query_dict.get("search", "")
if cfg.options.use_re_search and search_re:
searchstr = re.compile(search_re)
# loop through entries creating rows and changing these values
rows = []
dirs_displayed = files_displayed = 0
num_dead = 0
# set some values to be used inside loop
where = request.where
where_prefix = where and where + "/"
for file in file_data:
file_where = where_prefix + file.name
if is_dir_ignored_file(file.name, cfg):
continue
row = _item(
author=None,
log=None,
short_log=None,
state=None,
size=None,
log_file=None,
log_rev=None,
graph_href=None,
mime_type=None,
date=None,
ago=None,
view_href=None,
log_href=None,
revision_href=None,
annotate_href=None,
download_href=None,
download_text_href=None,
prefer_markup=ezt.boolean(0),
is_viewable_image=ezt.boolean(0),
is_binary=ezt.boolean(0),
)
if request.roottype == "cvs":
if file.absent:
continue
if cfg.options.hide_errorful_entries and file.errors:
continue
row.rev = file.rev
row.author = file.author
row.state = (request.roottype == "cvs" and file.dead) and "dead" or ""
if file.date is not None:
row.date = make_time_string(file.date, cfg)
row.ago = html_time(request, file.date)
if cfg.options.show_logs:
lf = LogFormatter(request, file.log)
row.log = lf.get(maxlen=0, htmlize=1)
row.short_log = lf.get(maxlen=cfg.options.short_log_len, htmlize=1)
row.lockinfo = file.lockinfo
row.anchor = request.server.escape(file.name)
row.name = request.server.escape(file.name)
row.pathtype = (file.kind == vclib.FILE and "file") or (file.kind == vclib.DIR and "dir")
row.errors = file.errors
if file.kind == vclib.DIR:
if cfg.options.hide_cvsroot and is_cvsroot_path(
request.roottype, request.path_parts + [file.name]
):
continue
dirs_displayed += 1
row.view_href = request.get_url(
view_func=view_directory, where=file_where, pathtype=vclib.DIR, params={}, escape=1
)
if request.roottype == "svn":
row.revision_href = request.get_url(
view_func=view_revision, params={"revision": file.rev}, escape=1
)
if request.roottype == "cvs" and file.rev is not None:
row.rev = None
if cfg.options.show_logs:
row.log_file = request.server.escape(file.newest_file)
row.log_rev = file.rev
if request.roottype == "svn":
row.log_href = request.get_url(
view_func=view_log, where=file_where, pathtype=vclib.DIR, params={}, escape=1
)
elif file.kind == vclib.FILE:
if searchstr is not None:
if request.roottype == "cvs" and (file.errors or file.dead):
continue
if not search_file(
request.repos, request.path_parts + [file.name], request.pathrev, searchstr
):
continue
if request.roottype == "cvs" and file.dead:
num_dead = num_dead + 1
if hideattic:
continue
files_displayed += 1
if request.roottype == "svn":
row.size = file.size
row.mime_type, encoding = calculate_mime_type(
request, _path_parts(file_where), file.rev
)
fvi = get_file_view_info(request, file_where, file.rev, row.mime_type)
row.view_href = fvi.view_href
row.download_href = fvi.download_href
row.download_text_href = fvi.download_text_href
row.annotate_href = fvi.annotate_href
row.revision_href = fvi.revision_href
row.prefer_markup = fvi.prefer_markup
row.is_viewable_image = fvi.is_viewable_image
row.is_binary = fvi.is_binary
row.log_href = request.get_url(
view_func=view_log, where=file_where, pathtype=vclib.FILE, params={}, escape=1
)
if cfg.options.use_cvsgraph and request.roottype == "cvs":
row.graph_href = request.get_url(
view_func=view_cvsgraph,
where=file_where,
pathtype=vclib.FILE,
params={},
escape=1,
)
rows.append(row)
# Prepare the data that will be passed to the template, based on the
# common template data.
data = common_template_data(request)
data.merge(
TemplateData(
{
"entries": rows,
"sortby": sortby,
"sortdir": sortdir,
"search_re": request.server.escape(search_re),
"dir_pagestart": None,
"sortby_file_href": request.get_url(
params={"sortby": "file", "sortdir": None}, escape=1
),
"sortby_rev_href": request.get_url(
params={"sortby": "rev", "sortdir": None}, escape=1
),
"sortby_date_href": request.get_url(
params={"sortby": "date", "sortdir": None}, escape=1
),
"sortby_author_href": request.get_url(
params={"sortby": "author", "sortdir": None}, escape=1
),
"sortby_log_href": request.get_url(
params={"sortby": "log", "sortdir": None}, escape=1
),
"files_shown": files_displayed,
"dirs_shown": dirs_displayed,
"num_dead": num_dead,
"youngest_rev": None,
"youngest_rev_href": None,
"selection_form": None,
"attic_showing": None,
"show_attic_href": None,
"hide_attic_href": None,
"branch_tags": None,
"plain_tags": None,
"properties": get_itemprops(request, request.path_parts, request.pathrev),
"tree_rev": None,
"tree_rev_href": None,
"dir_paging_action": None,
"dir_paging_hidden_values": [],
"search_re_action": None,
"search_re_hidden_values": [],
# Populated by paging()/paging_sws()
"picklist": [],
"picklist_len": 0,
# Populated by pathrev_form()
"pathrev_action": None,
"pathrev_hidden_values": [],
"pathrev_clear_action": None,
"pathrev_clear_hidden_values": [],
"pathrev": None,
"lastrev": None,
}
)
)
# clicking on sort column reverses sort order
if sortdir == "down":
revsortdir = None # 'up'
else:
revsortdir = "down"
if sortby in ["file", "rev", "date", "log", "author"]:
data["sortby_%s_href" % sortby] = request.get_url(params={"sortdir": revsortdir}, escape=1)
# CVS doesn't support sorting by rev
if request.roottype == "cvs":
data["sortby_rev_href"] = None
# set cvs-specific fields
if request.roottype == "cvs":
plain_tags = options["cvs_tags"]
plain_tags.sort(key=str.lower, reverse=True)
data["plain_tags"] = []
for plain_tag in plain_tags:
data["plain_tags"].append(_item(name=plain_tag, revision=None))
branch_tags = options["cvs_branches"]
branch_tags.sort(key=str.lower, reverse=True)
data["branch_tags"] = []
for branch_tag in branch_tags:
data["branch_tags"].append(_item(name=branch_tag, revision=None))
data["attic_showing"] = ezt.boolean(not hideattic)
data["show_attic_href"] = request.get_url(params={"hideattic": 0}, escape=1)
data["hide_attic_href"] = request.get_url(params={"hideattic": 1}, escape=1)
# set svn-specific fields
elif request.roottype == "svn":
data["tree_rev"] = tree_rev
data["tree_rev_href"] = request.get_url(
view_func=view_revision, params={"revision": tree_rev}, escape=1
)
data["youngest_rev"] = request.repos.get_youngest_revision()
data["youngest_rev_href"] = request.get_url(view_func=view_revision, params={}, escape=1)
if cfg.options.dir_pagesize:
data["dir_paging_action"], data["dir_paging_hidden_values"] = request.get_form(
params={"dir_pagestart": None}
)
pathrev_form(request, data)
if cfg.options.use_re_search:
data["search_re_action"], data["search_re_hidden_values"] = request.get_form(
params={"search": None}
)
if cfg.options.dir_pagesize:
data["dir_pagestart"] = int(request.query_dict.get("dir_pagestart", 0))
data["entries"] = paging(
data, "entries", data["dir_pagestart"], "name", cfg.options.dir_pagesize
)
generate_page(request, "directory", data)
def paging(data, key, pagestart, local_name, pagesize):
# Implement paging
# Create the picklist
picklist = data["picklist"] = []
for i in range(0, len(data[key]), pagesize):
pick = _item(start=None, end=None, count=None, more=ezt.boolean(0))
pick.start = getattr(data[key][i], local_name)
pick.count = i
pick.page = (i // pagesize) + 1
try:
pick.end = getattr(data[key][i + pagesize - 1], local_name)
except IndexError:
pick.end = getattr(data[key][-1], local_name)
picklist.append(pick)
data["picklist_len"] = len(picklist)
# Need to fix
# pagestart can be greater than the length of data[key] if you
# select a tag or search while on a page other than the first.
# Should reset to the first page, this test won't do that every
# time that it is needed.
# Problem might go away if we don't hide non-matching files when
# selecting for tags or searching.
if pagestart > len(data[key]):
pagestart = 0
pageend = pagestart + pagesize
# Slice
return data[key][pagestart:pageend]
def paging_sws(data, key, pagestart, local_name, pagesize, extra_pages, offset):
"""Implement sliding window-style paging."""
# Create the picklist
last_requested = pagestart + (extra_pages * pagesize)
picklist = data["picklist"] = []
for i in range(0, len(data[key]), pagesize):
pick = _item(start=None, end=None, count=None, more=ezt.boolean(0))
pick.start = getattr(data[key][i], local_name)
pick.count = offset + i
pick.page = (pick.count // pagesize) + 1
try:
pick.end = getattr(data[key][i + pagesize - 1], local_name)
except IndexError:
pick.end = getattr(data[key][-1], local_name)
picklist.append(pick)
if pick.count >= last_requested:
pick.more = ezt.boolean(1)
break
data["picklist_len"] = len(picklist)
first = pagestart - offset
# FIXME: first can be greater than the length of data[key] if
# you select a tag or search while on a page other than the first.
# Should reset to the first page, but this test won't do that every
# time that it is needed. Problem might go away if we don't hide
# non-matching files when selecting for tags or searching.
if first > len(data[key]):
pagestart = 0
pageend = first + pagesize
# Slice
return data[key][first:pageend]
def pathrev_form(request, data):
lastrev = None
if request.roottype == "svn":
data["pathrev_action"], data["pathrev_hidden_values"] = request.get_form(
view_func=redirect_pathrev,
params={
"pathrev": None,
"orig_path": request.where,
"orig_pathtype": request.pathtype,
"orig_pathrev": request.pathrev,
"orig_view": _view_codes.get(request.view_func),
},
)
if request.pathrev:
youngest = request.repos.get_youngest_revision()
lastrev = request.repos.last_rev(request.where, request.pathrev, youngest)[0]
if lastrev == youngest:
lastrev = None
data["pathrev"] = request.pathrev
data["lastrev"] = lastrev
action, hidden_values = request.get_form(params={"pathrev": lastrev})
if request.roottype != "svn":
data["pathrev_action"] = action
data["pathrev_hidden_values"] = hidden_values
data["pathrev_clear_action"] = action
data["pathrev_clear_hidden_values"] = hidden_values
return lastrev
def redirect_pathrev(request):
assert request.roottype == "svn"
new_pathrev = request.query_dict.get("pathrev") or None
path = request.query_dict.get("orig_path", "")
pathtype = request.query_dict.get("orig_pathtype")
pathrev = request.query_dict.get("orig_pathrev")
view = _views.get(request.query_dict.get("orig_view"))
youngest = request.repos.get_youngest_revision()
# go out of the way to allow revision numbers higher than youngest
try:
new_pathrev = int(new_pathrev)
except ValueError:
new_pathrev = youngest
except TypeError:
pass
else:
if new_pathrev > youngest:
new_pathrev = youngest
if _repos_pathtype(request.repos, _path_parts(path), new_pathrev):
pathrev = new_pathrev
else:
pathrev, path = request.repos.last_rev(path, pathrev, new_pathrev)
# allow clearing sticky revision by submitting empty string
if new_pathrev is None and pathrev == youngest:
pathrev = None
request.server.redirect(
request.get_url(view_func=view, where=path, pathtype=pathtype, params={"pathrev": pathrev})
)
def view_log(request):
cfg = request.cfg
diff_format = request.query_dict.get("diff_format", cfg.options.diff_format)
pathtype = request.pathtype
if pathtype is vclib.DIR:
if request.roottype == "cvs":
raise ViewVCException(
"Unsupported feature: log view on CVS directory", "400 Bad Request"
)
mime_type = encoding = None
else:
mime_type, encoding = calculate_mime_type(request, request.path_parts, request.pathrev)
options = {}
options["svn_show_all_dir_logs"] = 1 # TODO: someday make this optional?
options["svn_cross_copies"] = cfg.options.cross_copies
logsort = request.query_dict.get("logsort", cfg.options.log_sort)
if request.roottype == "svn":
sortby = vclib.SORTBY_DEFAULT
logsort = None
else:
if logsort == "date":
sortby = vclib.SORTBY_DATE
elif logsort == "rev":
sortby = vclib.SORTBY_REV
else:
sortby = vclib.SORTBY_DEFAULT
first = last = 0
log_pagestart = None
if cfg.options.log_pagesize:
log_pagestart = int(request.query_dict.get("log_pagestart", 0))
total = cfg.options.log_pagesextra * cfg.options.log_pagesize
first = log_pagestart - min(log_pagestart, total)
last = log_pagestart + (total + cfg.options.log_pagesize) + 1
show_revs = request.repos.itemlog(
request.path_parts, request.pathrev, sortby, first, last - first, options
)
# selected revision
selected_rev = request.query_dict.get("r1")
entries = []
name_printed = {}
cvs = request.roottype == "cvs"
for rev in show_revs:
entry = _item()
entry.rev = rev.string
entry.state = cvs and rev.dead and "dead"
entry.author = rev.author
entry.changed = rev.changed
entry.date = make_time_string(rev.date, cfg)
entry.ago = None
if rev.date is not None:
entry.ago = html_time(request, rev.date, 1)
entry.size = rev.size
entry.lockinfo = rev.lockinfo
entry.branch_point = None
entry.next_main = None
entry.orig_path = None
entry.copy_path = None
lf = LogFormatter(request, rev.log or "")
entry.log = lf.get(maxlen=0, htmlize=1)
entry.view_href = None
entry.download_href = None
entry.download_text_href = None
entry.annotate_href = None
entry.revision_href = None
entry.sel_for_diff_href = None
entry.diff_to_sel_href = None
entry.diff_to_prev_href = None
entry.diff_to_branch_href = None
entry.diff_to_main_href = None
if request.roottype == "cvs":
prev = rev.prev or rev.parent
entry.prev = prev and prev.string
branch = rev.branch_number
entry.vendor_branch = ezt.boolean(branch and branch[2] % 2 == 1)
entry.branches = prep_tags(request, rev.branches)
entry.tags = prep_tags(request, rev.tags)
entry.branch_points = prep_tags(request, rev.branch_points)
entry.tag_names = [x.name for x in rev.tags]
if branch and branch not in name_printed:
entry.branch_names = [x.name for x in rev.branches]
name_printed[branch] = 1
else:
entry.branch_names = []
if rev.parent and rev.parent is not prev and not entry.vendor_branch:
entry.branch_point = rev.parent.string
# if it's the last revision on a branch then diff against the
# last revision on the higher branch (e.g. change is committed and
# brought over to -stable)
if not rev.next and rev.parent and rev.parent.next:
r = rev.parent.next
while r.next:
r = r.next
entry.next_main = r.string
elif request.roottype == "svn":
entry.prev = rev.prev and rev.prev.string
entry.branches = entry.tags = entry.branch_points = []
entry.tag_names = entry.branch_names = []
entry.vendor_branch = None
if rev.filename != request.where:
entry.orig_path = rev.filename
entry.copy_path = rev.copy_path
entry.copy_rev = rev.copy_rev
if entry.orig_path:
entry.orig_href = request.get_url(
view_func=view_log,
where=entry.orig_path,
pathtype=vclib.FILE,
params={"pathrev": rev.string},
escape=1,
)
if rev.copy_path:
entry.copy_href = request.get_url(
view_func=view_log,
where=rev.copy_path,
pathtype=vclib.FILE,
params={"pathrev": rev.copy_rev},
escape=1,
)
# view/download links
if pathtype is vclib.FILE:
fvi = get_file_view_info(request, request.where, rev.string, mime_type)
entry.view_href = fvi.view_href
entry.download_href = fvi.download_href
entry.download_text_href = fvi.download_text_href
entry.annotate_href = fvi.annotate_href
entry.revision_href = fvi.revision_href
entry.prefer_markup = fvi.prefer_markup
else:
entry.revision_href = request.get_url(
view_func=view_revision, params={"revision": rev.string}, escape=1
)
entry.view_href = request.get_url(
view_func=view_directory,
where=rev.filename,
pathtype=vclib.DIR,
params={"pathrev": rev.string},
escape=1,
)
# calculate diff links
if selected_rev != entry.rev:
entry.sel_for_diff_href = request.get_url(
view_func=view_log,
params={"r1": entry.rev, "log_pagestart": log_pagestart},
escape=1,
)
if entry.prev is not None:
entry.diff_to_prev_href = request.get_url(
view_func=view_diff,
params={"r1": entry.prev, "r2": entry.rev, "diff_format": None},
escape=1,
)
if (
selected_rev
and (selected_rev != str(entry.rev))
and (selected_rev != str(entry.prev))
and (selected_rev != str(entry.branch_point))
and (selected_rev != str(entry.next_main))
):
entry.diff_to_sel_href = request.get_url(
view_func=view_diff,
params={"r1": selected_rev, "r2": entry.rev, "diff_format": None},
escape=1,
)
if entry.next_main:
entry.diff_to_main_href = request.get_url(
view_func=view_diff,
params={"r1": entry.next_main, "r2": entry.rev, "diff_format": None},
escape=1,
)
if entry.branch_point:
entry.diff_to_branch_href = request.get_url(
view_func=view_diff,
params={"r1": entry.branch_point, "r2": entry.rev, "diff_format": None},
escape=1,
)
# Save our escaping until the end so stuff above works
if entry.orig_path:
entry.orig_path = request.server.escape(entry.orig_path)
if entry.copy_path:
entry.copy_path = request.server.escape(entry.copy_path)
entries.append(entry)
diff_select_action, diff_select_hidden_values = request.get_form(
view_func=view_diff,
params={"r1": None, "r2": None, "tr1": None, "tr2": None, "diff_format": None},
)
logsort_action, logsort_hidden_values = request.get_form(params={"logsort": None})
data = common_template_data(request)
data.merge(
TemplateData(
{
"default_branch": None,
"mime_type": mime_type,
"rev_selected": selected_rev,
"diff_format": diff_format,
"logsort": logsort,
"human_readable": ezt.boolean(diff_format in ("f", "h", "l")),
"log_pagestart": None,
"log_paging_action": None,
"log_paging_hidden_values": [],
"entries": entries,
"head_prefer_markup": ezt.boolean(0),
"head_view_href": None,
"head_download_href": None,
"head_download_text_href": None,
"head_annotate_href": None,
"tag_prefer_markup": ezt.boolean(0),
"tag_view_href": None,
"tag_download_href": None,
"tag_download_text_href": None,
"tag_annotate_href": None,
"diff_select_action": diff_select_action,
"diff_select_hidden_values": diff_select_hidden_values,
"logsort_action": logsort_action,
"logsort_hidden_values": logsort_hidden_values,
"tags": [],
"branch_tags": [],
"plain_tags": [],
# Populated by paging()/paging_sws()
"picklist": [],
"picklist_len": 0,
# Populated by pathrev_form()
"pathrev_action": None,
"pathrev_hidden_values": [],
"pathrev_clear_action": None,
"pathrev_clear_hidden_values": [],
"pathrev": None,
"lastrev": None,
}
)
)
lastrev = pathrev_form(request, data)
if pathtype is vclib.FILE:
if not request.pathrev or lastrev is None:
fvi = get_file_view_info(request, request.where, None, mime_type, None)
data["head_view_href"] = fvi.view_href
data["head_download_href"] = fvi.download_href
data["head_download_text_href"] = fvi.download_text_href
data["head_annotate_href"] = fvi.annotate_href
data["head_prefer_markup"] = fvi.prefer_markup
if request.pathrev and request.roottype == "cvs":
fvi = get_file_view_info(request, request.where, None, mime_type)
data["tag_view_href"] = fvi.view_href
data["tag_download_href"] = fvi.download_href
data["tag_download_text_href"] = fvi.download_text_href
data["tag_annotate_href"] = fvi.annotate_href
data["tag_prefer_markup"] = fvi.prefer_markup
else:
data["head_view_href"] = request.get_url(view_func=view_directory, params={}, escape=1)
taginfo = options.get("cvs_tags", {})
tagitems = sorted(taginfo.items(), reverse=True)
main = taginfo.get("MAIN")
if main:
# Default branch may have multiple names so we list them
branches = []
for branch in main.aliases:
# Don't list MAIN
if branch is not main:
branches.append(branch)
data["default_branch"] = prep_tags(request, branches)
for tag, rev in tagitems:
rev_str = None
if rev.number:
rev_str = ".".join(map(str, rev.number))
if rev.co_rev:
data["tags"].append(_item(rev=rev.co_rev.string, name=tag))
if rev.is_branch:
data["branch_tags"].append(_item(name=tag, revision=rev_str))
else:
data["plain_tags"].append(_item(name=tag, revision=rev_str))
if cfg.options.log_pagesize:
data["log_paging_action"], data["log_paging_hidden_values"] = request.get_form(
params={
"log_pagestart": None,
"r1": selected_rev,
}
)
data["log_pagestart"] = int(request.query_dict.get("log_pagestart", 0))
data["entries"] = paging_sws(
data,
"entries",
data["log_pagestart"],
"rev",
cfg.options.log_pagesize,
cfg.options.log_pagesextra,
first,
)
generate_page(request, "log", data)
def view_checkout(request):
cfg = request.cfg
if "co" not in cfg.options.allowed_views:
raise ViewVCException("Checkout view is disabled", "403 Forbidden")
if request.pathtype != vclib.FILE:
raise ViewVCException("Unsupported feature: checkout view on directory", "400 Bad Request")
path, rev = _orig_path(request)
fp, revision = request.repos.openfile(path, rev, {})
# The revision number acts as a strong validator.
if not check_freshness(request, None, revision):
mime_type, encoding = calculate_mime_type(request, path, rev)
mime_type = request.query_dict.get("content-type") or mime_type or "text/plain"
server_fp = get_writeready_server_file(request, mime_type, encoding)
copy_stream(fp, server_fp)
fp.close()
def cvsgraph_make_reqopt(request, cfgname, queryparam, optvalue):
# Return a cvsgraph custom option substring bit OPTVALUE based on
# CFGNAME's presence in the allowed list of user-configurable
# options and QUERYPARAM's presence and boolean interpretation in
# the actual request; otherwise, return the empty string for options
# that either aren't overridden or aren't allowed to be overridden.
if (cfgname in request.cfg.options.allowed_cvsgraph_useropts) and (
int(request.query_dict.get(queryparam, 0))
):
return optvalue
return ""
def cvsgraph_normalize_gshow(request):
# Return the effective value of the 'gshow' query parameter, noting
# that a missing parameter is the same as gshow=all, and treating a
# bogus parameter value as the same as gshow=all, too.
gshow = request.query_dict.get("gshow", "all")
if gshow not in ("all", "inittagged", "tagged"):
gshow = "all"
return gshow
def cvsgraph_extraopts(request):
# Build a set of -O options for controlling cvsgraph's behavior,
# based on what the user has requested and filtered against what the
# user is allowed to request.
cfg = request.cfg
ep = "-O"
# Simple mappings of boolean flags
ep = ep + cvsgraph_make_reqopt(request, "invert", "gflip", ";upside_down=true")
ep = ep + cvsgraph_make_reqopt(request, "branchbox", "gbbox", ";branch_dupbox=true")
ep = ep + cvsgraph_make_reqopt(request, "rotate", "gleft", ";left_right=true")
# Stripping is a little more complex.
if "show" in cfg.options.allowed_cvsgraph_useropts:
gshow = cvsgraph_normalize_gshow(request)
if gshow == "inittagged":
ep = ep + ";strip_untagged=true"
elif gshow == "tagged":
ep = ep + ";strip_untagged=true;strip_first_rev=true"
# And tag limitation has a user-supplied value to mess with.
if ("limittags" in cfg.options.allowed_cvsgraph_useropts) and "gmaxtag" in request.query_dict:
ep = ep + ";rev_maxtags=" + request.query_dict["gmaxtag"]
return ep + ";"
def view_cvsgraph_image(request):
"output the image rendered by cvsgraph"
# this function is derived from cgi/cvsgraphmkimg.cgi
cfg = request.cfg
if not cfg.options.use_cvsgraph:
raise ViewVCException("Graph view is disabled", "403 Forbidden")
# If cvsgraph can't find its supporting libraries, uncomment and set
# accordingly. Do the same in view_cvsgraph().
#
# os.environ['LD_LIBRARY_PATH'] = '/usr/lib:/usr/local/lib:/path/to/cvsgraph'
rcsfile = request.repos.rcsfile(request.path_parts)
fp = request.repos.cvsgraph_popen(
(
"-c",
cfg.path(cfg.options.cvsgraph_conf),
"-r",
request.repos.rootpath,
cvsgraph_extraopts(request),
rcsfile,
),
)
copy_stream(fp, get_writeready_server_file(request, "image/png"))
fp.close()
def view_cvsgraph(request):
"output a page containing an image rendered by cvsgraph"
cfg = request.cfg
if not cfg.options.use_cvsgraph:
raise ViewVCException("Graph view is disabled", "403 Forbidden")
# If cvsgraph can't find its supporting libraries, uncomment and set
# accordingly. Do the same in view_cvsgraph_image().
#
# os.environ['LD_LIBRARY_PATH'] = '/usr/lib:/usr/local/lib:/path/to/cvsgraph'
imagesrc = request.get_url(view_func=view_cvsgraph_image, escape=1)
mime_type = guess_mime(request.where)
view = default_view(mime_type, cfg)
up_where = _path_join(request.path_parts[:-1])
# Create an image map
rcsfile = request.repos.rcsfile(request.path_parts)
fp = request.repos.cvsgraph_popen(
(
"-i",
"-c",
cfg.path(cfg.options.cvsgraph_conf),
"-r",
request.repos.rootpath,
"-x",
"x",
"-3",
request.get_url(view_func=view_log, params={}, escape=1),
"-4",
request.get_url(view_func=view, params={"revision": None}, escape=1, partial=1),
"-5",
request.get_url(
view_func=view_diff, params={"r1": None, "r2": None}, escape=1, partial=1
),
"-6",
request.get_url(
view_func=view_directory,
where=up_where,
pathtype=vclib.DIR,
params={"pathrev": None},
escape=1,
partial=1,
),
cvsgraph_extraopts(request),
rcsfile,
),
is_text=True,
)
graph_action, graph_hidden_values = request.get_form(view_func=view_cvsgraph, params={})
allowed_cvsgraph_useropts = cfg.options.allowed_cvsgraph_useropts
data = common_template_data(request)
data.merge(
TemplateData(
{
"imagemap": fp,
"imagesrc": imagesrc,
"graph_action": graph_action,
"graph_hidden_values": graph_hidden_values,
"opt_gflip": ezt.boolean("invert" in allowed_cvsgraph_useropts),
"opt_gbbox": ezt.boolean("branchbox" in allowed_cvsgraph_useropts),
"opt_gshow": ezt.boolean("show" in allowed_cvsgraph_useropts),
"opt_gleft": ezt.boolean("rotate" in allowed_cvsgraph_useropts),
"opt_gmaxtag": ezt.boolean("limittags" in allowed_cvsgraph_useropts),
"gflip": ezt.boolean(int(request.query_dict.get("gflip", 0))),
"gbbox": ezt.boolean(int(request.query_dict.get("gbbox", 0))),
"gleft": ezt.boolean(int(request.query_dict.get("gleft", 0))),
"gmaxtag": request.query_dict.get("gmaxtag", 0),
"gshow": cvsgraph_normalize_gshow(request),
}
)
)
generate_page(request, "graph", data)
def search_file(repos, path_parts, rev, search_re):
"""Return 1 iff the contents of the file at PATH_PARTS in REPOS as
of revision REV matches regular expression SEARCH_RE."""
# Read in each line of a checked-out file, and then use re.search to
# search line.
fp = repos.openfile(path_parts, rev, {})[0]
matches = 0
while 1:
line = fp.readline()
if not line:
break
# FIXME: Is there what can we do about file encoding?
if search_re.search(line.decode("utf-8", "surrogateescape")):
matches = 1
fp.close()
break
return matches
def view_doc(request):
"""Serve ViewVC static content locally.
Using this avoids the need for modifying the setup of the web server.
"""
cfg = request.cfg
document = request.where
filename = cfg.path(os.path.join(cfg.options.template_dir, "docroot", document))
# Stat the file to get content length and last-modified date.
try:
info = os.stat(filename)
except OSError as v:
raise ViewVCException(
'Static file "%s" not available (%s)' % (document, str(v)), "404 Not Found"
)
content_length = str(info[stat.ST_SIZE])
last_modified = info[stat.ST_MTIME]
# content_length + mtime makes a pretty good etag.
if check_freshness(request, last_modified, "%s-%s" % (content_length, last_modified)):
return
try:
fp = open(filename, "rb")
except IOError as v:
raise ViewVCException(
'Static file "%s" not available (%s)' % (document, str(v)), "404 Not Found"
)
if document[-3:] == "png":
mime_type = "image/png"
elif document[-3:] == "jpg":
mime_type = "image/jpeg"
elif document[-3:] == "gif":
mime_type = "image/gif"
elif document[-3:] == "css":
mime_type = "text/css"
else: # assume HTML
mime_type = None
copy_stream(fp, get_writeready_server_file(request, mime_type, content_length=content_length))
fp.close()
def rcsdiff_date_reformat(date_str, cfg):
if date_str is None:
return None
try:
date = vclib.ccvs.cvs_strptime(date_str)
except ValueError:
return date_str
return make_time_string(calendar.timegm(date), cfg)
_re_extract_rev = re.compile(r"^[-+*]{3} [^\t]+\t([^\t]+)\t((\d+\.)*\d+)$")
_re_extract_info = re.compile(r"@@ \-([0-9]+).*\+([0-9]+).*@@(.*)")
class DiffSource:
def __init__(self, fp, cfg):
self.fp = fp
self.cfg = cfg
self.save_line = None
self.line_number = None
self.prev_line_number = None
# keep track of where we are during an iteration
self.idx = -1
self.last = None
# these will be set once we start reading
self.state = "no-changes"
self.left_col = []
self.right_col = []
def __getitem__(self, idx):
if idx == self.idx:
return self.last
if idx != self.idx + 1:
raise DiffSequencingError()
# keep calling _get_row until it gives us something. sometimes, it
# doesn't return a row immediately because it is accumulating changes.
# when it is out of data, _get_row will raise IndexError.
while 1:
item = self._get_row()
if item:
self.idx = idx
self.last = item
return item
def _format_text(self, text):
text = text.rstrip("\r\n")
if self.cfg.options.tabsize > 0:
text = text.expandtabs(self.cfg.options.tabsize)
hr_breakable = self.cfg.options.hr_breakable
# in the code below, "\x01" will be our stand-in for "&". We don't want
# to insert "&" because it would get escaped by sapi.escape(). Similarly,
# we use "\x02" as a stand-in for "<br>"
if hr_breakable > 1 and len(text) > hr_breakable:
text = re.sub("(" + ("." * hr_breakable) + ")", "\\1\x02", text)
if hr_breakable:
# make every other space "breakable"
text = text.replace(" ", " \x01nbsp;")
else:
text = text.replace(" ", "\x01nbsp;")
text = sapi.escape(text)
text = text.replace("\x01", "&")
text = text.replace("\x02", '<span style="color:red">\\</span><br />')
return text
def _get_row(self):
if self.state[:5] == "flush":
item = self._flush_row()
if item:
return item
self.state = "dump"
if self.save_line:
line = self.save_line
self.save_line = None
else:
line = self.fp.readline()
if not line:
if self.state == "no-changes":
self.state = "done"
return _item(type=_RCSDIFF_NO_CHANGES)
# see if there are lines to flush
if self.left_col or self.right_col:
# move into the flushing state
self.state = "flush-" + self.state
return None
# nothing more to return
raise IndexError
if line[:2] == "@@":
self.state = "dump"
self.left_col = []
self.right_col = []
match = _re_extract_info.match(line)
self.line_number = int(match.group(2)) - 1
self.prev_line_number = int(match.group(1)) - 1
return _item(
type="header",
line_info_left=match.group(1),
line_info_right=match.group(2),
line_info_extra=self._format_text(match.group(3)),
)
if line[0] == "\\":
# \ No newline at end of file
# Just skip. This code used to move to flush state, but that resulted in
# changes being displayed as removals-and-readditions.
return None
diff_code = line[0]
output = self._format_text(line[1:])
if diff_code == "+":
if self.state == "dump":
self.line_number = self.line_number + 1
return _item(type="add", right=output, line_number=self.line_number)
self.state = "pre-change-add"
self.right_col.append(output)
return None
if diff_code == "-":
self.state = "pre-change-remove"
self.left_col.append(output)
return None # early exit to avoid line in
if self.left_col or self.right_col:
# save the line for processing again later, and move into the
# flushing state
self.save_line = line
self.state = "flush-" + self.state
return None
self.line_number = self.line_number + 1
self.prev_line_number = self.prev_line_number + 1
return _item(type="context", left=output, right=output, line_number=self.line_number)
def _flush_row(self):
if not self.left_col and not self.right_col:
# nothing more to flush
return None
if self.state == "flush-pre-change-remove":
self.prev_line_number = self.prev_line_number + 1
return _item(
type="remove", left=self.left_col.pop(0), line_number=self.prev_line_number
)
# state == flush-pre-change-add
item = _item(type="change", have_left=ezt.boolean(0), have_right=ezt.boolean(0))
if self.left_col:
self.prev_line_number = self.prev_line_number + 1
item.have_left = ezt.boolean(1)
item.left = self.left_col.pop(0)
item.line_number = self.prev_line_number
if self.right_col:
self.line_number = self.line_number + 1
item.have_right = ezt.boolean(1)
item.right = self.right_col.pop(0)
item.line_number = self.line_number
return item
class DiffSequencingError(Exception):
pass
def diff_parse_headers(fp, diff_type, path1, path2, rev1, rev2, sym1=None, sym2=None):
date1 = date2 = log_rev1 = log_rev2 = flag = None
header_lines = []
if diff_type == vclib.UNIFIED:
f1 = "--- "
f2 = "+++ "
elif diff_type == vclib.CONTEXT:
f1 = "*** "
f2 = "--- "
else:
f1 = f2 = None
# If we're parsing headers, then parse and tweak the diff headers,
# collecting them in an array until we've read and handled them all.
if f1 and f2:
parsing = 1
flag = _RCSDIFF_NO_CHANGES
while parsing:
try:
line = fp.readline()
if not line:
break
except vclib.ExternalDiffError as e:
raise ViewVCException(str(e), "500 Internal Server Error")
# Saw at least one line in the stream
flag = None
if line[: len(f1)] == f1:
match = _re_extract_rev.match(line)
if match:
date1 = match.group(1)
log_rev1 = match.group(2)
line = "%s%s\t%s\t%s%s\n" % (
f1,
path1,
date1,
log_rev1,
sym1 and " " + sym1 or "",
)
elif line[: len(f2)] == f2:
match = _re_extract_rev.match(line)
if match:
date2 = match.group(1)
log_rev2 = match.group(2)
line = "%s%s\t%s\t%s%s\n" % (
f2,
path2,
date2,
log_rev2,
sym2 and " " + sym2 or "",
)
parsing = 0
elif line[:3] == "Bin":
flag = _RCSDIFF_IS_BINARY
parsing = 0
elif line.find("not found") != -1 or line.find("illegal option") != -1:
flag = _RCSDIFF_ERROR
parsing = 0
header_lines.append(line)
if log_rev1 and log_rev1 != rev1:
raise ViewVCException(
"rcsdiff found revision %s, but expected " "revision %s" % (log_rev1, rev1),
"500 Internal Server Error",
)
if log_rev2 and log_rev2 != rev2:
raise ViewVCException(
"rcsdiff found revision %s, but expected " "revision %s" % (log_rev2, rev2),
"500 Internal Server Error",
)
return date1, date2, flag, "".join(header_lines)
def _get_diff_path_parts(request, query_key, rev, base_rev):
repos = request.repos
if query_key in request.query_dict:
parts = _path_parts(request.query_dict[query_key])
elif request.roottype == "svn":
try:
parts = _path_parts(
repos.get_location(request.where, repos._getrev(base_rev), repos._getrev(rev))
)
except vclib.InvalidRevision:
raise ViewVCException(
"Invalid path(s) or revision(s) passed to diff", "400 Bad Request"
)
except vclib.ItemNotFound:
raise ViewVCException(
"Invalid path(s) or revision(s) passed to diff", "400 Bad Request"
)
else:
parts = request.path_parts
return parts
def setup_diff(request):
query_dict = request.query_dict
rev1 = r1 = query_dict["r1"]
rev2 = r2 = query_dict["r2"]
sym1 = sym2 = None
# hack on the diff revisions
if r1 == "text":
rev1 = query_dict.get("tr1", None)
if not rev1:
raise ViewVCException(
"Missing revision from the diff form text field", "400 Bad Request"
)
else:
idx = r1.find(":")
if idx == -1:
rev1 = r1
else:
rev1 = r1[:idx]
sym1 = r1[(idx + 1) :]
if r2 == "text":
rev2 = query_dict.get("tr2", None)
if not rev2:
raise ViewVCException(
"Missing revision from the diff form text field", "400 Bad Request"
)
sym2 = ""
else:
idx = r2.find(":")
if idx == -1:
rev2 = r2
else:
rev2 = r2[:idx]
sym2 = r2[(idx + 1) :]
if request.roottype == "svn":
try:
rev1 = str(request.repos._getrev(rev1))
rev2 = str(request.repos._getrev(rev2))
except vclib.InvalidRevision:
raise ViewVCException("Invalid revision(s) passed to diff", "400 Bad Request")
p1 = _get_diff_path_parts(request, "p1", rev1, request.pathrev)
p2 = _get_diff_path_parts(request, "p2", rev2, request.pathrev)
try:
if revcmpkey(rev1) > revcmpkey(rev2):
rev1, rev2 = rev2, rev1
sym1, sym2 = sym2, sym1
p1, p2 = p2, p1
except ValueError:
raise ViewVCException("Invalid revision(s) passed to diff", "400 Bad Request")
return p1, p2, rev1, rev2, sym1, sym2
def view_patch(request):
if "diff" not in request.cfg.options.allowed_views:
raise ViewVCException("Diff generation is disabled", "403 Forbidden")
cfg = request.cfg
query_dict = request.query_dict
p1, p2, rev1, rev2, sym1, sym2 = setup_diff(request)
mime_type1, encoding1 = calculate_mime_type(request, p1, rev1)
mime_type2, encoding2 = calculate_mime_type(request, p2, rev2)
if is_binary_file_mime_type(mime_type1, cfg) or is_binary_file_mime_type(mime_type2, cfg):
raise ViewVCException(
"Display of binary file content disabled " "by configuration", "403 Forbidden"
)
# In the absence of a format dictation in the CGI params, we'll let
# use the configured diff format, allowing 'c' to mean 'c' and
# anything else to mean 'u'.
format = query_dict.get("diff_format", cfg.options.diff_format == "c" and "c" or "u")
if format == "c":
diff_type = vclib.CONTEXT
elif format == "u":
diff_type = vclib.UNIFIED
else:
raise ViewVCException("Diff format %s not understood" % (format), "400 Bad Request")
# Set some diff options. (Are there other options folks might want?
# Maybe not. For a patch, perhaps the precise change is ideal.)
diff_options = {}
diff_options["funout"] = cfg.options.hr_funout
try:
fp = request.repos.rawdiff(p1, rev1, p2, rev2, diff_type, diff_options)
except vclib.InvalidRevision:
raise ViewVCException("Invalid path(s) or revision(s) passed to diff", "400 Bad Request")
path_left = _path_join(p1)
path_right = _path_join(p2)
date1, date2, flag, headers = diff_parse_headers(
fp, diff_type, path_left, path_right, rev1, rev2, sym1, sym2
)
server_fp = get_writeready_server_file(request, "text/plain", is_text=True)
server_fp.write(headers)
copy_stream(fp, server_fp)
fp.close()
def diff_side_item(request, path_comp, rev, sym):
"""Prepare information about left/right side of the diff. Prepare two flavors,
for content and for property diffs."""
# TODO: Is the slice necessary, or is limit enough?
options = {"svn_show_all_dir_logs": 1}
log_entry = request.repos.itemlog(path_comp, rev, vclib.SORTBY_REV, 0, 1, options)[-1]
ago = log_entry.date is not None and html_time(request, log_entry.date, 1) or None
path_joined = _path_join(path_comp)
lf = LogFormatter(request, log_entry.log)
# Item for property diff: no hrefs, there's no view
# to download/annotate property
i_prop = _item(
log_entry=log_entry,
date=make_time_string(log_entry.date, request.cfg),
author=log_entry.author,
log=lf.get(maxlen=0, htmlize=1),
size=log_entry.size,
ago=ago,
path=path_joined,
path_comp=path_comp,
rev=rev,
tag=sym,
view_href=None,
download_href=None,
download_text_href=None,
annotate_href=None,
revision_href=None,
prefer_markup=ezt.boolean(0),
)
# Content diff item is based on property diff, with URIs added
fvi = get_file_view_info(request, path_joined, rev)
i_content = copy.copy(i_prop)
i_content.view_href = fvi.view_href
i_content.download_href = fvi.download_href
i_content.download_text_href = fvi.download_text_href
i_content.annotate_href = fvi.annotate_href
i_content.revision_href = fvi.revision_href
i_content.prefer_markup = fvi.prefer_markup
# Property diff item has properties hash, naturally. Content item doesn't.
i_content.properties = None
i_prop.properties = request.repos.itemprops(path_comp, rev)
return i_content, i_prop
class DiffDescription:
def __init__(self, request):
cfg = request.cfg
query_dict = request.query_dict
self.diff_format = query_dict.get("diff_format", cfg.options.diff_format)
self.human_readable = 0
self.hide_legend = 0
self.line_differ = None
self.fp_differ = None
self.request = request
self.context = None
self.changes = []
if self.diff_format == "c":
self.diff_type = vclib.CONTEXT
self.hide_legend = 1
elif self.diff_format == "s":
self.diff_type = vclib.SIDE_BY_SIDE
self.hide_legend = 1
elif self.diff_format == "l":
self.diff_type = vclib.UNIFIED
self.context = 15
self.human_readable = 1
elif self.diff_format == "f":
self.diff_type = vclib.UNIFIED
self.context = cfg.utilities.max_context
self.human_readable = 1
elif self.diff_format == "h":
self.diff_type = vclib.UNIFIED
self.human_readable = 1
elif self.diff_format == "u":
self.diff_type = vclib.UNIFIED
self.hide_legend = 1
else:
raise ViewVCException(
"Diff format %s not understood" % self.diff_format, "400 Bad Request"
)
# Determine whether idiff is avaialble and whether it could be used.
# idiff only supports side-by-side (conditionally) and unified formats,
# and is only used if intra-line diffs are requested.
if (
cfg.options.hr_intraline
and idiff
and (
(self.human_readable and idiff.sidebyside)
or (not self.human_readable and self.diff_type == vclib.UNIFIED)
)
):
# Override hiding legend for unified format. It is not marked 'human
# readable', and it is displayed differently depending on whether
# hr_intraline is disabled (displayed as raw diff) or enabled
# (displayed as colored). What a royal mess... Issue #301 should
# at some time address it; at that time, human_readable and hide_legend
# controls should both be merged into one, 'is_colored' or something.
self.hide_legend = 0
if self.human_readable:
self.line_differ = self._line_idiff_sidebyside
self.diff_block_format = "sidebyside-2"
else:
self.line_differ = self._line_idiff_unified
self.diff_block_format = "unified"
else:
if self.human_readable:
self.diff_block_format = "sidebyside-1"
self.fp_differ = self._fp_vclib_hr
else:
self.diff_block_format = "raw"
self.fp_differ = self._fp_vclib_raw
def anchor(self, anchor_name):
self.changes.append(_item(diff_block_format="anchor", anchor=anchor_name))
def get_content_diff(self, left, right):
options = self.request.cfg.options
diff_options = {}
if self.context is not None:
diff_options["context"] = self.context
if self.human_readable or self.diff_format == "u":
diff_options["funout"] = options.hr_funout
if self.human_readable:
diff_options["ignore_white"] = options.hr_ignore_white
diff_options["ignore_keyword_subst"] = options.hr_ignore_keyword_subst
self._get_diff(left, right, self._content_lines, self._content_fp, diff_options, None)
def get_prop_diff(self, left, right):
diff_options = {}
if self.context is not None:
diff_options["context"] = self.context
if self.human_readable:
cfg = self.request.cfg
diff_options["ignore_white"] = cfg.options.hr_ignore_white
for name in self._uniq(list(left.properties.keys()) + list(right.properties.keys())):
# Skip non-utf8 property names
if is_undisplayable(name):
continue
val_left = left.properties.get(name, "")
val_right = right.properties.get(name, "")
# Skip non-changed properties
if val_left == val_right:
continue
# Check for binary properties
if is_undisplayable(val_left) or is_undisplayable(val_right):
self.changes.append(
_item(
left=left,
right=right,
diff_block_format=self.diff_block_format,
changes=[_item(type=_RCSDIFF_IS_BINARY)],
propname=name,
)
)
continue
self._get_diff(left, right, self._prop_lines, self._prop_fp, diff_options, name)
def _get_diff(self, left, right, get_lines, get_fp, diff_options, propname):
if self.fp_differ is not None:
fp = get_fp(left, right, propname, diff_options)
changes = self.fp_differ(left, right, fp, propname)
else:
lines_left = get_lines(left, propname)
lines_right = get_lines(right, propname)
changes = self.line_differ(lines_left, lines_right, diff_options)
self.changes.append(
_item(
left=left,
right=right,
changes=changes,
diff_block_format=self.diff_block_format,
propname=propname,
)
)
def _line_idiff_sidebyside(self, lines_left, lines_right, diff_options):
return idiff.sidebyside(lines_left, lines_right, diff_options.get("context", 5))
def _line_idiff_unified(self, lines_left, lines_right, diff_options):
return idiff.unified(lines_left, lines_right, diff_options.get("context", 2))
def _fp_vclib_hr(self, left, right, fp, propname):
date1, date2, flag, headers = diff_parse_headers(
fp,
self.diff_type,
self._property_path(left, propname),
self._property_path(right, propname),
left.rev,
right.rev,
left.tag,
right.tag,
)
if flag is not None:
return [_item(type=flag)]
else:
return DiffSource(fp, self.request.cfg)
def _fp_vclib_raw(self, left, right, fp, propname):
date1, date2, flag, headers = diff_parse_headers(
fp,
self.diff_type,
self._property_path(left, propname),
self._property_path(right, propname),
left.rev,
right.rev,
left.tag,
right.tag,
)
if flag is not None:
return _item(type=flag)
else:
mpw = MarkupPipeWrapper(fp, self.request.server.escape(headers), None, 1)
return _item(type="raw", raw=mpw)
def _content_lines(self, side, propname):
f = self.request.repos.openfile(side.path_comp, side.rev, {})[0]
try:
lines = [line.decode(self.request.repos.content_encoding, 'surrogateescape')
for line in f.readlines()]
finally:
f.close()
return lines
def _content_fp(self, left, right, propname, diff_options):
return self.request.repos.rawdiff(
left.path_comp, left.rev, right.path_comp, right.rev, self.diff_type, diff_options
)
def _prop_lines(self, side, propname):
val = side.properties.get(propname, "")
return val.splitlines()
def _prop_fp(self, left, right, propname, diff_options):
fn_left = self._temp_file(left.properties.get(propname))
fn_right = self._temp_file(right.properties.get(propname))
diff_args = vclib._diff_args(self.diff_type, diff_options)
info_left = (self._property_path(left, propname), left.log_entry.date, left.rev)
info_right = (self._property_path(right, propname), right.log_entry.date, right.rev)
return vclib._diff_fp(
fn_left,
fn_right,
info_left,
info_right,
self.request.cfg.utilities.diff or "diff",
diff_args,
)
def _temp_file(self, val):
"""Create a temporary file with content from val"""
fd, fn = tempfile.mkstemp()
fp = os.fdopen(fd, "wb")
if val:
if not isinstance(val, bytes):
fp.write(val.encode("utf-8", "surrogateescape"))
else:
fp.write(val)
fp.close()
return fn
def _uniq(self, lst):
"""Determine unique set of list elements"""
h = {}
for e in lst:
h[e] = 1
return sorted(h.keys())
def _property_path(self, side, propname):
"""Return path to be displayed in raw diff - possibly augmented with
property name"""
if propname is None:
return side.path
else:
return "%s:property(%s)" % (side.path, propname)
def view_diff(request):
if "diff" not in request.cfg.options.allowed_views:
raise ViewVCException("Diff generation is disabled", "403 Forbidden")
cfg = request.cfg
p1, p2, rev1, rev2, sym1, sym2 = setup_diff(request)
mime_type1, encoding1 = calculate_mime_type(request, p1, rev1)
mime_type2, encoding2 = calculate_mime_type(request, p2, rev2)
if is_binary_file_mime_type(mime_type1, cfg) or is_binary_file_mime_type(mime_type2, cfg):
raise ViewVCException(
"Display of binary file content disabled " "by configuration", "403 Forbidden"
)
# since templates are in use and subversion allows changes to the dates,
# we can't provide a strong etag
if check_freshness(request, None, "%s-%s" % (rev1, rev2), weak=1):
return
left_side_content, left_side_prop = diff_side_item(request, p1, rev1, sym1)
right_side_content, right_side_prop = diff_side_item(request, p2, rev2, sym2)
desc = DiffDescription(request)
try:
if request.pathtype == vclib.FILE:
# Get file content diff
desc.anchor("content")
desc.get_content_diff(left_side_content, right_side_content)
# Get property list and diff each property
desc.anchor("properties")
desc.get_prop_diff(left_side_prop, right_side_prop)
except vclib.InvalidRevision:
raise ViewVCException("Invalid path(s) or revision(s) passed to diff", "400 Bad Request")
no_format_params = request.query_dict.copy()
no_format_params["diff_format"] = None
diff_format_action, diff_format_hidden_values = request.get_form(params=no_format_params)
data = common_template_data(request)
data.merge(
TemplateData(
{
"diffs": desc.changes,
"diff_format": desc.diff_format,
"hide_legend": ezt.boolean(desc.hide_legend),
"patch_href": request.get_url(
view_func=view_patch, params=no_format_params, escape=1
),
"diff_format_action": diff_format_action,
"diff_format_hidden_values": diff_format_hidden_values,
}
)
)
generate_page(request, "diff", data)
def generate_tarball_header(
out,
name,
size=0,
mode=None,
mtime=0,
uid=0,
gid=0,
typeflag=None,
linkname=b"",
uname=b"viewvc",
gname=b"viewvc",
devmajor=1,
devminor=0,
prefix=None,
magic=b"ustar",
version=b"00",
chksum=None,
):
if not isinstance(name, bytes):
name = name.encode("utf-8", "surrogateescape")
if not isinstance(linkname, bytes):
linkname = linkname.encode("utf-8", "surrogateescape")
if not mode:
if name[-1:] == b"/":
mode = 0o0755
else:
mode = 0o0644
if not typeflag:
if linkname:
typeflag = b"2" # symbolic link
elif name[-1:] == b"/":
typeflag = b"5" # directory
else:
typeflag = b"0" # regular file
if not prefix:
prefix = b""
elif not isinstance(prefix, bytes):
prefix = prefix.encode("utf-8", "surrogateescape")
# generate a GNU tar extension header for a long name.
if len(name) >= 100:
generate_tarball_header(out, b"././@LongLink", len(name), 0, 0, 0, 0, b"L")
out.write(name)
out.write(b"\0" * (511 - ((len(name) + 511) % 512)))
# generate a GNU tar extension header for a long symlink name.
if len(linkname) >= 100:
generate_tarball_header(out, b"././@LongLink", len(linkname), 0, 0, 0, 0, b"K")
out.write(linkname)
out.write(b"\0" * (511 - ((len(linkname) + 511) % 512)))
block1 = struct.pack(
"100s 8s 8s 8s 12s 12s",
name,
b"%07o" % mode,
b"%07o" % uid,
b"%07o" % gid,
b"%011o" % size,
b"%011o" % mtime,
)
block2 = struct.pack(
"c 100s 6s 2s 32s 32s 8s 8s 155s",
typeflag,
linkname,
magic,
version,
uname,
gname,
b"%07o" % devmajor,
b"%07o" % devminor,
prefix,
)
if not chksum:
dummy_chksum = b" "
block = block1 + dummy_chksum + block2
chksum = 0
for i in range(len(block)):
chksum = chksum + block[i]
block = block1 + struct.pack("8s", b"%07o" % chksum) + block2
block = block + b"\0" * (512 - len(block))
out.write(block)
def generate_tarball(out, request, reldir, stack, dir_mtime=None):
# get directory info from repository
rep_path = request.path_parts + reldir
entries = request.repos.listdir(rep_path, request.pathrev, {})
request.repos.dirlogs(rep_path, request.pathrev, entries, {})
entries.sort(key=attrgetter('name'))
# figure out corresponding path in tar file. everything gets put underneath
# a single top level directory named after the repository directory being
# tarred
if request.path_parts:
tar_dir = request.path_parts[-1] + "/"
else:
# Don't handle context as a directory in the tar ball.
root_path_parts = _path_parts(request.rootname)
tar_dir = root_path_parts[-1] + "/"
if reldir:
tar_dir = tar_dir + _path_join(reldir) + "/"
cvs = request.roottype == "cvs"
# If our caller doesn't dictate a datestamp to use for the current
# directory, its datestamps will be the youngest of the datestamps
# of versioned items in that subdirectory. We'll be ignoring dead
# or busted items and, in CVS, subdirs.
if dir_mtime is None:
dir_mtime = 0
for file in entries:
if cvs and (file.kind != vclib.FILE or file.rev is None or file.dead):
continue
if (file.date is not None) and (file.date > dir_mtime):
dir_mtime = file.date
# Push current directory onto the stack.
stack.append(tar_dir)
# If this is Subversion, we generate a header for this directory
# regardless of its contents. For CVS it will only get into the
# tarball if it has files underneath it, which we determine later.
if not cvs:
generate_tarball_header(out, tar_dir, mtime=dir_mtime)
# Run through the files in this directory, skipping busted and
# unauthorized ones.
for file in entries:
if file.kind != vclib.FILE:
continue
if cvs and (file.rev is None or file.dead):
continue
# If we get here, we've seen at least one valid file in the
# current directory. For CVS, we need to make sure there are
# directory parents to contain it, so we flush the stack.
if cvs:
for dir in stack:
generate_tarball_header(out, dir, mtime=dir_mtime)
del stack[:]
# Calculate the mode for the file. Sure, we could look directly
# at the ,v file in CVS, but that's a layering violation we'd like
# to avoid as much as possible.
if request.repos.isexecutable(rep_path + [file.name], request.pathrev):
mode = 0o0755
else:
mode = 0o0644
# Is this thing a symlink?
#
# FIXME: A better solution would be to have vclib returning
# symlinks with a new vclib.SYMLINK path type.
symlink_target = None
if hasattr(request.repos, "get_symlink_target"):
symlink_target = request.repos.get_symlink_target(
rep_path + [file.name], request.pathrev
)
# If the object is a symlink, generate the appropriate header.
# Otherwise, we're dealing with a regular file.
if symlink_target:
generate_tarball_header(
out,
tar_dir + file.name,
0,
mode,
file.date is not None and file.date or 0,
typeflag=b"2",
linkname=symlink_target,
)
else:
filesize = request.repos.filesize(rep_path + [file.name], request.pathrev)
if filesize == -1:
# Bummer. We have to calculate the filesize manually.
fp = request.repos.openfile(rep_path + [file.name], request.pathrev, {})[0]
filesize = 0
while 1:
chunk = retry_read(fp)
if not chunk:
break
filesize = filesize + len(chunk)
fp.close()
# Write the tarball header...
generate_tarball_header(
out, tar_dir + file.name, filesize, mode, file.date is not None and file.date or 0
)
# ...the file's contents ...
fp = request.repos.openfile(rep_path + [file.name], request.pathrev, {})[0]
while 1:
chunk = retry_read(fp)
if not chunk:
break
out.write(chunk)
fp.close()
# ... and then add the block padding.
out.write(b"\0" * (511 - (filesize + 511) % 512))
# Recurse into subdirectories, skipping busted and unauthorized (or
# configured-to-be-hidden) ones.
for file in entries:
if file.errors or file.kind != vclib.DIR:
continue
if request.cfg.options.hide_cvsroot and is_cvsroot_path(
request.roottype, rep_path + [file.name]
):
continue
mtime = request.roottype == "svn" and file.date or None
generate_tarball(out, request, reldir + [file.name], stack, mtime)
# Pop the current directory from the stack.
del stack[-1:]
def download_tarball(request):
cfg = request.cfg
if "tar" not in cfg.options.allowed_views:
raise ViewVCException("Tarball generation is disabled", "403 Forbidden")
# Set DEBUG_TARFILE_PATH to a server-local path to enable tarball
# generation debugging and cause ViewVC to write the generated
# tarball (minus the compression layer) to that server filesystem
# location. This is *NOT* suitable for production environments!
#
# Otherwise, we do tarball generation as usual by getting a
# writeable server output stream -- disabling any default
# compression thereupon -- and wrapping that in our own gzip stream
# wrapper.
DEBUG_TARFILE_PATH = None
if DEBUG_TARFILE_PATH is not None:
fp = open(DEBUG_TARFILE_PATH, "wb")
else:
tarfile = request.rootname
if request.path_parts:
tarfile = "%s-%s" % (tarfile, request.path_parts[-1])
request.server.add_header(
"Content-Disposition", 'attachment; filename="%s.tar.gz"' % (tarfile)
)
server_fp = get_writeready_server_file(request, "application/x-gzip", allow_compress=False)
fp = gzip.GzipFile("", "wb", 9, server_fp)
# FIXME: For Subversion repositories, we can get the real mtime of
# the top-level directory here.
generate_tarball(fp, request, [], [])
fp.write(b"\0" * 1024)
fp.close()
if DEBUG_TARFILE_PATH:
server_fp = get_writeready_server_file(request, is_text=True)
server_fp.write(
"""
<html>
<body>
<p>Tarball '%s' successfully generated!</p>
</body>
</html>"""
% (DEBUG_TARFILE_PATH)
)
def view_revision(request):
if request.roottype != "svn":
raise ViewVCException(
"Revision view not supported for CVS repositories " "at this time.", "400 Bad Request"
)
cfg = request.cfg
query_dict = request.query_dict
try:
rev = request.repos._getrev(query_dict.get("revision"))
except vclib.InvalidRevision:
raise ViewVCException("Invalid revision", "404 Not Found")
youngest_rev = request.repos.get_youngest_revision()
# The revision number acts as a weak validator (but we tell browsers
# not to cache the youngest revision).
if rev != youngest_rev and check_freshness(request, None, str(rev), weak=1):
return
# Fetch the revision information.
date, author, msg, revprops, changes = request.repos.revinfo(rev)
date_str = make_time_string(date, cfg)
# Fix up the revprops list (rather like get_itemprops()).
propnames = sorted(revprops.keys())
props = []
for name in propnames:
# skip non-utf8 property names
if is_undisplayable(name, "utf-8"):
continue
undisplayable = is_undisplayable(revprops[name])
if not undisplayable:
lf = LogFormatter(
request, revprops[name].decode(request.repos.content_encoding, "backslashreplace")
)
value = lf.get(maxlen=0, htmlize=1)
else:
# note non-utf8 property values
value = None
props.append(_item(name=name, value=value, undisplayable=ezt.boolean(undisplayable)))
# Sort the changes list by path.
changes.sort(key=attrgetter('path_parts'))
# Handle limit_changes parameter
cfg_limit_changes = cfg.options.limit_changes
limit_changes = int(query_dict.get("limit_changes", cfg_limit_changes))
more_changes = None
more_changes_href = None
first_changes = None
first_changes_href = None
num_changes = len(changes)
if limit_changes and len(changes) > limit_changes:
more_changes = len(changes) - limit_changes
params = query_dict.copy()
params["limit_changes"] = 0
more_changes_href = request.get_url(params=params, escape=1)
changes = changes[:limit_changes]
elif cfg_limit_changes and len(changes) > cfg_limit_changes:
first_changes = cfg_limit_changes
params = query_dict.copy()
params["limit_changes"] = None
first_changes_href = request.get_url(params=params, escape=1)
# Add the hrefs, types, and prev info
for change in changes:
change.view_href = change.diff_href = change.type = change.log_href = None
# If the path is newly added, don't claim text or property
# modifications.
if (change.action == vclib.ADDED or change.action == vclib.REPLACED) and not change.copied:
change.text_changed = 0
change.props_changed = 0
# Calculate the view link URLs (for which we must have a pathtype).
if change.pathtype:
view_func = None
if change.pathtype is vclib.FILE and "markup" in cfg.options.allowed_views:
view_func = view_markup
elif change.pathtype is vclib.DIR:
view_func = view_directory
path = _path_join(change.path_parts)
base_path = _path_join(change.base_path_parts)
if change.action == vclib.DELETED:
link_rev = str(change.base_rev)
link_where = base_path
else:
link_rev = str(rev)
link_where = path
change.view_href = request.get_url(
view_func=view_func,
where=link_where,
pathtype=change.pathtype,
params={"pathrev": link_rev},
escape=1,
)
change.log_href = request.get_url(
view_func=view_log,
where=link_where,
pathtype=change.pathtype,
params={"pathrev": link_rev},
escape=1,
)
if (change.pathtype is vclib.FILE and change.text_changed) or change.props_changed:
change.diff_href = request.get_url(
view_func=view_diff,
where=path,
pathtype=change.pathtype,
params={"pathrev": str(rev), "r1": str(rev), "r2": str(change.base_rev)},
escape=1,
)
# use same variable names as the log template
change.path = _path_join(change.path_parts)
change.copy_path = _path_join(change.base_path_parts)
change.copy_rev = change.base_rev
change.text_mods = ezt.boolean(change.text_changed)
change.prop_mods = ezt.boolean(change.props_changed)
change.is_copy = ezt.boolean(change.copied)
change.pathtype = (
(change.pathtype == vclib.FILE and "file")
or (change.pathtype == vclib.DIR and "dir")
or None
)
del change.path_parts
del change.base_path_parts
del change.base_rev
del change.text_changed
del change.props_changed
del change.copied
prev_rev_href = next_rev_href = None
if rev > 0:
prev_rev_href = request.get_url(
view_func=view_revision,
where=None,
pathtype=None,
params={"revision": str(rev - 1)},
escape=1,
)
if rev < request.repos.get_youngest_revision():
next_rev_href = request.get_url(
view_func=view_revision,
where=None,
pathtype=None,
params={"revision": str(rev + 1)},
escape=1,
)
jump_rev_action, jump_rev_hidden_values = request.get_form(params={"revision": None})
lf = LogFormatter(request, msg)
data = common_template_data(request)
data.merge(
TemplateData(
{
"rev": str(rev),
"author": author,
"date": date_str,
"log": lf.get(maxlen=0, htmlize=1),
"properties": props,
"ago": date is not None and html_time(request, date, 1) or None,
"changes": changes,
"prev_href": prev_rev_href,
"next_href": next_rev_href,
"num_changes": num_changes,
"limit_changes": limit_changes,
"more_changes": more_changes,
"more_changes_href": more_changes_href,
"first_changes": first_changes,
"first_changes_href": first_changes_href,
"jump_rev_action": jump_rev_action,
"jump_rev_hidden_values": jump_rev_hidden_values,
"revision_href": request.get_url(
view_func=view_revision,
where=None,
pathtype=None,
params={"revision": str(rev)},
escape=1,
),
}
)
)
if rev == youngest_rev:
request.server.add_header("Cache-control", "no-store")
generate_page(request, "revision", data)
def is_query_supported(request):
"""Returns true if querying is supported for the given path."""
return (
request.cfg.cvsdb.enabled
and request.pathtype == vclib.DIR
and request.roottype in ["cvs", "svn"]
)
def is_querydb_nonempty_for_root(request):
"""Return 1 iff commits database integration is supported *and* the
current root is found in that database. Only does this check if
check_database is set to 1."""
if request.cfg.cvsdb.enabled and request.roottype in ["cvs", "svn"]:
if request.cfg.cvsdb.check_database_for_root:
global cvsdb
import cvsdb
db = cvsdb.ConnectDatabaseReadOnly(request.cfg)
repos_root, repos_dir = cvsdb.FindRepository(db, request.rootpath)
if repos_root:
return 1
else:
return 1
return 0
def validate_query_args(request):
# Do some additional input validation of query form arguments beyond
# what is offered by the CGI param validation loop in Request.run_viewvc().
for arg_base in ["branch", "file", "comment", "who"]:
# First, make sure the the XXX_match args have valid values:
arg_match = arg_base + "_match"
arg_match_value = request.query_dict.get(arg_match, "exact")
if arg_match_value not in ("exact", "like", "glob", "regex", "notregex"):
raise ViewVCException(
'An illegal value was provided for the "%s" parameter.' % (arg_match),
"400 Bad Request",
)
# Now, for those args which are supposed to be regular expressions (per
# their corresponding XXX_match values), make sure they are.
if arg_match_value == "regex" or arg_match_value == "notregex":
arg_base_value = request.query_dict.get(arg_base)
if arg_base_value:
try:
re.compile(arg_base_value)
except Exception:
raise ViewVCException(
'An illegal value was provided for the "%s" parameter.' % (arg_base),
"400 Bad Request",
)
def view_queryform(request):
cfg = request.cfg
if not is_query_supported(request):
raise ViewVCException(
'Can not query project root "%s" at "%s".' % (request.rootname, request.where),
"403 Forbidden",
)
# Do some more precise input validation.
validate_query_args(request)
query_action, query_hidden_values = request.get_form(
view_func=view_query, params={"limit_changes": None}
)
limit_changes = int(request.query_dict.get("limit_changes", cfg.options.limit_changes))
def escaped_query_dict_get(itemname, itemdefault=""):
return request.server.escape(request.query_dict.get(itemname, itemdefault))
data = common_template_data(request)
data.merge(
TemplateData(
{
"branch": escaped_query_dict_get("branch", ""),
"branch_match": escaped_query_dict_get("branch_match", "exact"),
"dir": escaped_query_dict_get("dir", ""),
"file": escaped_query_dict_get("file", ""),
"file_match": escaped_query_dict_get("file_match", "exact"),
"who": escaped_query_dict_get("who", ""),
"who_match": escaped_query_dict_get("who_match", "exact"),
"comment": escaped_query_dict_get("comment", ""),
"comment_match": escaped_query_dict_get("comment_match", "exact"),
"querysort": escaped_query_dict_get("querysort", "date"),
"date": escaped_query_dict_get("date", "hours"),
"hours": escaped_query_dict_get("hours", "2"),
"mindate": escaped_query_dict_get("mindate", ""),
"maxdate": escaped_query_dict_get("maxdate", ""),
"query_action": query_action,
"query_hidden_values": query_hidden_values,
"limit_changes": limit_changes,
"dir_href": request.get_url(view_func=view_directory, params={}, escape=1),
}
)
)
generate_page(request, "query_form", data)
def parse_date(datestr):
"""Parse a date string from the query form."""
match = re.match(r"^(\d\d\d\d)-(\d\d)-(\d\d)(?:\ +" r"(\d\d):(\d\d)(?::(\d\d))?)?$", datestr)
if match:
year = int(match.group(1))
month = int(match.group(2))
day = int(match.group(3))
hour = match.group(4)
if hour is not None:
hour = int(hour)
else:
hour = 0
minute = match.group(5)
if minute is not None:
minute = int(minute)
else:
minute = 0
second = match.group(6)
if second is not None:
second = int(second)
else:
second = 0
# return a "seconds since epoch" value assuming date given in UTC
tm = (year, month, day, hour, minute, second, 0, 0, 0)
return calendar.timegm(tm)
else:
return None
def english_query(request):
"""Generate a sentance describing the query."""
cfg = request.cfg
ret = ["Checkins "]
dir = request.query_dict.get("dir", "")
if dir:
ret.append("to ")
if "," in dir:
ret.append("subdirectories")
else:
ret.append("subdirectory")
ret.append(" <em>%s</em> " % request.server.escape(dir))
file = request.query_dict.get("file", "")
if file:
if len(ret) != 1:
ret.append("and ")
ret.append("to file <em>%s</em> " % request.server.escape(file))
who = request.query_dict.get("who", "")
branch = request.query_dict.get("branch", "")
if branch:
ret.append("on branch <em>%s</em> " % request.server.escape(branch))
else:
ret.append("on all branches ")
comment = request.query_dict.get("comment", "")
if comment:
ret.append("with comment <i>%s</i> " % request.server.escape(comment))
if who:
ret.append("by <em>%s</em> " % request.server.escape(who))
date = request.query_dict.get("date", "hours")
if date == "hours":
ret.append(
"in the last %s hours" % request.server.escape(request.query_dict.get("hours", "2"))
)
elif date == "day":
ret.append("in the last day")
elif date == "week":
ret.append("in the last week")
elif date == "month":
ret.append("in the last month")
elif date == "all":
ret.append("since the beginning of time")
elif date == "explicit":
mindate = request.query_dict.get("mindate", "")
maxdate = request.query_dict.get("maxdate", "")
if mindate and maxdate:
w1, w2 = "between", "and"
else:
w1, w2 = "since", "before"
if mindate:
mindate = make_time_string(parse_date(mindate), cfg)
ret.append("%s <em>%s</em> " % (w1, mindate))
if maxdate:
maxdate = make_time_string(parse_date(maxdate), cfg)
ret.append("%s <em>%s</em> " % (w2, maxdate))
return "".join(ret)
def prev_rev(rev):
"""Returns a string representing the previous revision of the argument."""
r = rev.split(".")
# decrement final revision component
r[-1] = str(int(r[-1]) - 1)
# prune if we pass the beginning of the branch
if len(r) > 2 and r[-1] == "0":
r = r[:-2]
return ".".join(r)
def build_commit(request, files, max_files, dir_strip, format):
"""Return a commit object build from the information in FILES, or
None if no allowed files are present in the set. DIR_STRIP is the
path prefix to remove from the commit object's set of files. If
MAX_FILES is non-zero, it is used to limit the number of files
returned in the commit object. FORMAT is the requested output
format of the query request."""
cfg = request.cfg
author = files[0].GetAuthor()
date = files[0].GetTime()
desc = files[0].GetDescription()
commit_rev = files[0].GetRevision()
len_strip = len(dir_strip)
commit_files = []
num_allowed = 0
plus_count = 0
minus_count = 0
found_unreadable = 0
for f in files:
dirname = f.GetDirectory()
filename = f.GetFile()
if dir_strip:
assert dirname[:len_strip] == dir_strip
assert len(dirname) == len_strip or dirname[len(dir_strip)] == "/"
dirname = dirname[(len_strip + 1) :]
where = dirname and ("%s/%s" % (dirname, filename)) or filename
rev = f.GetRevision()
rev_prev = prev_rev(rev)
commit_time = f.GetTime()
if commit_time:
commit_time = make_time_string(commit_time, cfg)
change_type = f.GetTypeString()
# In CVS, we can actually look at deleted revisions; in Subversion
# we can't -- we'll look at the previous revision instead.
exam_rev = rev
if request.roottype == "svn" and change_type == "Remove":
exam_rev = rev_prev
# Check path access (since the commits database logic bypasses the
# vclib layer and, thus, the vcauth stuff that layer uses).
path_parts = _path_parts(where)
if path_parts:
# Skip files in CVSROOT if asked to hide such.
if cfg.options.hide_cvsroot and is_cvsroot_path(request.roottype, path_parts):
found_unreadable = 1
continue
# We have to do a rare authz check here because this data comes
# from the CVSdb, not from the vclib providers.
#
# WARNING: The Subversion CVSdb integration logic is weak, weak,
# weak. It has no ability to track copies, so complex
# situations like a copied directory with a deleted subfile (all
# in the same revision) are very ... difficult. We've no choice
# but to omit as unauthorized paths the authorization logic
# can't find.
try:
readable = vclib.check_path_access(request.repos, path_parts, None, exam_rev)
except vclib.ItemNotFound:
readable = 0
if not readable:
found_unreadable = 1
continue
if request.roottype == "svn":
params = {"pathrev": exam_rev}
else:
params = {"revision": exam_rev, "pathrev": f.GetBranch() or None}
dir_href = request.get_url(
view_func=view_directory, where=dirname, pathtype=vclib.DIR, params=params, escape=1
)
log_href = request.get_url(
view_func=view_log, where=where, pathtype=vclib.FILE, params=params, escape=1
)
diff_href = view_href = download_href = None
if "markup" in cfg.options.allowed_views:
view_href = request.get_url(
view_func=view_markup, where=where, pathtype=vclib.FILE, params=params, escape=1
)
if "co" in cfg.options.allowed_views:
download_href = request.get_url(
view_func=view_checkout, where=where, pathtype=vclib.FILE, params=params, escape=1
)
if change_type == "Change":
diff_href_params = params.copy()
diff_href_params.update({"r1": rev_prev, "r2": rev, "diff_format": None})
diff_href = request.get_url(
view_func=view_diff,
where=where,
pathtype=vclib.FILE,
params=diff_href_params,
escape=1,
)
mime_type, encoding = calculate_mime_type(request, path_parts, exam_rev)
prefer_markup = ezt.boolean(default_view(mime_type, cfg) == view_markup)
# Update plus/minus line change count.
plus = int(f.GetPlusCount())
minus = int(f.GetMinusCount())
plus_count = plus_count + plus
minus_count = minus_count + minus
num_allowed = num_allowed + 1
if max_files and num_allowed > max_files:
continue
commit_files.append(
_item(
date=commit_time,
dir=request.server.escape(dirname),
file=request.server.escape(filename),
author=request.server.escape(f.GetAuthor()),
rev=rev,
branch=f.GetBranch(),
plus=plus,
minus=minus,
type=change_type,
dir_href=dir_href,
log_href=log_href,
view_href=view_href,
download_href=download_href,
prefer_markup=prefer_markup,
diff_href=diff_href,
)
)
# No files survived authz checks? Let's just pretend this
# little commit didn't happen, shall we?
if not len(commit_files):
return None
commit = _item(
num_files=len(commit_files), files=commit_files, plus=plus_count, minus=minus_count
)
commit.limited_files = ezt.boolean(num_allowed > len(commit_files))
# We'll mask log messages in commits which contain unreadable paths,
# but even that is kinda iffy. If a person searches for
# '/some/hidden/path' across log messages, then gets a response set
# that shows commits lacking log message, said person can reasonably
# assume that the log messages contained the hidden path, and that
# this is likely because they are referencing a real path in the
# repository -- a path the user isn't supposed to even know about.
if found_unreadable:
commit.log = None
commit.short_log = None
else:
lf = LogFormatter(request, desc)
htmlize = format != "rss"
commit.log = lf.get(maxlen=0, htmlize=htmlize)
commit.short_log = lf.get(maxlen=cfg.options.short_log_len, htmlize=htmlize)
commit.author = request.server.escape(author)
commit.rss_date = make_rss_time_string(date, request.cfg)
if request.roottype == "svn":
commit.rev = commit_rev
is_https = request.server.getenv("HTTPS") == "on"
commit.rss_url = "%s://%s%s" % (
is_https and "https" or "http",
request.server.getenv("HTTP_HOST"),
request.get_url(view_func=view_revision, params={"revision": commit.rev}, escape=1),
)
else:
commit.rev = None
commit.rss_url = None
return commit
def query_backout(request, commits):
server_fp = get_writeready_server_file(request, "text/plain", is_text=True)
if not commits:
server_fp.write(
"""\
# No changes were selected by the query.
# There is nothing to back out.
"""
)
return
server_fp.write(
"""\
# This page can be saved as a shell script and executed.
# It should be run at the top of your work area. It will update
# your working copy to back out the changes selected by the
# query.
"""
)
for commit in commits:
for fileinfo in commit.files:
if request.roottype == "cvs":
server_fp.write(
"cvs update -j %s -j %s %s/%s\n"
% (fileinfo.rev, prev_rev(fileinfo.rev), fileinfo.dir, fileinfo.file)
)
elif request.roottype == "svn":
server_fp.write(
"svn merge -r %s:%s %s/%s\n"
% (fileinfo.rev, prev_rev(fileinfo.rev), fileinfo.dir, fileinfo.file)
)
def view_query(request):
if not is_query_supported(request):
raise ViewVCException(
'Can not query project root "%s" at "%s".' % (request.rootname, request.where),
"403 Forbidden",
)
cfg = request.cfg
# Do some more precise input validation.
validate_query_args(request)
# get form data
branch = request.query_dict.get("branch", "")
branch_match = request.query_dict.get("branch_match", "exact")
dir = request.query_dict.get("dir", "")
file = request.query_dict.get("file", "")
file_match = request.query_dict.get("file_match", "exact")
who = request.query_dict.get("who", "")
who_match = request.query_dict.get("who_match", "exact")
comment = request.query_dict.get("comment", "")
comment_match = request.query_dict.get("comment_match", "exact")
querysort = request.query_dict.get("querysort", "date")
date = request.query_dict.get("date", "hours")
hours = request.query_dict.get("hours", "2")
mindate = request.query_dict.get("mindate", "")
maxdate = request.query_dict.get("maxdate", "")
format = request.query_dict.get("format")
limit_changes = int(request.query_dict.get("limit_changes", cfg.options.limit_changes))
match_types = {
"exact": 1,
"like": 1,
"glob": 1,
"regex": 1,
"notregex": 1,
}
sort_types = {
"date": 1,
"author": 1,
"file": 1,
}
date_types = {
"hours": 1,
"day": 1,
"week": 1,
"month": 1,
"all": 1,
"explicit": 1,
}
# parse various fields, validating or converting them
if branch_match not in match_types:
branch_match = "exact"
if file_match not in match_types:
file_match = "exact"
if who_match not in match_types:
who_match = "exact"
if comment_match not in match_types:
comment_match = "exact"
if querysort not in sort_types:
querysort = "date"
if date not in date_types:
date = "hours"
mindate = parse_date(mindate)
maxdate = parse_date(maxdate)
global cvsdb
import cvsdb
db = cvsdb.ConnectDatabaseReadOnly(cfg)
repos_root, repos_dir = cvsdb.FindRepository(db, request.rootpath)
if not repos_root:
raise ViewVCException(
"The root '%s' was not found in the commit database " % request.rootname
)
# create the database query from the form data
query = cvsdb.CreateCheckinQuery()
query.SetRepository(repos_root)
# treat "HEAD" specially ...
if branch_match == "exact" and branch == "HEAD":
query.SetBranch("")
elif branch:
query.SetBranch(branch, branch_match)
if dir:
for subdir in dir.split(","):
path = _path_join(repos_dir + request.path_parts + _path_parts(subdir.strip()))
query.SetDirectory(path, "exact")
query.SetDirectory("%s/%%" % cvsdb.EscapeLike(path), "like")
else:
where = _path_join(repos_dir + request.path_parts)
if where: # if we are in a subdirectory ...
query.SetDirectory(where, "exact")
query.SetDirectory("%s/%%" % cvsdb.EscapeLike(where), "like")
if file:
query.SetFile(file, file_match)
if who:
query.SetAuthor(who, who_match)
if comment:
query.SetComment(comment, comment_match)
query.SetSortMethod(querysort)
if date == "hours":
query.SetFromDateHoursAgo(int(hours))
elif date == "day":
query.SetFromDateDaysAgo(1)
elif date == "week":
query.SetFromDateDaysAgo(7)
elif date == "month":
query.SetFromDateDaysAgo(31)
elif date == "all":
pass
elif date == "explicit":
if mindate is not None:
query.SetFromDateObject(mindate)
if maxdate is not None:
query.SetToDateObject(maxdate)
# Set the admin-defined (via configuration) row limits. This is to avoid
# slamming the database server with a monster query.
if format == "rss":
query.SetLimit(cfg.cvsdb.rss_row_limit)
else:
query.SetLimit(cfg.cvsdb.row_limit)
# run the query
db.RunQuery(query)
commit_list = query.GetCommitList()
row_limit_reached = query.GetLimitReached()
# gather commits
commits = []
plus_count = 0
minus_count = 0
mod_time = -1
if commit_list:
files = []
current_desc = commit_list[0].GetDescriptionID()
current_rev = commit_list[0].GetRevision()
dir_strip = _path_join(repos_dir)
for commit in commit_list:
commit_desc = commit.GetDescriptionID()
commit_rev = commit.GetRevision()
# base modification time on the newest commit
if commit.GetTime() > mod_time:
mod_time = commit.GetTime()
# For CVS, group commits with the same commit message.
# For Subversion, group them only if they have the same revision number
if request.roottype == "cvs":
if current_desc == commit_desc:
files.append(commit)
continue
else:
if current_rev == commit_rev:
files.append(commit)
continue
# append this grouping
commit_item = build_commit(request, files, limit_changes, dir_strip, format)
if commit_item:
# update running plus/minus totals
plus_count = plus_count + commit_item.plus
minus_count = minus_count + commit_item.minus
commits.append(commit_item)
files = [commit]
current_desc = commit_desc
current_rev = commit_rev
# we need to tack on our last commit grouping, if any
commit_item = build_commit(request, files, limit_changes, dir_strip, format)
if commit_item:
# update running plus/minus totals
plus_count = plus_count + commit_item.plus
minus_count = minus_count + commit_item.minus
commits.append(commit_item)
# only show the branch column if we are querying all branches
# or doing a non-exact branch match on a CVS repository.
show_branch = ezt.boolean(
request.roottype == "cvs" and (branch == "" or branch_match != "exact")
)
# backout link
params = request.query_dict.copy()
params["format"] = "backout"
backout_href = request.get_url(params=params, escape=1)
# link to zero limit_changes value
params = request.query_dict.copy()
params["limit_changes"] = 0
limit_changes_href = request.get_url(params=params, escape=1)
# if we got any results, use the newest commit as the modification time
if mod_time >= 0:
if check_freshness(request, mod_time):
return
if format == "backout":
query_backout(request, commits)
return
data = common_template_data(request)
data.merge(
TemplateData(
{
"sql": request.server.escape(db.CreateSQLQueryString(query)),
"english_query": english_query(request),
"queryform_href": request.get_url(view_func=view_queryform, escape=1),
"backout_href": backout_href,
"plus_count": plus_count,
"minus_count": minus_count,
"show_branch": show_branch,
"querysort": querysort,
"commits": commits,
"row_limit_reached": ezt.boolean(row_limit_reached),
"limit_changes": limit_changes,
"limit_changes_href": limit_changes_href,
"rss_link_href": request.get_url(
view_func=view_query, params={"date": "month"}, escape=1, prefix=1
),
}
)
)
if format == "rss":
generate_page(request, "rss", data, "application/rss+xml")
else:
generate_page(request, "query_results", data)
_views = {
"annotate": view_annotate,
"co": view_checkout,
"diff": view_diff,
"dir": view_directory,
"graph": view_cvsgraph,
"graphimg": view_cvsgraph_image,
"log": view_log,
"markup": view_markup,
"patch": view_patch,
"query": view_query,
"queryform": view_queryform,
"revision": view_revision,
"roots": view_roots,
"tar": download_tarball,
"redirect_pathrev": redirect_pathrev,
}
_view_codes = {}
for code, view in _views.items():
_view_codes[view] = code
def list_roots(request):
cfg = request.cfg
allroots = {}
# Add the viewable Subversion roots
for root in cfg.general.svn_roots.keys():
path_encoding, content_encoding = get_repos_encodings(cfg, root,
do_overlay=True,
preserve_cfg=True)
auth = setup_authorizer(cfg, request.username, root)
try:
repos = vclib.svn.SubversionRepository(
root,
cfg.general.svn_roots[root],
auth,
cfg.utilities,
cfg.options.svn_config_dir,
content_encoding,
path_encoding,
)
lastmod = None
if cfg.options.show_roots_lastmod:
try:
repos.open()
youngest_rev = repos.youngest
date, author, msg, revprops, changes = repos.revinfo(youngest_rev)
date_str = make_time_string(date, cfg)
ago = html_time(request, date)
lf = LogFormatter(request, msg)
log = lf.get(maxlen=0, htmlize=1)
short_log = lf.get(maxlen=cfg.options.short_log_len, htmlize=1)
lastmod = _item(
ago=ago,
author=author,
date=date_str,
log=log,
short_log=short_log,
rev=str(youngest_rev),
)
except Exception:
lastmod = None
except vclib.ReposNotFound:
continue
allroots[root] = [cfg.general.svn_roots[root], "svn", lastmod]
# Add the viewable CVS roots
for root in cfg.general.cvs_roots.keys():
path_encoding, content_encoding = get_repos_encodings(cfg, root,
do_overlay=True,
preserve_cfg=True)
auth = setup_authorizer(cfg, request.username, root)
try:
vclib.ccvs.CVSRepository(
root,
cfg.general.cvs_roots[root],
auth,
cfg.utilities,
cfg.options.use_rcsparse,
content_encoding,
path_encoding,
)
except vclib.ReposNotFound:
continue
allroots[root] = [cfg.general.cvs_roots[root], "cvs", None]
return allroots
def _parse_root_parent(pp):
"""Parse a single root parent "directory [= context]: repo_type" string
and return as tuple."""
pos = pp.rfind(":")
if pos > 0:
repo_type = pp[(pos + 1) :].strip()
pp = pp[:pos].strip()
else:
repo_type = None
pos = pp.rfind("=")
if pos > 0:
context = _path_parts(pp[(pos + 1) :].strip())
pp = pp[:pos].strip()
else:
context = None
path = os.path.normpath(pp)
return path, context, repo_type
def expand_root_parents(cfg):
"""Expand the configured root parents into individual roots."""
path_encoding, _ = get_repos_encodings(cfg, None)
# Each item in root_parents is a "directory [= context ]: repo_type" string.
for pp in cfg.general.root_parents:
path, context, repo_type = _parse_root_parent(pp)
if repo_type == "cvs":
roots = vclib.ccvs.expand_root_parent(path, path_encoding)
if cfg.options.hide_cvsroot and "CVSROOT" in roots:
del roots["CVSROOT"]
if context:
fullroots = {}
for root, rootpath in roots.items():
fullroots[_path_join(context + [root])] = rootpath
cfg.general.cvs_roots.update(fullroots)
else:
cfg.general.cvs_roots.update(roots)
elif repo_type == "svn":
roots = vclib.svn.expand_root_parent(path, path_encoding)
if context:
fullroots = {}
for root, rootpath in roots.items():
fullroots[_path_join(context + [root])] = rootpath
cfg.general.svn_roots.update(fullroots)
else:
cfg.general.svn_roots.update(roots)
elif repo_type is None:
raise ViewVCException(
'The path "%s" in "root_parents" does not include a '
'repository type. Expected "cvs" or "svn".' % (pp)
)
else:
raise ViewVCException(
'The path "%s" in "root_parents" has an unrecognized '
'repository type ("%s"). Expected "cvs" or "svn".' % (pp, repo_type)
)
def find_root_in_parents(cfg, path_parts, roottype):
"""Return the rootpath for configured ROOTNAME of ROOTTYPE."""
# Easy out: caller wants rootname "CVSROOT", and we're hiding those.
if path_parts[-1] == "CVSROOT" and cfg.options.hide_cvsroot:
return None
path_encoding, _ = get_repos_encodings(cfg, None)
for pp in cfg.general.root_parents:
path, context, repo_type = _parse_root_parent(pp)
if repo_type != roottype:
continue
if context is not None:
if not _path_starts_with(path_parts, context):
continue
rootidx = len(context)
else:
rootidx = 0
if len(path_parts) <= rootidx:
continue
rootname = path_parts[rootidx]
fullroot = _path_join(path_parts[0 : (rootidx + 1)])
remain = path_parts[(rootidx + 1) :]
rootpath = None
if roottype == "cvs":
rootpath = vclib.ccvs.find_root_in_parent(path, rootname,
path_encoding)
elif roottype == "svn":
rootpath = vclib.svn.find_root_in_parent(path, rootname,
path_encoding)
if rootpath is not None:
return fullroot, rootpath, remain
return None, None, None
def locate_root_from_path(cfg, path_parts):
"""Return a 4-tuple ROOTTYPE, ROOTPATH, ROOTNAME, REMAIN for path_parts."""
for rootname, rootpath in cfg.general.cvs_roots.items():
pp = _path_parts(rootname)
if _path_starts_with(path_parts, pp):
return "cvs", rootpath, rootname, path_parts[len(pp) :]
for rootname, rootpath in cfg.general.svn_roots.items():
pp = _path_parts(rootname)
if _path_starts_with(path_parts, pp):
return "svn", rootpath, rootname, path_parts[len(pp) :]
rootname, path_in_parent, remain = find_root_in_parents(cfg, path_parts, "cvs")
if path_in_parent:
cfg.general.cvs_roots[rootname] = path_in_parent
return "cvs", path_in_parent, rootname, remain
rootname, path_in_parent, remain = find_root_in_parents(cfg, path_parts, "svn")
if path_in_parent:
cfg.general.svn_roots[rootname] = path_in_parent
return "svn", path_in_parent, rootname, remain
return None, None, None, None
def locate_root(cfg, rootname):
"""Return a 2-tuple ROOTTYPE, ROOTPATH for configured ROOTNAME."""
# First try a direct match
if rootname in cfg.general.cvs_roots:
return "cvs", cfg.general.cvs_roots[rootname]
if rootname in cfg.general.svn_roots:
return "svn", cfg.general.svn_roots[rootname]
path_parts = _path_parts(rootname)
roottype, rootpath, rootname_dupl, remain = locate_root_from_path(cfg, path_parts)
if roottype is not None:
if rootname_dupl != rootname:
raise ViewVCException(
'Found root name "%s" doesn\'t match "%s"' % (rootname_dupl, rootname),
"500 Internal Server Error",
)
if len(remain) > 0:
raise ViewVCException(
'Have remaining path "%s"' % (remain), "500 Internal Server Error"
)
return roottype, rootpath
def load_config(pathname=None, server=None):
"""Load the ViewVC configuration file. SERVER is the server object
that will be using this configuration. Consult the environment for
the variable VIEWVC_CONF_PATHNAME and VIEWCVS_CONF_PATHNAME (its
legacy name) and, if set, use its value as the path of the
configuration file; otherwise, use PATHNAME (if provided). Failing
all else, use a hardcoded default configuration path."""
# See if the environment contains overrides to the configuration
# path. If we have a SERVER object, consult its environment; use
# the OS environment otherwise.
env_get = server and server.getenv or os.environ.get
env_pathname = env_get("VIEWVC_CONF_PATHNAME") or env_get("VIEWCVS_CONF_PATHNAME")
# Try to find the configuration pathname by searching these ordered
# locations: the environment, the passed-in PATHNAME, the hard-coded
# default.
pathname = (
env_pathname
or pathname
or os.path.join(os.path.dirname(os.path.dirname(__file__)), "viewvc.conf")
)
# Load the configuration!
cfg = config.Config()
cfg.set_defaults()
cfg.load_config(pathname, env_get("HTTP_HOST"))
# Apply the stacktrace configuration immediately.
sys.tracebacklimit = cfg.options.stacktraces and 1000 or 0
# Load mime types file(s), but reverse the order -- our
# configuration uses a most-to-least preferred approach, but the
# 'mimetypes' package wants things the other way around.
if cfg.general.mime_types_files:
files = cfg.general.mime_types_files[:]
files.reverse()
files = list(map(lambda x, y=pathname: os.path.join(os.path.dirname(y), x), files))
mimetypes.init(files)
return cfg
def view_error(server, cfg):
exc_dict = get_exception_data()
# Use the configured error template if possible.
try:
if cfg and not server.response_started():
status = exc_dict.get("status", '500 Internal Server Error')
server.start_response(status=status)
template = get_view_template(cfg, "error")
server_fp = TextIOWrapper_noclose(
server.file(), "utf-8", "xmlcharrefreplace", write_through=True
)
template.generate(server_fp, exc_dict)
return
except Exception:
pass
# Fallback to the old exception printer if no configuration is
# available, or if something went wrong. (In this case, we need
# to manually escape strings.)
for key in exc_dict:
if isinstance(exc_dict[key], str):
exc_dict[key] = server.escape(exc_dict[key])
print_exception_data(server, exc_dict)
def main(server, cfg):
try:
# build a Request object, which contains info about the HTTP request
request = Request(server, cfg)
request.run_viewvc()
except SystemExit:
return
except Exception:
view_error(server, cfg)