--- a
+++ b/pyforge/pyforge/model/notification.py
@@ -0,0 +1,311 @@
+'''Manage notifications and subscriptions
+
+When an artifact is modified:
+- Notification generated by plugin app
+- Search is made for subscriptions matching the notification
+- Notification is added to each matching subscription's queue
+
+Periodically:
+- For each subscription with notifications and direct delivery:
+   - For each notification, enqueue as a separate email message
+   - Clear subscription's notification list
+- For each subscription with notifications and delivery due:
+   - Enqueue one email message with all notifications
+   - Clear subscription's notification list
+
+Notifications are also available for use in feeds
+'''
+
+from datetime import datetime, timedelta
+
+from pylons import c, g
+
+from ming import schema as S
+from ming.orm import MappedClass, FieldProperty, ForeignIdProperty, RelationProperty
+
+from pyforge.lib import helpers as h
+
+from .session import main_orm_session, project_orm_session
+from .types import ArtifactReferenceType
+
+class Notification(MappedClass):
+    '''An artifact-change notification; Mailbox queues refer to it by _id.'''
+    class __mongometa__:
+        session = main_orm_session
+        name = 'notification'
+
+    _id = FieldProperty(str, if_missing=h.gen_message_id)
+
+    # Classify notifications
+    project_id = ForeignIdProperty('Project', if_missing=lambda:c.project._id)
+    app_config_id = ForeignIdProperty('AppConfig', if_missing=lambda:c.app.config._id)
+    artifact_reference = FieldProperty(ArtifactReferenceType)
+    topic = FieldProperty(str)
+
+    # Notification Content
+    in_reply_to=FieldProperty(str)
+    from_address=FieldProperty(str)
+    subject=FieldProperty(str)
+    text=FieldProperty(str)
+    link=FieldProperty(str)
+    author_id=ForeignIdProperty('User')
+    feed_meta=FieldProperty(dict(
+            link=str,
+            created=S.DateTime(if_missing=datetime.utcnow),
+            unique_id=S.String(if_missing=lambda:h.nonce(40)),
+            author_name=S.String(if_missing=lambda:c.user.display_name),
+            author_link=S.String(if_missing=lambda:c.user.url())))
+
+    @classmethod
+    def post(cls, artifact, topic, **kw):
+        '''Create a notification, send the notify message and return it.
+
+        Topic 'message' requires kw['post']; other topics accept optional
+        subject and text.  link defaults to artifact.url().
+        '''
+        idx = artifact.index()
+        # One fallback for every topic: artifacts may lack email_address.
+        address = getattr(artifact, 'email_address', 'noreply@in.sf.net')
+        subject_prefix = '[%s:%s] ' % (
+            c.project.shortname, c.app.config.options.mount_point)
+        if topic == 'message':
+            post = kw.pop('post')
+            subject = post.subject
+            if post.parent_id and not post.subject.lower().startswith('re:'):
+                subject = 'Re: ' + subject
+            d = dict(
+                _id=post._id,
+                from_address='%s <%s>' % (post.author().display_name, address),
+                subject=subject_prefix + subject,
+                text=post.text,
+                in_reply_to=post.parent_id)
+        else:
+            subject = kw.pop('subject', '%s modified by %s' % (
+                    idx['title_s'], c.user.display_name))
+            d = dict(
+                from_address='%s <%s>' % (idx['title_s'], address),
+                subject=subject_prefix + subject,
+                text=kw.pop('text', subject))
+        n = cls(artifact_reference=artifact.dump_ref(),
+                topic=topic,
+                link=kw.pop('link', artifact.url()),
+                **d)
+        g.publish('react', 'forgemail.notify', dict(
+                notification_id=n._id,
+                artifact_index_id=idx['id'],
+                topic=topic))
+        return n
+
+    def send_direct(self, user_id):
+        '''Publish this notification as its own email message to user_id.'''
+        g.publish('audit', 'forgemail.send_email', {
+                'destinations':[str(user_id)],
+                'from':self.from_address,
+                'subject':self.subject,
+                'message_id':self._id,
+                'in_reply_to':self.in_reply_to,
+                'text':self.text},
+                  serializer='pickle')
+
+    @classmethod
+    def _send_bundle(cls, user_id, from_address, subject, notifications, body):
+        '''Publish one email listing notifications; body() renders each text.'''
+        text = [ 'Digest of %s' % subject ]
+        for n in notifications:
+            text.append('From: %s' % n.from_address)
+            text.append('Subject: %s' % n.subject)
+            text.append('Message-ID: %s' % n._id)
+            text.append('')
+            text.append(body(n.text or '-no text-'))
+        g.publish('audit', 'forgemail.send_email', {
+                'destinations':[str(user_id)],
+                'from':from_address,
+                'subject':subject,
+                'message_id':h.gen_message_id(),
+                'text':'\n'.join(text)},
+                  serializer='pickle')
+
+    @classmethod
+    def send_digest(cls, user_id, from_address, subject, notifications):
+        '''Email all notifications to user_id in one message, full text each.'''
+        cls._send_bundle(
+            user_id, from_address, subject, notifications, lambda t: t)
+
+    @classmethod
+    def send_summary(cls, user_id, from_address, subject, notifications):
+        '''Like send_digest, but each text goes through h.text.truncate(.., 128).'''
+        cls._send_bundle(user_id, from_address, subject, notifications,
+                         lambda t: h.text.truncate(t, 128))
+
+class Subscriptions(MappedClass):
+    '''User-maintained subscriptions, keyed by user/project/app in upsert().'''
+    class __mongometa__:
+        session = main_orm_session
+        name = 'subscriptions'
+
+    _id = FieldProperty(S.ObjectId)
+    user_id = ForeignIdProperty('User', if_missing=lambda:c.user._id)
+    project_id = ForeignIdProperty('Project', if_missing=lambda:c.project._id)
+    app_config_id = ForeignIdProperty('AppConfig', if_missing=lambda:c.app.config._id)
+    subscriptions = FieldProperty(
+        [ {'_id':S.ObjectId,
+           'artifact_index_id':str,
+           'topic':str,
+           'type':S.OneOf('direct', 'digest', 'summary'),
+           'frequency':{'n':int, 'unit':S.OneOf('day', 'week', 'month')},
+           'next_scheduled':datetime,
+           'mailbox_id':S.ObjectId,
+           }])
+    project = RelationProperty('Project')
+    app_config = RelationProperty('AppConfig')
+
+    @classmethod
+    def upsert(cls, user=None, project=None, app=None):
+        '''Get or create the document for user/project/app (default: c.*).'''
+        if user is None: user = c.user
+        if project is None: project = c.project
+        if app is None: app = c.app
+        d = dict(
+            user_id=user._id,
+            project_id=project._id,
+            app_config_id=app.config._id)
+        result = cls.query.get(**d)
+        if result is None: result = cls(**d)
+        return result
+
+    def subscribed(self, artifact=None, topic=None):
+        '''Return True if an entry exists for exactly this artifact and topic
+        (None only matches entries that store None for that field).
+        '''
+        artifact_index_id = artifact.index_id() if artifact else None
+        return any(
+            s.artifact_index_id == artifact_index_id and s.topic == topic
+            for s in self.subscriptions)
+
+    def subscribe(self, type, n=1, unit='day', artifact=None, topic=None):
+        '''Subscribe to notifications on this document's project/app,
+        optionally filtered by artifact and topic.  This method creates the
+        associated mailbox as well.  Does nothing if already subscribed.
+        '''
+        if self.subscribed(artifact, topic): return
+        artifact_index_id = artifact.index_id() if artifact else None
+        d = dict(
+            artifact_index_id=artifact_index_id,
+            topic=topic,
+            type=type,
+            frequency=dict(n=n, unit=unit),
+            next_scheduled=datetime.utcnow())
+        # Tie the mailbox to this document's owner and scope; relying on the
+        # Mailbox defaults would use c.user/c.project/c.app instead, which is
+        # wrong when the document was obtained via upsert() for someone else.
+        mbox = Mailbox(
+            user_id=self.user_id,
+            project_id=self.project_id,
+            app_config_id=self.app_config_id,
+            **d)
+        self.subscriptions.append(
+            dict(d, mailbox_id=mbox._id))
+
+    def unsubscribe(self, artifact=None, artifact_index_id=None, topic=None):
+        '''Unsubscribe from notifications on this document's project/app,
+        optionally filtered by artifact and topic.  This method removes the
+        associated mailbox as well.
+        '''
+        if artifact_index_id is None and artifact is not None:
+            artifact_index_id = artifact.index_id()
+        new_subs = []
+        for s in self.subscriptions:
+            if (s.artifact_index_id == artifact_index_id
+                and s.topic == topic):
+                Mailbox.query.remove(dict(_id=s.mailbox_id))
+            else:
+                new_subs.append(s)
+        self.subscriptions = new_subs
+
+class Mailbox(MappedClass):
+    '''Queue of notification IDs awaiting delivery for one subscription.'''
+    class __mongometa__:
+        session = main_orm_session
+        name = 'mailbox'
+
+    _id = FieldProperty(S.ObjectId)
+    user_id = ForeignIdProperty('User', if_missing=lambda:c.user._id)
+    project_id = ForeignIdProperty('Project', if_missing=lambda:c.project._id)
+    app_config_id = ForeignIdProperty('AppConfig', if_missing=lambda:c.app.config._id)
+
+    # Subscription filters
+    artifact_index_id = FieldProperty(str)
+    topic = FieldProperty(str)
+
+    # Subscription type
+    type = FieldProperty(S.OneOf('direct', 'digest', 'summary'))
+    frequency = FieldProperty(dict(
+            n=int,unit=S.OneOf('day', 'week', 'month')))
+    next_scheduled = FieldProperty(datetime)
+
+    # Actual notification IDs
+    queue = FieldProperty([str])
+
+    @classmethod
+    def deliver(cls, nid, artifact_index_id, topic):
+        '''Called in the notification message handler to deliver a notification
+        ID to the appropriate mailboxes of the current project/app.  A None
+        filter on a mailbox matches any artifact or topic.
+        '''
+        q = cls.query.find({
+                'project_id':c.project._id,
+                'app_config_id':c.app.config._id,
+                'artifact_index_id':{'$in':[None, artifact_index_id]},
+                'topic':{'$in':[None, topic]}
+                })
+        for mbox in q:
+            mbox.query.update(
+                dict(_id=mbox._id),
+                {'$push':dict(queue=nid)})
+
+    @classmethod
+    def fire_ready(cls):
+        '''Fires all direct subscriptions with notifications as well as
+        all summary & digest subscriptions with notifications that are ready
+        '''
+        now = datetime.utcnow()
+        unit_days = {'day': 1, 'week': 7, 'month': 30}  # a month counts as 30 days
+        # Queries to find all matching subscription objects
+        q_direct = dict(
+            type='direct',
+            queue={'$ne':[]})
+        # Skip empty queues here too, otherwise an empty digest is emailed.
+        q_digest = dict(
+            type={'$in': ['digest', 'summary']},
+            next_scheduled={'$lt':now},
+            queue={'$ne':[]})
+        for mbox in cls.query.find(q_direct):
+            mbox = mbox.query.find_and_modify(
+                query=dict(_id=mbox._id),
+                update={'$set': dict(queue=[])})
+            mbox.fire(now)
+        for mbox in cls.query.find(q_digest):
+            next_scheduled = now
+            if mbox.frequency.unit in unit_days:
+                next_scheduled += timedelta(
+                    days=unit_days[mbox.frequency.unit] * mbox.frequency.n)
+            mbox = mbox.query.find_and_modify(
+                query=dict(_id=mbox._id),
+                update={'$set': dict(next_scheduled=next_scheduled, queue=[])})
+            mbox.fire(now)
+
+    def fire(self, now):
+        '''Email the queued notifications to user_id by type; now is unused.'''
+        if not self.queue: return  # nothing queued: never send an empty digest
+        notifications = Notification.query.find(dict(_id={'$in':self.queue}))
+        if self.type == 'direct':
+            for n in notifications:
+                n.send_direct(self.user_id)
+        elif self.type == 'digest':
+            Notification.send_digest(
+                self.user_id, 'noreply@in.sf.net', 'Digest Email',
+                notifications)
+        elif self.type == 'summary':
+            Notification.send_summary(
+                self.user_id, 'noreply@in.sf.net', 'Digest Email',
+                notifications)