'''Manage notifications and subscriptions
When an artifact is modified:
    - Notification generated by plugin app
    - Search is made for subscriptions matching the notification
    - Notification is added to each matching subscription's queue
Periodically:
    - For each subscription with notifications and direct delivery:
        - For each notification, enqueue as a separate email message
        - Clear subscription's notification list
    - For each subscription with notifications and delivery due:
        - Enqueue one email message with all notifications
        - Clear subscription's notification list
Notifications are also available for use in feeds
'''
from datetime import datetime, timedelta
from pylons import c, g
from ming import schema as S
from ming.orm import MappedClass, FieldProperty, ForeignIdProperty, RelationProperty
from pyforge.lib import helpers as h
from .session import main_orm_session, project_orm_session
from .types import ArtifactReferenceType
class Notification(MappedClass):
    '''A single notification generated for an artifact.

    Instances are stored in the ``notification`` collection.  The class also
    knows how to hand notifications to the mail system, either one message
    per notification (``send_direct``) or bundled into a single message
    (``send_digest`` / ``send_summary``).
    '''

    class __mongometa__:
        session = main_orm_session
        name = 'notification'

    _id = FieldProperty(str, if_missing=h.gen_message_id)

    # Classify notifications
    project_id = ForeignIdProperty('Project', if_missing=lambda: c.project._id)
    app_config_id = ForeignIdProperty(
        'AppConfig', if_missing=lambda: c.app.config._id)
    artifact_reference = FieldProperty(ArtifactReferenceType)
    topic = FieldProperty(str)

    # Notification Content
    in_reply_to = FieldProperty(str)
    from_address = FieldProperty(str)
    subject = FieldProperty(str)
    text = FieldProperty(str)
    link = FieldProperty(str)
    author_id = ForeignIdProperty('User')
    # Metadata filled in from the current request context when not supplied.
    feed_meta = FieldProperty(dict(
        link=str,
        created=S.DateTime(if_missing=datetime.utcnow),
        unique_id=S.String(if_missing=lambda: h.nonce(40)),
        author_name=S.String(if_missing=lambda: c.user.display_name),
        author_link=S.String(if_missing=lambda: c.user.url())))

    @classmethod
    def post(cls, artifact, topic, **kw):
        '''Create a notification and send the notify message.

        :param artifact: the artifact the notification is about; its
            ``index()``, ``dump_ref()`` and ``url()`` are consulted
        :param topic: notification topic; ``'message'`` is special-cased and
            requires a ``post`` keyword argument
        :param kw: optional ``subject``, ``text`` and ``link`` overrides
            (``post`` is required when ``topic == 'message'``)
        :returns: the newly created ``Notification``
        :raises KeyError: if ``topic == 'message'`` and ``post`` is missing
        '''
        idx = artifact.index()
        subject_prefix = '[%s:%s] ' % (
            c.project.shortname, c.app.config.options.mount_point)
        # Not every artifact has its own address; both branches below use
        # the same fallback so neither raises AttributeError.
        reply_address = getattr(
            artifact, 'email_address', 'noreply@in.sf.net')
        if topic == 'message':
            message = kw.pop('post')
            subject = message.subject
            if message.parent_id and not subject.lower().startswith('re:'):
                subject = 'Re: ' + subject
            # The notification reuses the post's id and parent id so that
            # Message-ID / In-Reply-To line up with the discussion thread.
            d = dict(
                _id=message._id,
                from_address='%s <%s>' % (
                    message.author().display_name, reply_address),
                subject=subject_prefix + subject,
                text=message.text,
                in_reply_to=message.parent_id)
        else:
            subject = kw.pop('subject', '%s modified by %s' % (
                idx['title_s'], c.user.display_name))
            d = dict(
                from_address='%s <%s>' % (idx['title_s'], reply_address),
                subject=subject_prefix + subject,
                # The body defaults to the (unprefixed) subject line.
                text=kw.pop('text', subject))
        n = cls(artifact_reference=artifact.dump_ref(),
                topic=topic,
                link=kw.pop('link', artifact.url()),
                **d)
        g.publish('react', 'forgemail.notify', dict(
            notification_id=n._id,
            artifact_index_id=idx['id'],
            topic=topic))
        return n

    def send_direct(self, user_id):
        '''Publish this notification as its own email to ``user_id``.'''
        g.publish('audit', 'forgemail.send_email', {
            'destinations': [str(user_id)],
            'from': self.from_address,
            'subject': self.subject,
            'message_id': self._id,
            'in_reply_to': self.in_reply_to,
            'text': self.text},
            serializer='pickle')

    @classmethod
    def _send_bundle(cls, user_id, from_address, subject, notifications,
                     render_text):
        '''Publish one email that concatenates ``notifications``.

        ``render_text`` is applied to each notification's text (or to the
        ``'-no text-'`` placeholder) before it is added to the body.
        '''
        lines = ['Digest of %s' % subject]
        for n in notifications:
            lines.append('From: %s' % n.from_address)
            lines.append('Subject: %s' % n.subject)
            lines.append('Message-ID: %s' % n._id)
            lines.append('')
            lines.append(render_text(n.text or '-no text-'))
        g.publish('audit', 'forgemail.send_email', {
            'destinations': [str(user_id)],
            'from': from_address,
            'subject': subject,
            'message_id': h.gen_message_id(),
            'text': '\n'.join(lines)},
            serializer='pickle')

    @classmethod
    def send_digest(cls, user_id, from_address, subject, notifications):
        '''Send one email containing the full text of each notification.

        :param user_id: recipient; stringified into ``destinations``
        :param from_address: value for the message's ``from`` field
        :param subject: subject of the bundled message
        :param notifications: iterable of ``Notification`` objects
        '''
        cls._send_bundle(
            user_id, from_address, subject, notifications,
            lambda text: text)

    @classmethod
    def send_summary(cls, user_id, from_address, subject, notifications):
        '''Send one email with each notification's text cut to 128 chars.

        Takes the same arguments as ``send_digest``.
        '''
        cls._send_bundle(
            user_id, from_address, subject, notifications,
            lambda text: h.text.truncate(text, 128))
