# -*-python-*- # # Copyright (C) 1999-2006 The ViewCVS Group. All Rights Reserved. # # By using this file, you agree to the terms and conditions set forth in # the LICENSE.html file which can be found at the top level of the ViewVC # distribution or at http://viewvc.org/license-1.html. # # For more information, visit http://viewvc.org/ # # ----------------------------------------------------------------------- "Version Control lib driver for locally accessible Subversion repositories" import vclib import os import os.path import string import cStringIO import signal import time import tempfile import popen import re from svn import fs, repos, core, delta ### Require Subversion 1.2.0 or better. if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_PATCH) < (1, 2, 0): raise Exception, "Version requirement not met (needs 1.2.0 or better)" def _allow_all(root, path, pool): """Generic authz_read_func that permits access to all paths""" return 1 def _fs_path_join(base, relative): # Subversion filesystem paths are '/'-delimited, regardless of OS. joined_path = base + '/' + relative parts = filter(None, string.split(joined_path, '/')) return string.join(parts, '/') def _cleanup_path(path): """Return a cleaned-up Subversion filesystem path""" return string.join(filter(None, string.split(path, '/')), '/') def _compare_paths(path1, path2): path1_len = len (path1); path2_len = len (path2); min_len = min(path1_len, path2_len) i = 0 # Are the paths exactly the same? if path1 == path2: return 0 # Skip past common prefix while (i < min_len) and (path1[i] == path2[i]): i = i + 1 # Children of paths are greater than their parents, but less than # greater siblings of their parents char1 = '\0' char2 = '\0' if (i < path1_len): char1 = path1[i] if (i < path2_len): char2 = path2[i] if (char1 == '/') and (i == path2_len): return 1 if (char2 == '/') and (i == path1_len): return -1 if (i < path1_len) and (char1 == '/'): return -1 if (i < path2_len) and (char2 == '/'): return 1 # Common prefix was skipped above, next character is compared to # determine order return cmp(char1, char2) def _datestr_to_date(datestr, pool): if datestr is None: return None return core.svn_time_from_cstring(datestr, pool) / 1000000 def _fs_rev_props(fsptr, rev, pool): author = fs.revision_prop(fsptr, rev, core.SVN_PROP_REVISION_AUTHOR, pool) msg = fs.revision_prop(fsptr, rev, core.SVN_PROP_REVISION_LOG, pool) date = fs.revision_prop(fsptr, rev, core.SVN_PROP_REVISION_DATE, pool) return date, author, msg def date_from_rev(svnrepos, rev): if (rev < 0) or (rev > svnrepos.youngest): raise vclib.InvalidRevision(rev) datestr = fs.revision_prop(svnrepos.fs_ptr, rev, core.SVN_PROP_REVISION_DATE, svnrepos.pool) return _datestr_to_date(datestr, svnrepos.pool) def get_location(svnrepos, path, rev, old_rev): try: results = repos.svn_repos_trace_node_locations(svnrepos.fs_ptr, path, rev, [old_rev], _allow_all, svnrepos.pool) except core.SubversionException, e: if e.apr_err == core.SVN_ERR_FS_NOT_FOUND: raise vclib.ItemNotFound(path) raise try: old_path = results[old_rev] except KeyError: raise vclib.ItemNotFound(path) return _cleanup_path(old_path) def last_rev(svnrepos, path, peg_revision, limit_revision=None): """Given PATH, known to exist in PEG_REVISION, find the youngest revision older than, or equal to, LIMIT_REVISION in which path exists. Return that revision, and the path at which PATH exists in that revision.""" # Here's the plan, man. In the trivial case (where PEG_REVISION is # the same as LIMIT_REVISION), this is a no-brainer. If # LIMIT_REVISION is older than PEG_REVISION, we can use Subversion's # history tracing code to find the right location. If, however, # LIMIT_REVISION is younger than PEG_REVISION, we suffer from # Subversion's lack of forward history searching. Our workaround, # ugly as it may be, involves a binary search through the revisions # between PEG_REVISION and LIMIT_REVISION to find our last live # revision. peg_revision = svnrepos._getrev(peg_revision) limit_revision = svnrepos._getrev(limit_revision) try: if peg_revision == limit_revision: return peg_revision, path elif peg_revision > limit_revision: fsroot = svnrepos._getroot(peg_revision) history = fs.node_history(fsroot, path, svnrepos.scratch_pool) while history: path, peg_revision = fs.history_location(history, svnrepos.scratch_pool); if peg_revision <= limit_revision: return max(peg_revision, limit_revision), _cleanup_path(path) history = fs.history_prev(history, 1, svnrepos.scratch_pool) return peg_revision, _cleanup_path(path) else: ### Warning: this is *not* an example of good pool usage. orig_id = fs.node_id(svnrepos._getroot(peg_revision), path, svnrepos.scratch_pool) while peg_revision != limit_revision: mid = (peg_revision + 1 + limit_revision) / 2 try: mid_id = fs.node_id(svnrepos._getroot(mid), path, svnrepos.scratch_pool) except core.SubversionException, e: if e.apr_err == core.SVN_ERR_FS_NOT_FOUND: cmp = -1 else: raise else: ### Not quite right. Need a comparison function that only returns ### true when the two nodes are the same copy, not just related. cmp = fs.compare_ids(orig_id, mid_id) if cmp in (0, 1): peg_revision = mid else: limit_revision = mid - 1 return peg_revision, path finally: svnrepos._scratch_clear() def created_rev(svnrepos, full_name, rev): fsroot = svnrepos._getroot(rev) return fs.node_created_rev(fsroot, full_name, svnrepos.pool) class Revision(vclib.Revision): "Hold state for each revision's log entry." def __init__(self, rev, date, author, msg, size, filename, copy_path, copy_rev): vclib.Revision.__init__(self, rev, str(rev), date, author, None, msg, size) self.filename = filename self.copy_path = copy_path self.copy_rev = copy_rev class NodeHistory: def __init__(self, fs_ptr, show_all_logs): self.histories = {} self.fs_ptr = fs_ptr self.show_all_logs = show_all_logs def add_history(self, path, revision, pool): # If filtering, only add the path and revision to the histories # list if they were actually changed in this revision (where # change means the path itself was changed, or one of its parents # was copied). This is useful for omitting bubble-up directory # changes. if not self.show_all_logs: rev_root = fs.revision_root(self.fs_ptr, revision, pool) changed_paths = fs.paths_changed(rev_root, pool) paths = changed_paths.keys() if path not in paths: # Look for a copied parent test_path = path found = 0 subpool = core.svn_pool_create(pool) while 1: core.svn_pool_clear(subpool) off = string.rfind(test_path, '/') if off < 0: break test_path = test_path[0:off] if test_path in paths: copyfrom_rev, copyfrom_path = \ fs.copied_from(rev_root, test_path, subpool) if copyfrom_rev >= 0 and copyfrom_path: found = 1 break core.svn_pool_destroy(subpool) if not found: return self.histories[revision] = _cleanup_path(path) def _get_history(svnrepos, full_name, rev, options={}): fsroot = svnrepos._getroot(rev) show_all_logs = options.get('svn_show_all_dir_logs', 0) if not show_all_logs: # See if the path is a file or directory. kind = fs.check_path(fsroot, full_name, svnrepos.pool) if kind is core.svn_node_file: show_all_logs = 1 # Instantiate a NodeHistory collector object. history = NodeHistory(svnrepos.fs_ptr, show_all_logs) # Do we want to cross copy history? cross_copies = options.get('svn_cross_copies', 0) # Get the history items for PATH. repos.svn_repos_history(svnrepos.fs_ptr, full_name, history.add_history, 1, rev, cross_copies, svnrepos.pool) return history.histories class ChangedPath: def __init__(self, filename, pathtype, prop_mods, text_mods, base_path, base_rev, action, is_copy): self.filename = filename self.pathtype = pathtype self.prop_mods = prop_mods self.text_mods = text_mods self.base_path = base_path self.base_rev = base_rev self.action = action self.is_copy = is_copy def get_revision_info(svnrepos, rev): fsroot = svnrepos._getroot(rev) # Get the changes for the revision editor = repos.ChangeCollector(svnrepos.fs_ptr, fsroot, svnrepos.pool) e_ptr, e_baton = delta.make_editor(editor, svnrepos.pool) repos.svn_repos_replay(fsroot, e_ptr, e_baton, svnrepos.pool) changes = editor.get_changes() changedpaths = {} # Copy the Subversion changes into a new hash, converting them into # ChangedPath objects. for path in changes.keys(): change = changes[path] if change.path: change.path = _cleanup_path(change.path) if change.base_path: change.base_path = _cleanup_path(change.base_path) is_copy = 0 if not hasattr(change, 'action'): # new to subversion 1.4.0 action = 'modified' if not change.path: action = 'deleted' elif change.added: action = 'added' replace_check_path = path if change.base_path and change.base_rev: replace_check_path = change.base_path if changedpaths.has_key(replace_check_path) \ and changedpaths[replace_check_path].action == 'deleted': action = 'replaced' else: if change.action == repos.CHANGE_ACTION_ADD: action = 'added' elif change.action == repos.CHANGE_ACTION_DELETE: action = 'deleted' elif change.action == repos.CHANGE_ACTION_REPLACE: action = 'replaced' else: action = 'modified' if (action == 'added' or action == 'replaced') \ and change.base_path \ and change.base_rev: is_copy = 1 if change.item_kind == core.svn_node_dir: pathtype = vclib.DIR elif change.item_kind == core.svn_node_file: pathtype = vclib.FILE else: pathtype = None changedpaths[path] = ChangedPath(path, pathtype, change.prop_changes, change.text_changed, change.base_path, change.base_rev, action, is_copy) # Actually, what we want is a sorted list of ChangedPath objects. change_items = changedpaths.values() change_items.sort(lambda a, b: _compare_paths(a.filename, b.filename)) # Now get the revision property info. Would use # editor.get_root_props(), but something is broken there... datestr, author, msg = _fs_rev_props(svnrepos.fs_ptr, rev, svnrepos.pool) date = _datestr_to_date(datestr, svnrepos.pool) return date, author, msg, change_items def _log_helper(svnrepos, rev, path, pool): rev_root = fs.revision_root(svnrepos.fs_ptr, rev, pool) # Was this path@rev the target of a copy? copyfrom_rev, copyfrom_path = fs.copied_from(rev_root, path, pool) # Assemble our LogEntry datestr, author, msg = _fs_rev_props(svnrepos.fs_ptr, rev, pool) date = _datestr_to_date(datestr, pool) if fs.is_file(rev_root, path, pool): size = fs.file_length(rev_root, path, pool) else: size = None entry = Revision(rev, date, author, msg, size, path, copyfrom_path and _cleanup_path(copyfrom_path), copyfrom_rev) return entry def _fetch_log(svnrepos, full_name, which_rev, options, pool): revs = [] if options.get('svn_latest_log', 0): rev = _log_helper(svnrepos, which_rev, full_name, pool) if rev: revs.append(rev) else: history_set = _get_history(svnrepos, full_name, which_rev, options) history_revs = history_set.keys() history_revs.sort() history_revs.reverse() subpool = core.svn_pool_create(pool) for history_rev in history_revs: core.svn_pool_clear(subpool) rev = _log_helper(svnrepos, history_rev, history_set[history_rev], subpool) if rev: revs.append(rev) core.svn_pool_destroy(subpool) return revs def _get_last_history_rev(fsroot, path, pool): history = fs.node_history(fsroot, path, pool) history = fs.history_prev(history, 0, pool) history_path, history_rev = fs.history_location(history, pool); return history_rev def get_logs(svnrepos, full_name, rev, files): fsroot = svnrepos._getroot(rev) subpool = core.svn_pool_create(svnrepos.pool) for file in files: core.svn_pool_clear(subpool) path = _fs_path_join(full_name, file.name) rev = _get_last_history_rev(fsroot, path, subpool) datestr, author, msg = _fs_rev_props(svnrepos.fs_ptr, rev, subpool) date = _datestr_to_date(datestr, subpool) file.rev = str(rev) file.date = date file.author = author file.log = msg if file.kind == vclib.FILE: file.size = fs.file_length(fsroot, path, subpool) core.svn_pool_destroy(subpool) def get_youngest_revision(svnrepos): return svnrepos.youngest def temp_checkout(svnrepos, path, rev, pool): """Check out file revision to temporary file""" temp = tempfile.mktemp() fp = open(temp, 'wb') try: root = svnrepos._getroot(rev) stream = fs.file_contents(root, path, pool) try: while 1: chunk = core.svn_stream_read(stream, core.SVN_STREAM_CHUNK_SIZE) if not chunk: break fp.write(chunk) finally: core.svn_stream_close(stream) finally: fp.close() return temp class FileContentsPipe: def __init__(self, root, path, pool): self._pool = core.svn_pool_create(pool) self._stream = fs.file_contents(root, path, self._pool) self._eof = 0 def __del__(self): core.svn_pool_destroy(self._pool) def read(self, len=None): chunk = None if not self._eof: if len is None: buffer = cStringIO.StringIO() try: while 1: hunk = core.svn_stream_read(self._stream, 8192) if not hunk: break buffer.write(hunk) chunk = buffer.getvalue() finally: buffer.close() else: chunk = core.svn_stream_read(self._stream, len) if not chunk: self._eof = 1 return chunk def readline(self): chunk = None if not self._eof: chunk, self._eof = core.svn_stream_readline(self._stream, '\n', self._pool) if not self._eof: chunk = chunk + '\n' if not chunk: self._eof = 1 return chunk def readlines(self): lines = [] while True: line = self.readline() if not line: break lines.append(line) return lines def close(self): return core.svn_stream_close(self._stream) def eof(self): return self._eof _re_blameinfo = re.compile(r"\s*(\d+)\s*(.*)") class BlameSource: def __init__(self, svn_client_path, rootpath, fs_path, rev, first_rev): self.idx = -1 self.line_number = 1 self.last = None self.first_rev = first_rev # Do a little dance to get a URL that works in both Unix-y and # Windows worlds. rootpath = os.path.abspath(rootpath) if rootpath and rootpath[0] != '/': rootpath = '/' + rootpath if os.sep != '/': rootpath = string.replace(rootpath, os.sep, '/') url = 'file://' + string.join([rootpath, fs_path], "/") fp = popen.popen(svn_client_path, ('blame', "-r%d" % int(rev), "--non-interactive", "%s@%d" % (url, int(rev))), 'rb', 1) self.fp = fp def __getitem__(self, idx): if idx == self.idx: return self.last if idx != self.idx + 1: raise BlameSequencingError() line = self.fp.readline() if not line: raise IndexError("No more annotations") m = _re_blameinfo.match(line[:17]) if not m: raise vclib.Error("Could not parse blame output at line %i\n%s" % (idx+1, line)) rev, author = m.groups() text = line[18:] rev = int(rev) prev_rev = None if rev > self.first_rev: prev_rev = rev - 1 item = _item(text=text, line_number=idx+1, rev=rev, prev_rev=prev_rev, author=author, date=None) self.last = item self.idx = idx return item class BlameSequencingError(Exception): pass class SubversionRepository(vclib.Repository): def __init__(self, name, rootpath, svn_path): if not os.path.isdir(rootpath): raise vclib.ReposNotFound(name) # Initialize some stuff. self.pool = None self.apr_init = 0 self.rootpath = rootpath self.name = name self.svn_client_path = os.path.normpath(os.path.join(svn_path, 'svn')) # Register a handler for SIGTERM so we can have a chance to # cleanup. If ViewVC takes too long to start generating CGI # output, Apache will grow impatient and SIGTERM it. While we # don't mind getting told to bail, we want to gracefully close the # repository before we bail. def _sigterm_handler(signum, frame, self=self): self._close() sys.exit(-1) try: signal.signal(signal.SIGTERM, _sigterm_handler) except ValueError: # This is probably "ValueError: signal only works in main # thread", which will get thrown by the likes of mod_python # when trying to install a signal handler from a thread that # isn't the main one. We'll just not care. pass # Initialize APR and get our top-level pool. core.apr_initialize() self.apr_init = 1 self.pool = core.svn_pool_create(None) self.scratch_pool = core.svn_pool_create(self.pool) # Open the repository and init some other variables. self.repos = repos.svn_repos_open(rootpath, self.pool) self.fs_ptr = repos.svn_repos_fs(self.repos) self.youngest = fs.youngest_rev(self.fs_ptr, self.pool) self._fsroots = {} def __del__(self): self._close() def _close(self): if self.pool: core.svn_pool_destroy(self.pool) self.pool = None if self.apr_init: core.apr_terminate() self.apr_init = 0 def _scratch_clear(self): core.svn_pool_clear(self.scratch_pool) def itemtype(self, path_parts, rev): rev = self._getrev(rev) basepath = self._getpath(path_parts) kind = fs.check_path(self._getroot(rev), basepath, self.scratch_pool) self._scratch_clear() if kind == core.svn_node_dir: return vclib.DIR if kind == core.svn_node_file: return vclib.FILE raise vclib.ItemNotFound(path_parts) def openfile(self, path_parts, rev): path = self._getpath(path_parts) rev = self._getrev(rev) fsroot = self._getroot(rev) revision = str(_get_last_history_rev(fsroot, path, self.scratch_pool)) self._scratch_clear() fp = FileContentsPipe(fsroot, path, self.pool) return fp, revision def listdir(self, path_parts, rev, options): basepath = self._getpath(path_parts) if self.itemtype(path_parts, rev) != vclib.DIR: raise vclib.Error("Path '%s' is not a directory." % basepath) rev = self._getrev(rev) fsroot = self._getroot(rev) dirents = fs.dir_entries(fsroot, basepath, self.scratch_pool) entries = [ ] for entry in dirents.values(): if entry.kind == core.svn_node_dir: kind = vclib.DIR elif entry.kind == core.svn_node_file: kind = vclib.FILE entries.append(vclib.DirEntry(entry.name, kind)) self._scratch_clear() return entries def dirlogs(self, path_parts, rev, entries, options): get_logs(self, self._getpath(path_parts), self._getrev(rev), entries) def itemlog(self, path_parts, rev, options): """see vclib.Repository.itemlog docstring Option values recognized by this implementation svn_show_all_dir_logs boolean, default false. if set for a directory path, will include revisions where files underneath the directory have changed svn_cross_copies boolean, default false. if set for a path created by a copy, will include revisions from before the copy svn_latest_log boolean, default false. if set will return only newest single log entry """ path = self._getpath(path_parts) rev = self._getrev(rev) revs = _fetch_log(self, path, rev, options, self.scratch_pool) self._scratch_clear() revs.sort() prev = None for rev in revs: rev.prev = prev prev = rev return revs def annotate(self, path_parts, rev): path = self._getpath(path_parts) rev = self._getrev(rev) fsroot = self._getroot(rev) history_set = _get_history(self, path, rev, {'svn_cross_copies': 1}) history_revs = history_set.keys() history_revs.sort() revision = history_revs[-1] first_rev = history_revs[0] source = BlameSource(self.svn_client_path, self.rootpath, path, rev, first_rev) return source, revision def rawdiff(self, path_parts1, rev1, path_parts2, rev2, type, options={}): p1 = self._getpath(path_parts1) p2 = self._getpath(path_parts2) r1 = self._getrev(rev1) r2 = self._getrev(rev2) args = vclib._diff_args(type, options) try: temp1 = temp_checkout(self, p1, r1, self.pool) temp2 = temp_checkout(self, p2, r2, self.pool) info1 = p1, date_from_rev(self, r1), r1 info2 = p2, date_from_rev(self, r2), r2 return vclib._diff_fp(temp1, temp2, info1, info2, args) except vclib.svn.core.SubversionException, e: if e.apr_err == vclib.svn.core.SVN_ERR_FS_NOT_FOUND: raise vclib.InvalidRevision raise def _getpath(self, path_parts): return string.join(path_parts, '/') def _getrev(self, rev): if rev is None or rev == 'HEAD': return self.youngest try: rev = int(rev) except ValueError: raise vclib.InvalidRevision(rev) if (rev < 0) or (rev > self.youngest): raise vclib.InvalidRevision(rev) return rev def _getroot(self, rev): try: return self._fsroots[rev] except KeyError: r = self._fsroots[rev] = fs.revision_root(self.fs_ptr, rev, self.pool) return r class _item: def __init__(self, **kw): vars(self).update(kw)