--- a
+++ b/pyforge/pyforge/model/notification.py
@@ -0,0 +1,311 @@
+'''Manage notifications and subscriptions
+
+When an artifact is modified:
+- Notification generated by plugin app
+- Search is made for subscriptions matching the notification
+- Notification is added to each matching subscriptions' queue
+
+Periodically:
+- For each subscriptions with notifications and direct delivery:
+ - For each notification, enqueue as a separate email message
+ - Clear subscription's notification list
+- For each subscription with notifications and delivery due:
+ - Enqueue one email message with all notifications
+ - Clear subscription's notification list
+
+Notifications are also available for use in feeds
+'''
+
+from datetime import datetime, timedelta
+
+from pylons import c, g
+
+from ming import schema as S
+from ming.orm import MappedClass, FieldProperty, ForeignIdProperty, RelationProperty
+
+from pyforge.lib import helpers as h
+
+from .session import main_orm_session, project_orm_session
+from .types import ArtifactReferenceType
+
+class Notification(MappedClass):
+ class __mongometa__:
+ session = main_orm_session
+ name = 'notification'
+
+ _id = FieldProperty(str, if_missing=h.gen_message_id)
+
+ # Classify notifications
+ project_id = ForeignIdProperty('Project', if_missing=lambda:c.project._id)
+ app_config_id = ForeignIdProperty('AppConfig', if_missing=lambda:c.app.config._id)
+ artifact_reference = FieldProperty(ArtifactReferenceType)
+ topic = FieldProperty(str)
+
+ # Notification Content
+ in_reply_to=FieldProperty(str)
+ from_address=FieldProperty(str)
+ subject=FieldProperty(str)
+ text=FieldProperty(str)
+ link=FieldProperty(str)
+ author_id=ForeignIdProperty('User')
+ feed_meta=FieldProperty(dict(
+ link=str,
+ created=S.DateTime(if_missing=datetime.utcnow),
+ unique_id=S.String(if_missing=lambda:h.nonce(40)),
+ author_name=S.String(if_missing=lambda:c.user.display_name),
+ author_link=S.String(if_missing=lambda:c.user.url())))
+
+ @classmethod
+ def post(cls, artifact, topic, **kw):
+ '''Create a notification and send the notify message'''
+ idx = artifact.index()
+ subject_prefix = '[%s:%s] ' % (
+ c.project.shortname, c.app.config.options.mount_point)
+ if topic == 'message':
+ post = kw.pop('post')
+ subject = post.subject
+ if post.parent_id and not post.subject.lower().startswith('re:'):
+ subject = 'Re: ' + subject
+ d = dict(
+ _id=post._id,
+ from_address='%s <%s>' % (
+ post.author().display_name, getattr(artifact, 'email_address', 'noreply@in.sf.net')),
+ subject=subject_prefix + subject,
+ text=post.text,
+ in_reply_to=post.parent_id)
+ else:
+ subject = kw.pop('subject', '%s modified by %s' % (
+ idx['title_s'], c.user.display_name))
+ d = dict(
+ from_address='%s <%s>' % (
+ idx['title_s'], artifact.email_address),
+ subject=subject_prefix + subject,
+ text=kw.pop('text', subject))
+ n = cls(artifact_reference=artifact.dump_ref(),
+ topic=topic,
+ link=kw.pop('link', artifact.url()),
+ **d)
+ g.publish('react', 'forgemail.notify', dict(
+ notification_id=n._id,
+ artifact_index_id=idx['id'],
+ topic=topic))
+ return n
+
+ def send_direct(self, user_id):
+ g.publish('audit', 'forgemail.send_email', {
+ 'destinations':[str(user_id)],
+ 'from':self.from_address,
+ 'subject':self.subject,
+ 'message_id':self._id,
+ 'in_reply_to':self.in_reply_to,
+ 'text':self.text},
+ serializer='pickle')
+
+ @classmethod
+ def send_digest(self, user_id, from_address, subject, notifications):
+ text = [ 'Digest of %s' % subject ]
+ for n in notifications:
+ text.append('From: %s' % n.from_address)
+ text.append('Subject: %s' % n.subject)
+ text.append('Message-ID: %s' % n._id)
+ text.append('')
+ text.append(n.text or '-no text-')
+ text = '\n'.join(text)
+ g.publish('audit', 'forgemail.send_email', {
+ 'destinations':[str(user_id)],
+ 'from':from_address,
+ 'subject':subject,
+ 'message_id':h.gen_message_id(),
+ 'text':text},
+ serializer='pickle')
+
+ @classmethod
+ def send_summary(self, user_id, from_address, subject, notifications):
+ text = [ 'Digest of %s' % subject ]
+ for n in notifications:
+ text.append('From: %s' % n.from_address)
+ text.append('Subject: %s' % n.subject)
+ text.append('Message-ID: %s' % n._id)
+ text.append('')
+ text.append(h.text.truncate(n.text or '-no text-', 128))
+ text = '\n'.join(text)
+ g.publish('audit', 'forgemail.send_email', {
+ 'destinations':[str(user_id)],
+ 'from':from_address,
+ 'subject':subject,
+ 'message_id':h.gen_message_id(),
+ 'text':text},
+ serializer='pickle')
+
+class Subscriptions(MappedClass):
+ '''User-maintained subscriptions
+ '''
+ class __mongometa__:
+ session = main_orm_session
+ name = 'subscriptions'
+
+ _id = FieldProperty(S.ObjectId)
+ user_id = ForeignIdProperty('User', if_missing=lambda:c.user._id)
+ project_id = ForeignIdProperty('Project', if_missing=lambda:c.project._id)
+ app_config_id = ForeignIdProperty('AppConfig', if_missing=lambda:c.app.config._id)
+ subscriptions = FieldProperty(
+ [ {'_id':S.ObjectId,
+ 'artifact_index_id':str,
+ 'topic':str,
+ 'type':S.OneOf('direct', 'digest', 'summary'),
+ 'frequency':{'n':int, 'unit':S.OneOf('day', 'week', 'month')},
+ 'next_scheduled':datetime,
+ 'mailbox_id':S.ObjectId,
+ }])
+ project = RelationProperty('Project')
+ app_config = RelationProperty('AppConfig')
+
+
+ @classmethod
+ def upsert(cls, user=None, project=None, app=None):
+ if user is None: user = c.user
+ if project is None: project = c.project
+ if app is None: app = c.app
+ d = dict(
+ user_id=user._id,
+ project_id=project._id,
+ app_config_id=app.config._id)
+ result = cls.query.get(**d)
+ if result is None: result = cls(**d)
+ return result
+
+ def subscribed(self, artifact=None, topic=None):
+ if artifact:
+ artifact_index_id = artifact.index_id()
+ else:
+ artifact_index_id = None
+ new_subs = []
+ for s in self.subscriptions:
+ if (s.artifact_index_id == artifact_index_id
+ and s.topic == topic):
+ return True
+ return False
+
+ def subscribe(self, type, n=1, unit='day', artifact=None, topic=None):
+ '''Subscribe to notifications on the current project/app, optionally
+ filtered by artifact and topic. This method creates the associated
+ mailbox as well.
+ '''
+ if self.subscribed(artifact, topic): return
+ if artifact:
+ artifact_index_id = artifact.index_id()
+ else:
+ artifact_index_id = None
+ d = dict(
+ artifact_index_id=artifact_index_id,
+ topic=topic,
+ type=type,
+ frequency=dict(n=n, unit=unit),
+ next_scheduled=datetime.utcnow())
+ mbox = Mailbox(**d)
+ self.subscriptions.append(
+ dict(d, mailbox_id=mbox._id))
+
+ def unsubscribe(self, artifact=None, artifact_index_id=None, topic=None):
+ '''Unsubscribe to notifications on the current project/app, optionally
+ filtered by artifact and topic. This method removes the associated
+ mailbox as well.
+ '''
+ if artifact_index_id is None and artifact is not None:
+ artifact_index_id = artifact.index_id()
+ new_subs = []
+ for s in self.subscriptions:
+ if (s.artifact_index_id == artifact_index_id
+ and s.topic == topic):
+ Mailbox.query.remove(dict(_id=s.mailbox_id))
+ else:
+ new_subs.append(s)
+ self.subscriptions = new_subs
+
+class Mailbox(MappedClass):
+ class __mongometa__:
+ session = main_orm_session
+ name = 'mailbox'
+
+ _id = FieldProperty(S.ObjectId)
+ user_id = ForeignIdProperty('User', if_missing=lambda:c.user._id)
+ project_id = ForeignIdProperty('Project', if_missing=lambda:c.project._id)
+ app_config_id = ForeignIdProperty('AppConfig', if_missing=lambda:c.app.config._id)
+
+ # Subscription filters
+ artifact_index_id = FieldProperty(str)
+ topic = FieldProperty(str)
+
+ # Subscription type
+ type = FieldProperty(S.OneOf, 'direct', 'digest', 'summary')
+ frequency = FieldProperty(dict(
+ n=int,unit=S.OneOf('day', 'week', 'month')))
+ next_scheduled = FieldProperty(datetime)
+
+ # Actual notification IDs
+ queue = FieldProperty([str])
+
+ @classmethod
+ def deliver(cls, nid, artifact_index_id, topic):
+ '''Called in the notification message handler to deliver notification IDs
+ to the appropriate mailboxes. Atomically appends the nids
+ to the appropriate mailboxes.
+ '''
+ q = cls.query.find({
+ 'project_id':c.project._id,
+ 'app_config_id':c.app.config._id,
+ 'artifact_index_id':{'$in':[None, artifact_index_id]},
+ 'topic':{'$in':[None, topic]}
+ })
+ for mbox in q:
+ mbox.query.update(
+ dict(_id=mbox._id),
+ {'$push':dict(queue=nid)})
+
+ @classmethod
+ def fire_ready(cls):
+ '''Fires all direct subscriptions with notifications as well as
+ all summary & digest subscriptions with notifications that are ready
+ '''
+ now = datetime.utcnow()
+ # Queries to find all matching subscription objects
+ q_direct = dict(
+ type='direct',
+ queue={'$ne':[]})
+ q_digest = dict(
+ type={'$in': ['digest', 'summary']},
+ next_scheduled={'$lt':now})
+ for mbox in cls.query.find(q_direct):
+ mbox = mbox.query.find_and_modify(
+ query=dict(_id=mbox._id),
+ update={'$set': dict(
+ queue=[])})
+ mbox.fire(now)
+ for mbox in cls.query.find(q_digest):
+ next_scheduled = now
+ if mbox.frequency.unit == 'day':
+ next_scheduled += timedelta(days=mbox.frequency.n)
+ elif mbox.frequency.unit == 'week':
+ next_scheduled += timedelta(days=7 * mbox.frequency.n)
+ elif mbox.frequency.unit == 'month':
+ next_scheduled += timedelta(days=30 * mbox.frequency.n)
+ mbox = mbox.query.find_and_modify(
+ query=dict(_id=mbox._id),
+ update={'$set': dict(
+ next_scheduled=next_scheduled,
+ queue=[])})
+ mbox.fire(now)
+
+ def fire(self, now):
+ notifications = Notification.query.find(dict(_id={'$in':self.queue}))
+ if self.type == 'direct':
+ for n in notifications:
+ n.send_direct(self.user_id)
+ elif self.type == 'digest':
+ Notification.send_digest(
+ self.user_id, 'noreply@in.sf.net', 'Digest Email',
+ notifications)
+ elif self.type == 'summary':
+ Notification.send_summary(
+ self.user_id, 'noreply@in.sf.net', 'Digest Email',
+ notifications)