class Subscriptions(MappedClass):
    '''User-maintained subscriptions

    One document per (user, project, app config) holding a list of
    subscription entries.  Each entry points at the ``Mailbox`` that
    collects notification ids for it.
    '''

    class __mongometa__:
        session = main_orm_session
        name = 'subscriptions'

    _id = FieldProperty(S.ObjectId)
    user_id = ForeignIdProperty('User', if_missing=lambda: c.user._id)
    project_id = ForeignIdProperty('Project', if_missing=lambda: c.project._id)
    app_config_id = ForeignIdProperty(
        'AppConfig', if_missing=lambda: c.app.config._id)
    subscriptions = FieldProperty(
        [{'_id': S.ObjectId,
          'artifact_index_id': str,
          'topic': str,
          'type': S.OneOf('direct', 'digest', 'summary'),
          'frequency': {'n': int, 'unit': S.OneOf('day', 'week', 'month')},
          'next_scheduled': datetime,
          'mailbox_id': S.ObjectId,
          }])

    project = RelationProperty('Project')
    app_config = RelationProperty('AppConfig')

    @classmethod
    def upsert(cls, user=None, project=None, app=None):
        '''Return the record for (user, project, app), creating it if absent.

        Any argument left as ``None`` is taken from the request context
        (``c.user`` / ``c.project`` / ``c.app``).
        '''
        if user is None:
            user = c.user
        if project is None:
            project = c.project
        if app is None:
            app = c.app
        d = dict(
            user_id=user._id,
            project_id=project._id,
            app_config_id=app.config._id)
        result = cls.query.get(**d)
        if result is None:
            result = cls(**d)
        return result

    def subscribed(self, artifact=None, topic=None):
        '''Return True if an entry exists for exactly this artifact and topic.

        ``artifact=None`` matches entries whose ``artifact_index_id`` is
        ``None``; likewise for ``topic``.
        '''
        artifact_index_id = artifact.index_id() if artifact else None
        return any(
            s.artifact_index_id == artifact_index_id and s.topic == topic
            for s in self.subscriptions)

    def subscribe(self, type, n=1, unit='day', artifact=None, topic=None):
        '''Subscribe to notifications on the current project/app, optionally
        filtered by artifact and topic.  This method creates the associated
        mailbox as well.

        Does nothing if a matching subscription already exists.

        :param type: delivery type (``'direct'``, ``'digest'``, ``'summary'``)
        :param n: delivery frequency count
        :param unit: delivery frequency unit (``'day'``, ``'week'``, ``'month'``)
        :param artifact: restrict to this artifact, or ``None`` for no filter
        :param topic: restrict to this topic, or ``None`` for no filter
        '''
        if self.subscribed(artifact, topic):
            return
        artifact_index_id = artifact.index_id() if artifact else None
        d = dict(
            artifact_index_id=artifact_index_id,
            topic=topic,
            type=type,
            frequency=dict(n=n, unit=unit),
            next_scheduled=datetime.utcnow())
        # Create the mailbox for the owner of *this* record.  Relying on the
        # Mailbox field defaults would attach it to c.user/c.project/c.app,
        # which is wrong when the record came from upsert() with explicit
        # user/project/app arguments.
        mbox = Mailbox(
            user_id=self.user_id,
            project_id=self.project_id,
            app_config_id=self.app_config_id,
            **d)
        self.subscriptions.append(
            dict(d, mailbox_id=mbox._id))

    def unsubscribe(self, artifact=None, artifact_index_id=None, topic=None):
        '''Unsubscribe to notifications on the current project/app, optionally
        filtered by artifact and topic.  This method removes the associated
        mailbox as well.

        ``artifact_index_id`` takes precedence; it is derived from
        ``artifact`` only when not given.
        '''
        if artifact_index_id is None and artifact is not None:
            artifact_index_id = artifact.index_id()
        new_subs = []
        for s in self.subscriptions:
            if (s.artifact_index_id == artifact_index_id
                    and s.topic == topic):
                Mailbox.query.remove(dict(_id=s.mailbox_id))
            else:
                new_subs.append(s)
        self.subscriptions = new_subs
