Switch to side-by-side view

--- a/Allura/allura/model/repository.py
+++ b/Allura/allura/model/repository.py
@@ -10,10 +10,12 @@
 from collections import defaultdict
 
 import tg
-from pylons import c,g
+from pylons import c,g, request
+from paste.registry import StackedObjectProxy
 import pymongo.errors
 
 from ming import schema as S
+from ming.base import Object
 from ming.utils import LazyProperty
 from ming.orm import FieldProperty, session, Mapper
 from ming.orm.declarative import MappedClass
@@ -21,6 +23,7 @@
 from allura.lib.patience import SequenceMatcher
 from allura.lib import helpers as h
 from allura.lib import utils
+from allura.lib.cache import LRUCache
 
 from .artifact import Artifact, VersionedArtifact, Feed
 from .auth import User
@@ -34,7 +37,6 @@
 
 README_RE = re.compile('^README(\.[^.]*)?$', re.IGNORECASE)
 
-
 class RepositoryImplementation(object):
 
     # Repository-specific code
@@ -48,7 +50,7 @@
         raise NotImplementedError, 'commit'
 
     def new_commits(self, all_commits=False): # pragma no cover
-        '''Return a list of (oid, commit) in topological order (heads first).
+        '''Return a list of native commits in topological order (heads first).
 
         "commit" is a repo-native object, NOT a Commit object.
         If all_commits is False, only return commits not already indexed.
@@ -56,7 +58,7 @@
         raise NotImplementedError, 'new_commits'
 
     def commit_parents(self, commit):
-        '''Return a list of (oid, commit) for the parents of the given (native)
+        '''Return a list of native commits for the parents of the given (native)
         commit'''
         raise NotImplementedError, 'commit_parents'
 
@@ -68,9 +70,20 @@
         '''Sets repository metadata such as heads, tags, and branches'''
         raise NotImplementedError, 'refresh_heads'
 
-    def refresh_commit(self, ci, seen_object_ids): # pragma no cover
+    def refresh_commit(self, ci, native_ci): # pragma no cover
         '''Refresh the data in the commit object 'ci' with data from the repo'''
         raise NotImplementedError, 'refresh_commit'
+
+    def refresh_tree(self, tree): # pragma no cover
+        '''Refresh the data in the tree object 'tree' with data from the repo'''
+        raise NotImplementedError, 'refresh_tree'
+
+    def generate_shortlinks(self, ci): # pragma no cover
+        raise NotImplementedError, 'generate_shortlinks'
+
+    def object_id(self, obj): # pragma no cover
+        '''Return the object_id for the given native object'''
+        raise NotImplementedError, 'object_id'
 
     def _setup_hooks(self): # pragma no cover
         '''Install a hook in the repository that will ping the refresh url for
@@ -78,11 +91,11 @@
         raise NotImplementedError, '_setup_hooks'
 
     def log(self, object_id, skip, count): # pragma no cover
-        '''Return a list of object_ids beginning at the given commit ID and continuing
+        '''Return a list of (object_id, ci) beginning at the given commit ID and continuing
         to the parent nodes in a breadth-first traversal.  Also return a list of 'next commit' options
         (these are candidates for he next commit after 'count' commits have been
         exhausted).'''
-        raise NotImplementedError, '_log'
+        raise NotImplementedError, 'log'
 
     def compute_tree(self, commit, path='/'):
         '''Used in hg and svn to compute a git-like-tree lazily'''
@@ -146,6 +159,7 @@
     branches = FieldProperty([dict(name=str,object_id=str, count=int)])
     repo_tags = FieldProperty([dict(name=str,object_id=str, count=int)])
     upstream_repo = FieldProperty(dict(name=str,url=str))
+    refresh_context = StackedObjectProxy(name='refresh_context')
 
     def __init__(self, **kw):
         if 'name' in kw and 'tool' in kw:
@@ -290,13 +304,13 @@
 
     def refresh_ancestor_graph(self, commits):
         '''Make sure the CommitAncestor collection is up-to-date based on
