--- a/Allura/allura/model/repository.py
+++ b/Allura/allura/model/repository.py
@@ -10,10 +10,12 @@
from collections import defaultdict
import tg
-from pylons import c,g
+from pylons import c,g, request
+from paste.registry import StackedObjectProxy
import pymongo.errors
from ming import schema as S
+from ming.base import Object
from ming.utils import LazyProperty
from ming.orm import FieldProperty, session, Mapper
from ming.orm.declarative import MappedClass
@@ -21,6 +23,7 @@
from allura.lib.patience import SequenceMatcher
from allura.lib import helpers as h
from allura.lib import utils
+from allura.lib.cache import LRUCache
from .artifact import Artifact, VersionedArtifact, Feed
from .auth import User
@@ -34,7 +37,6 @@
README_RE = re.compile('^README(\.[^.]*)?$', re.IGNORECASE)
-
class RepositoryImplementation(object):
# Repository-specific code
@@ -48,7 +50,7 @@
raise NotImplementedError, 'commit'
def new_commits(self, all_commits=False): # pragma no cover
- '''Return a list of (oid, commit) in topological order (heads first).
+ '''Return a list of native commits in topological order (heads first).
"commit" is a repo-native object, NOT a Commit object.
If all_commits is False, only return commits not already indexed.
@@ -56,7 +58,7 @@
raise NotImplementedError, 'new_commits'
def commit_parents(self, commit):
- '''Return a list of (oid, commit) for the parents of the given (native)
+ '''Return a list of native commits for the parents of the given (native)
commit'''
raise NotImplementedError, 'commit_parents'
@@ -68,9 +70,20 @@
'''Sets repository metadata such as heads, tags, and branches'''
raise NotImplementedError, 'refresh_heads'
- def refresh_commit(self, ci, seen_object_ids): # pragma no cover
+ def refresh_commit(self, ci, native_ci): # pragma no cover
'''Refresh the data in the commit object 'ci' with data from the repo'''
raise NotImplementedError, 'refresh_commit'
+
+ def refresh_tree(self, tree): # pragma no cover
+ '''Refresh the data in the tree object 'tree' with data from the repo'''
+ raise NotImplementedError, 'refresh_tree'
+
+ def generate_shortlinks(self, ci): # pragma no cover
+ raise NotImplementedError, 'generate_shortlinks'
+
+ def object_id(self, obj): # pragma no cover
+ '''Return the object_id for the given native object'''
+ raise NotImplementedError, 'object_id'
def _setup_hooks(self): # pragma no cover
'''Install a hook in the repository that will ping the refresh url for
@@ -78,11 +91,11 @@
raise NotImplementedError, '_setup_hooks'
def log(self, object_id, skip, count): # pragma no cover
- '''Return a list of object_ids beginning at the given commit ID and continuing
+ '''Return a list of (object_id, ci) beginning at the given commit ID and continuing
to the parent nodes in a breadth-first traversal. Also return a list of 'next commit' options
(these are candidates for he next commit after 'count' commits have been
exhausted).'''
- raise NotImplementedError, '_log'
+ raise NotImplementedError, 'log'
def compute_tree(self, commit, path='/'):
'''Used in hg and svn to compute a git-like-tree lazily'''
@@ -146,6 +159,7 @@
branches = FieldProperty([dict(name=str,object_id=str, count=int)])
repo_tags = FieldProperty([dict(name=str,object_id=str, count=int)])
upstream_repo = FieldProperty(dict(name=str,url=str))
+ refresh_context = StackedObjectProxy(name='refresh_context')
def __init__(self, **kw):
if 'name' in kw and 'tool' in kw:
@@ -290,13 +304,13 @@
def refresh_ancestor_graph(self, commits):
'''Make sure the CommitAncestor collection is up-to-date based on
- the given list of (oid, native_commit) commits
+ the given list of native commits
'''
- PAGESIZE = 1024
ca_doc = mapper(CommitAncestor).doc_cls
sess = main_doc_session
ancestor_cache = {} # ancestor_cache[oid] = [ a_oid0, a_oid1...]
- def _ancestors(oid, ci, indent=''):
+ def _ancestors(ci, oid=None):
+ if oid is None: oid = self._impl.object_id(ci)
if oid in ancestor_cache:
return ancestor_cache[oid]
stored_ancestors = []
@@ -307,25 +321,30 @@
ancestor_cache[oid] = stored_ancestors
return stored_ancestors
ancestor_ids = set()
- for p_oid, p_ci in self._impl.commit_parents(ci):
+ for p_ci in self._impl.commit_parents(ci):
+ p_oid = self._impl.object_id(p_ci)
ancestor_ids.add(p_oid)
- ancestor_ids.update(_ancestors(p_oid, p_ci, indent + ' '))
+ ancestor_ids.update(_ancestors(p_ci, p_oid))
result = ancestor_cache[oid] = list(ancestor_ids)
- for i in xrange(0, len(result), PAGESIZE):
+ for chunk in utils.chunked_iter(result, self.BATCH_SIZE):
sess.insert(ca_doc(
- dict(
- object_id=oid,
- ancestor_ids=result[i:i+PAGESIZE])))
+ dict(object_id=oid, ancestor_ids=list(chunk))))
# Compute graph in chunks to save memory
- for i, (oid, ci) in enumerate(reversed(commits)):
- _ancestors(oid, ci)
- if i and i % PAGESIZE == 0:
- log.info('=== Clear ancestor cache === ')
+ for i, ci in enumerate(reversed(commits)):
+ _ancestors(ci)
+ if (i+1) % (10 * self.BATCH_SIZE) == 0:
+ log.info('Computed ancestors for %d commits', i+1)
ancestor_cache = {}
+ log.info('Computed ancestors for %d commits', i+1)
def refresh(self, all_commits=False, notify=True):
'''Find any new commits in the repository and update'''
+ request.environ['paste.registry'].register(
+ self.refresh_context,
+ Object(
+ seen_oids=set(),
+ max_manifest_size=self.BATCH_SIZE))
self._impl.refresh_heads()
self.status = 'analyzing'
session(self).flush()
@@ -333,25 +352,19 @@
log.info('Refreshing repository %s', self)
commits = self._impl.new_commits(all_commits)
log.info('... %d new commits', len(commits))
- self.refresh_ancestor_graph(commits)
-
-
- return
+    # self.refresh_ancestor_graph(commits)  # TODO(review): disabled call left commented out -- re-enable or remove
# Refresh history
- i=0
- seen_object_ids = set()
commit_msgs = []
- for i, oid in enumerate(commit_ids):
- if len(seen_object_ids) > 10000: # pragma no cover
- log.info('... flushing seen object cache')
- seen_object_ids = set()
+ for i, n_ci in enumerate(commits):
+ oid = self._impl.object_id(n_ci)
+ if oid in self.refresh_context.seen_oids: continue
+ self.refresh_context.seen_oids.add(oid)
ci, isnew = Commit.upsert(oid)
+ ci.set_context(self)
if not isnew and not all_commits:
- # race condition, let the other proc handle it
sess.expunge(ci)
continue
- ci.set_context(self)
- self._impl.refresh_commit(ci, seen_object_ids)
+ self._impl.refresh_commit(ci, n_ci)
if (i+1) % self.BATCH_SIZE == 0:
log.info('...... flushing %d commits (%d total)',
self.BATCH_SIZE, (i+1))
@@ -386,35 +399,55 @@
subject=subject,
text=text)
log.info('...... flushing %d commits (%d total)',
- i % self.BATCH_SIZE, i)
+ (i+1) % self.BATCH_SIZE, i+1)
sess.flush()
sess.clear()
- # Mark all commits in this repo as being in this repo
- all_commit_ids = self._impl.new_commits(True)
- Commit.query.update(
- dict(
- object_id={'$in':all_commit_ids},
- repositories={'$ne':self._id}),
- {'$push':dict(repositories=self._id)},
- upsert=False, multi=True)
+ # Find all commits that don't know they're in this repo. Mark them as
+    # being in this repo and add their shortlinks
+ ca_doc = mapper(CommitAncestor).doc_cls
+ ci_doc = mapper(Commit).doc_cls
+ visited_cis = set()
+ for ancestor in main_doc_session.find(
+ ca_doc,
+ dict(object_id={'$in':[hd.object_id for hd in self.heads]})):
+ ancestor_ids = [
+ oid for oid in ancestor.ancestor_ids
+ if oid not in visited_cis ]
+ visited_cis.update(ancestor_ids)
+ for ignorant_ci in main_doc_session.find(
+ ci_doc,
+ dict(
+ object_id={'$in': ancestor_ids},
+ repositories={'$ne': self._id }),
+ fields=['_id', 'object_id']):
+ ignorant_ci.repositories.append(self._id)
+ main_doc_session.update_partial(
+ ci_doc, dict(_id=ignorant_ci._id),
+ {'$set': dict(repositories=ignorant_ci.repositories) })
+ ignorant_ci.update(
+ project_id=c.project._id,
+ app_config_id=c.app.config._id,
+ url=self.url_for_commit(ignorant_ci))
+ self._impl.generate_shortlinks(ignorant_ci)
self.compute_diffs()
log.info('... refreshed repository %s. Found %d new commits',
- self, len(commit_ids))
+ self, len(commits))
self.status = 'ready'
for head in self.heads + self.branches + self.repo_tags:
ci = self.commit(head.object_id)
if ci is not None:
head.count = ci.count_revisions()
session(self).flush()
- return len(commit_ids)
+ return len(commits)
def compute_diffs(self):
- commit_ids = self._impl.new_commits(all_commits=True)
+ commits = self._impl.new_commits(all_commits=True)
sess = session(Commit)
# Compute diffs on all commits
log.info('... computing diffs')
i=0
- for i, oid in enumerate(commit_ids):
+ for i, n_ci in enumerate(reversed(commits)):
+ oid = self._impl.object_id(n_ci)
ci = self._impl.commit(oid)
ci.tree.set_last_commit(ci, self)
if not ci.diffs_computed:
@@ -425,7 +458,7 @@
sess.flush()
sess.clear()
log.info('...... flushing %d commits (%d total)',
- i % self.BATCH_SIZE, i)
+ (i+1) % self.BATCH_SIZE, i+1)
sess.flush()
sess.clear()
@@ -675,17 +708,42 @@
session = repository_orm_session
name='commit_ancestor'
indexes = [
- ('object_id'), ('ancestor_id') ]
+ ('object_id'), ('ancestor_ids') ]
_id = FieldProperty(S.ObjectId)
object_id = FieldProperty(str)
ancestor_ids = FieldProperty([str])
- @LazyProperty
- def ancestor(self):
- ci = Commit.query.get(object_id=self.ancestor_id)
- if ci is None: return ci
- ci.set_context(self.repo)
+class Manifest(MappedClass):
+ BATCH_SIZE=5000
+ class __mongometa__:
+ session = repository_orm_session
+ name='manifest'
+ indexes = [ ('object_id') ]
+
+ _id = FieldProperty(S.ObjectId)
+ object_id = FieldProperty(str)
+ object_ids = FieldProperty([
+ dict(object_id=str, name=str)])
+ _cache = LRUCache(BATCH_SIZE, 30)
+
+ @classmethod
+ def get(cls, oid):
+ result = cls._cache.get(
+ oid,
+ lambda: cls.query.find(dict(object_id=oid)).all())
+ if not result:
+ cls._cache.evict(oid)
+ return result
+
+ @classmethod
+ def from_iter(cls, oid, iterable):
+ cls.query.remove(dict(object_id=oid))
+ result = [
+ cls(object_id=oid, object_ids=list(chunk))
+ for chunk in utils.chunked_iter(iterable, cls.BATCH_SIZE) ]
+ cls._cache.set(oid, result)
+ return result
class Commit(RepoObject):
class __mongometa__:
@@ -848,7 +906,7 @@
'''
A representation of files & directories. E.g. what is present at a single commit
- :var object_ids: dict(object_id: name) Set by _refresh_tree in the scm implementation
+ :var object_ids: dict(object_id: name) Set by refresh_tree in the scm implementation
'''
class __mongometa__:
polymorphic_identity='tree'