class Mailbox(MappedClass):
    '''Queue of pending notification ids for one subscription.

    ``deliver`` pushes notification ids onto matching mailboxes and
    ``fire_ready`` drains the mailboxes that are due, sending email
    according to the mailbox ``type``.
    '''

    class __mongometa__:
        session = main_orm_session
        name = 'mailbox'

    _id = FieldProperty(S.ObjectId)
    user_id = ForeignIdProperty('User', if_missing=lambda: c.user._id)
    project_id = ForeignIdProperty('Project', if_missing=lambda: c.project._id)
    app_config_id = ForeignIdProperty(
        'AppConfig', if_missing=lambda: c.app.config._id)

    # Subscription filters
    artifact_index_id = FieldProperty(str)
    topic = FieldProperty(str)

    # Subscription type
    # The validator must be an instantiated OneOf, as in the other OneOf
    # fields of this module; passing the class plus stray positional
    # arguments does not declare the allowed values.
    type = FieldProperty(S.OneOf('direct', 'digest', 'summary'))
    frequency = FieldProperty(dict(
        n=int, unit=S.OneOf('day', 'week', 'month')))
    next_scheduled = FieldProperty(datetime)

    # Actual notification IDs
    queue = FieldProperty([str])

    @classmethod
    def deliver(cls, nid, artifact_index_id, topic):
        '''Called in the notification message handler to deliver notification
        IDs to the appropriate mailboxes.

        A mailbox in the current project/app matches when its artifact filter
        is unset or equals ``artifact_index_id`` and its topic filter is unset
        or equals ``topic``.  The id is appended with a ``$push`` update per
        mailbox.
        '''
        q = cls.query.find({
            'project_id': c.project._id,
            'app_config_id': c.app.config._id,
            'artifact_index_id': {'$in': [None, artifact_index_id]},
            'topic': {'$in': [None, topic]},
        })
        for mbox in q:
            mbox.query.update(
                dict(_id=mbox._id),
                {'$push': dict(queue=nid)})

    @classmethod
    def fire_ready(cls):
        '''Fires all direct subscriptions with notifications as well as
        all summary & digest subscriptions that are due.

        Each mailbox's queue is reset with ``find_and_modify`` and the
        returned object is fired.
        NOTE(review): this relies on ``find_and_modify`` returning the
        document as it was *before* the update (so the queue is still
        populated) -- confirm against the Ming/pymongo version in use.
        '''
        now = datetime.utcnow()
        # Queries to find all matching subscription objects
        q_direct = dict(
            type='direct',
            queue={'$ne': []})
        q_digest = dict(
            type={'$in': ['digest', 'summary']},
            next_scheduled={'$lt': now})
        for mbox in cls.query.find(q_direct):
            mbox = mbox.query.find_and_modify(
                query=dict(_id=mbox._id),
                update={'$set': dict(
                    queue=[])})
            mbox.fire(now)
        for mbox in cls.query.find(q_digest):
            mbox = mbox.query.find_and_modify(
                query=dict(_id=mbox._id),
                update={'$set': dict(
                    next_scheduled=mbox._next_scheduled(now),
                    queue=[])})
            mbox.fire(now)

    def _next_scheduled(self, now):
        '''Return ``now`` advanced by this mailbox's delivery frequency.

        A month is approximated as 30 days.  An unrecognised unit leaves
        ``now`` unchanged.
        '''
        days_per_unit = dict(day=1, week=7, month=30).get(self.frequency.unit)
        if days_per_unit is None:
            return now
        return now + timedelta(days=days_per_unit * self.frequency.n)

    def fire(self, now):
        '''Send the queued notifications according to ``self.type``.

        ``direct`` sends one email per notification; ``digest`` and
        ``summary`` send a single bundled email.  Nothing is sent when there
        are no notifications to deliver.

        :param now: current time as computed by the caller (not used here)
        '''
        if not self.queue:
            # Scheduled digests fire whether or not anything was queued;
            # do not mail an empty digest.
            return
        notifications = list(
            Notification.query.find(dict(_id={'$in': self.queue})))
        if not notifications:
            return
        if self.type == 'direct':
            for n in notifications:
                n.send_direct(self.user_id)
        elif self.type == 'digest':
            Notification.send_digest(
                self.user_id, 'noreply@in.sf.net', 'Digest Email',
                notifications)
        elif self.type == 'summary':
            Notification.send_summary(
                self.user_id, 'noreply@in.sf.net', 'Digest Email',
                notifications)