-        the given list of (oid, native_commit) commits
+        the given list of native commits
         '''
-        PAGESIZE = 1024
         ca_doc = mapper(CommitAncestor).doc_cls
         sess = main_doc_session
         ancestor_cache = {} # ancestor_cache[oid] = [ a_oid0, a_oid1...]
-        def _ancestors(oid, ci, indent=''):
+        def _ancestors(ci, oid=None):
+            if oid is None: oid = self._impl.object_id(ci)
             if oid in ancestor_cache:
                 return ancestor_cache[oid]
             stored_ancestors = []
@@ -307,25 +321,30 @@
                 ancestor_cache[oid] = stored_ancestors
                 return stored_ancestors
             ancestor_ids = set()
-            for p_oid, p_ci in self._impl.commit_parents(ci):
+            for p_ci in self._impl.commit_parents(ci):
+                p_oid = self._impl.object_id(p_ci)
                 ancestor_ids.add(p_oid)
-                ancestor_ids.update(_ancestors(p_oid, p_ci, indent + '    '))
+                ancestor_ids.update(_ancestors(p_ci, p_oid))
             result = ancestor_cache[oid] = list(ancestor_ids)
-            for i in xrange(0, len(result), PAGESIZE):
+            for chunk in utils.chunked_iter(result, self.BATCH_SIZE):
                 sess.insert(ca_doc(
-                        dict(
-                            object_id=oid,
-                            ancestor_ids=result[i:i+PAGESIZE])))
+                        dict(object_id=oid, ancestor_ids=list(chunk))))
 
         # Compute graph in chunks to save memory
-        for i, (oid, ci) in enumerate(reversed(commits)):
-            _ancestors(oid, ci)
-            if i and i % PAGESIZE == 0:
-                log.info('=== Clear ancestor cache === ')
+        for i, ci in enumerate(reversed(commits)):
+            _ancestors(ci)
+            if (i+1) % (10 * self.BATCH_SIZE) == 0:
+                log.info('Computed ancestors for %d commits', i+1)
                 ancestor_cache = {}
+        log.info('Computed ancestors for %d commits', i+1)
 
     def refresh(self, all_commits=False, notify=True):
         '''Find any new commits in the repository and update'''
+        request.environ['paste.registry'].register(
+            self.refresh_context,
+            Object(
+                seen_oids=set(),
+                max_manifest_size=self.BATCH_SIZE))
         self._impl.refresh_heads()
         self.status = 'analyzing'
         session(self).flush()
@@ -333,25 +352,19 @@
         log.info('Refreshing repository %s', self)
         commits = self._impl.new_commits(all_commits)
         log.info('... %d new commits', len(commits))
-        self.refresh_ancestor_graph(commits)
-
-
-        return
+        # self.refresh_ancestor_graph(commits)
         # Refresh history
-        i=0
-        seen_object_ids = set()
         commit_msgs = []
-        for i, oid in enumerate(commit_ids):
-            if len(seen_object_ids) > 10000: # pragma no cover
-                log.info('... flushing seen object cache')
-                seen_object_ids = set()
+        for i, n_ci in enumerate(commits):
+            oid = self._impl.object_id(n_ci)
+            if oid in self.refresh_context.seen_oids: continue
+            self.refresh_context.seen_oids.add(oid)
             ci, isnew = Commit.upsert(oid)
+            ci.set_context(self)
             if not isnew and not all_commits:
-                 # race condition, let the other proc handle it
                 sess.expunge(ci)
                 continue
-            ci.set_context(self)
-            self._impl.refresh_commit(ci, seen_object_ids)
+            self._impl.refresh_commit(ci, n_ci)
             if (i+1) % self.BATCH_SIZE == 0:
                 log.info('...... flushing %d commits (%d total)',
                          self.BATCH_SIZE, (i+1))
@@ -386,35 +399,55 @@
                 subject=subject,
                 text=text)
         log.info('...... flushing %d commits (%d total)',
-                 i % self.BATCH_SIZE, i)
+                 (i+1) % self.BATCH_SIZE, i+1)
         sess.flush()
         sess.clear()
-        # Mark all commits in this repo as being in this repo
-        all_commit_ids = self._impl.new_commits(True)
-        Commit.query.update(
-            dict(
-                object_id={'$in':all_commit_ids},
-                repositories={'$ne':self._id}),
-            {'$push':dict(repositories=self._id)},
-            upsert=False, multi=True)
+        # Find all commits that don't know they're in this repo. Mark them as
+        # being in this repo and add their shortlinks
+        ca_doc = mapper(CommitAncestor).doc_cls
+        ci_doc = mapper(Commit).doc_cls
+        visited_cis = set()
+        for ancestor in main_doc_session.find(
+            ca_doc,
+            dict(object_id={'$in':[hd.object_id for hd in self.heads]})):
+            ancestor_ids = [
+                oid for oid in ancestor.ancestor_ids
+                if oid not in visited_cis ]
+            visited_cis.update(ancestor_ids)
+            for ignorant_ci in main_doc_session.find(
+                ci_doc,
+                dict(
+                    object_id={'$in': ancestor_ids},
+                    repositories={'$ne': self._id }),
+                fields=['_id', 'object_id']):
+                ignorant_ci.repositories.append(self._id)
+                main_doc_session.update_partial(
+                    ci_doc, dict(_id=ignorant_ci._id),
+                    {'$set': dict(repositories=ignorant_ci.repositories) })
+                ignorant_ci.update(
+                    project_id=c.project._id,
+                    app_config_id=c.app.config._id,
+                    url=self.url_for_commit(ignorant_ci))
+                self._impl.generate_shortlinks(ignorant_ci)
         self.compute_diffs()
         log.info('... refreshed repository %s.  Found %d new commits',
-                 self, len(commit_ids))
+                 self, len(commits))
         self.status = 'ready'
         for head in self.heads + self.branches + self.repo_tags:
             ci = self.commit(head.object_id)
             if ci is not None:
                 head.count = ci.count_revisions()
         session(self).flush()
-        return len(commit_ids)
+        return len(commits)
 
     def compute_diffs(self):
-        commit_ids = self._impl.new_commits(all_commits=True)
+        commits = self._impl.new_commits(all_commits=True)
         sess = session(Commit)
         # Compute diffs on all commits
         log.info('... computing diffs')
         i=0
-        for i, oid in enumerate(commit_ids):
+        for i, n_ci in enumerate(reversed(commits)):
+            oid = self._impl.object_id(n_ci)
             ci = self._impl.commit(oid)
             ci.tree.set_last_commit(ci, self)
             if not ci.diffs_computed:
@@ -425,7 +458,7 @@
                 sess.flush()
                 sess.clear()
         log.info('...... flushing %d commits (%d total)',
-                 i % self.BATCH_SIZE, i)
+                 (i+1) % self.BATCH_SIZE, i+1)
         sess.flush()
         sess.clear()
 
@@ -675,17 +708,42 @@
         session = repository_orm_session
         name='commit_ancestor'
         indexes = [
-            ('object_id'), ('ancestor_id') ]
+            ('object_id'), ('ancestor_ids') ]
 
     _id = FieldProperty(S.ObjectId)
     object_id = FieldProperty(str)
     ancestor_ids = FieldProperty([str])
 
-    @LazyProperty
-    def ancestor(self):
-        ci = Commit.query.get(object_id=self.ancestor_id)
-        if ci is None: return ci
-        ci.set_context(self.repo)
+class Manifest(MappedClass):
+    BATCH_SIZE=5000
+    class __mongometa__:
+        session = repository_orm_session
+        name='manifest'
+        indexes = [ ('object_id') ]
+
+    _id = FieldProperty(S.ObjectId)
+    object_id = FieldProperty(str)
+    object_ids = FieldProperty([
+            dict(object_id=str, name=str)])
+    _cache = LRUCache(BATCH_SIZE, 30)
+
+    @classmethod
+    def get(cls, oid):
+        result = cls._cache.get(
+            oid,
+            lambda: cls.query.find(dict(object_id=oid)).all())
+        if not result:
+            cls._cache.evict(oid)
+        return result
+
+    @classmethod
+    def from_iter(cls, oid, iterable):
+        cls.query.remove(dict(object_id=oid))
+        result = [
+            cls(object_id=oid, object_ids=list(chunk))
+            for chunk in utils.chunked_iter(iterable, cls.BATCH_SIZE) ]
+        cls._cache.set(oid, result)
+        return result
 
 class Commit(RepoObject):
     class __mongometa__:
@@ -848,7 +906,7 @@
     '''
     A representation of files & directories.  E.g. what is present at a single commit
 
-    :var object_ids: dict(object_id: name)  Set by _refresh_tree in the scm implementation
+    :var object_ids: dict(object_id: name)  Set by refresh_tree in the scm implementation
     '''
     class __mongometa__:
         polymorphic_identity='tree'