Child: [246f20] (diff)

Download this file

test_notification.py    235 lines (205 with data), 8.4 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import unittest
from datetime import timedelta
from pylons import g, c
from nose.tools import assert_equal
from ming.orm import ThreadLocalORMSession
from alluratest.controller import setup_basic_test, setup_global_objects, REGISTRY
from allura import model as M
from allura.lib import helpers as h
from allura.tests import decorators as td
from forgewiki import model as WM
class TestNotification(unittest.TestCase):
    """Subscribe/unsubscribe round trip against the Mailbox collection."""

    def setUp(self):
        """Reset the basic test environment, then build the wiki fixtures."""
        setup_basic_test()
        self.setup_with_tools()

    @td.with_wiki
    def setup_with_tools(self):
        """Start each test with no stored subscriptions or notifications."""
        setup_global_objects()
        _clear_subscriptions()
        _clear_notifications()
        self._flush_and_close()
        # disable message combining
        M.notification.MAILBOX_QUIESCENT = None

    @staticmethod
    def _flush_and_close():
        """Flush every thread-local ORM session, then close them all."""
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    @staticmethod
    def _current_user_mailboxes():
        """Return the mailboxes of the current project, app and user."""
        spec = {
            'project_id': c.project._id,
            'app_config_id': c.app.config._id,
            'user_id': c.user._id,
        }
        return M.Mailbox.query.find(spec).all()

    def test_subscribe_unsubscribe(self):
        """Subscribing creates exactly one mailbox; unsubscribing removes it."""
        # Subscribe and verify that a single 'direct' mailbox now exists.
        M.Mailbox.subscribe(type='direct')
        self._flush_and_close()
        found = self._current_user_mailboxes()
        assert len(found) == 1
        only_mailbox = found[0]
        assert only_mailbox.type == 'direct'
        assert M.Mailbox.query.find().count() == 1

        # Unsubscribe and verify that nothing is left behind.
        M.Mailbox.unsubscribe()
        self._flush_and_close()
        remaining = self._current_user_mailboxes()
        assert not remaining
        assert M.Mailbox.query.find().count() == 0
class TestPostNotifications(unittest.TestCase):
    """Tests for posting notifications about a wiki page.

    Covers the task created by a post, per-user notifications, delivery
    into a subscriber's mailbox, the outgoing mail task, and the
    permission check that guards mail creation.
    """

    def setUp(self):
        """Reset the basic test environment, then build the wiki fixtures."""
        setup_basic_test()
        self.setup_with_tools()

    @td.with_wiki
    def setup_with_tools(self):
        """Select the wiki app, clear stored state and load a wiki page.

        The loaded page is kept on ``self.pg`` for the tests to use.
        """
        setup_global_objects()
        g.set_app('wiki')
        _clear_subscriptions()
        _clear_notifications()
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
        M.notification.MAILBOX_QUIESCENT = None  # disable message combining
        # Keep running ready tasks for as long as run_ready('setup') reports
        # a truthy result, flushing after every pass.
        # NOTE(review): presumably this drains tasks queued by the fixture
        # setup so each test starts from an empty queue -- confirm.
        while M.MonQTask.run_ready('setup'):
            ThreadLocalORMSession.flush_all()

    def test_post_notification(self):
        """Posting a notification queues a task that references the page."""
        self._post_notification()
        ThreadLocalORMSession.flush_all()
        M.MonQTask.list()  # return value is not used
        t = M.MonQTask.get()
        assert t.args[1] == self.pg.index_id()

    def test_post_user_notification(self):
        """A user notification can be popped exactly once."""
        u = M.User.query.get(username='test-admin')
        M.Notification.post_user(u, self.pg, 'metadata')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        flash_msgs = list(h.pop_user_notifications(u))
        assert len(flash_msgs) == 1, flash_msgs
        msg = flash_msgs[0]
        assert msg['text'].startswith('WikiPage Home modified by Test Admin')
        assert msg['subject'].startswith('[test:wiki]')
        # Popping a second time must yield nothing.
        flash_msgs = list(h.pop_user_notifications(u))
        assert not flash_msgs, flash_msgs

    def test_delivery(self):
        """A posted notification ends up queued in the subscriber's mailbox."""
        self._subscribe()
        self._post_notification()
        M.MonQTask.run_ready()
        ThreadLocalORMSession.flush_all()
        M.MonQTask.run_ready()
        ThreadLocalORMSession.flush_all()
        assert M.Mailbox.query.find().count() == 1
        mbox = M.Mailbox.query.get()
        assert len(mbox.queue) == 1

    def test_email(self):
        """The mail task carries the expected sender and message text."""
        self._subscribe()
        self._post_notification()
        ThreadLocalORMSession.flush_all()
        M.MonQTask.run_ready()
        ThreadLocalORMSession.flush_all()
        assert_equal(M.Notification.query.get()['from_address'], '"Test Admin" <test-admin@users.localhost>')
        assert M.Mailbox.query.find().count() == 1
        mbox = M.Mailbox.query.get()
        assert len(mbox.queue) == 1
        M.Mailbox.fire_ready()
        task = M.MonQTask.get()
        fromaddr = task.kwargs['fromaddr']
        # At least one of the current user's addresses must appear in the
        # sender of the mail task.
        assert any(addr in fromaddr for addr in c.user.email_addresses), \
            'From address is wrong: %s' % fromaddr
        assert task.kwargs['text'].startswith('WikiPage Home modified by Test Admin')
        assert 'you indicated interest in ' in task.kwargs['text']

    def test_permissions(self):
        """No mail task is created when the permission check fails."""
        # Notification should only be delivered if user has read perms on the
        # artifact. The perm check happens just before the mail task is
        # posted.
        u = M.User.query.get(username='test-admin')
        self._subscribe(user=u)

        # Simulate a permission check failure.
        def patched_has_access(*args, **kw):
            def predicate(*args, **kw):
                return False
            return predicate

        from allura.model.notification import security
        orig = security.has_access
        security.has_access = patched_has_access
        try:
            # this will create a notification task
            self._post_notification()
            ThreadLocalORMSession.flush_all()
            # running the notification task will create a mail task if the
            # permission check passes...
            M.MonQTask.run_ready()
            ThreadLocalORMSession.flush_all()
            # ...but in this case it doesn't create a mail task since we
            # forced the perm check to fail
            assert M.MonQTask.get() is None
        finally:
            # Always undo the monkeypatch so later tests see the real check.
            security.has_access = orig

    def _subscribe(self, **kw):
        """Subscribe to ``self.pg`` with type 'direct'; ``kw`` is passed through."""
        self.pg.subscribe(type='direct', **kw)
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def _post_notification(self):
        """Post a 'metadata' notification for ``self.pg`` and return the result."""
        return M.Notification.post(self.pg, 'metadata')
class TestSubscriptionTypes(unittest.TestCase):
    """Tests for the 'direct', 'digest' and 'summary' subscription types."""

    def setUp(self):
        """Reset the basic test environment, then build the wiki fixtures."""
        setup_basic_test()
        self.setup_with_tools()

    @td.with_wiki
    def setup_with_tools(self):
        """Select the wiki app, clear stored state and load a wiki page.

        The loaded page is kept on ``self.pg`` for the tests to use.
        """
        setup_global_objects()
        g.set_app('wiki')
        _clear_subscriptions()
        _clear_notifications()
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
        M.notification.MAILBOX_QUIESCENT = None  # disable message combining

    def test_direct_sub(self):
        """Smoke test: firing a 'direct' mailbox with two posts must not raise."""
        self._subscribe()
        self._post_notification(text='A')
        self._post_notification(text='B')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        M.Mailbox.fire_ready()

    def test_digest_sub(self):
        """Smoke test: firing a 'digest' mailbox must not raise."""
        self._subscribe(type='digest')
        self._post_notification(text='x' * 1024)
        self._post_notification()
        M.Mailbox.fire_ready()

    def test_summary_sub(self):
        """Smoke test: firing a 'summary' mailbox must not raise."""
        self._subscribe(type='summary')
        self._post_notification(text='x' * 1024)
        self._post_notification()
        M.Mailbox.fire_ready()

    def test_message(self):
        """A thread post produces a ready mail task unless a quiescent period is set.

        The scenario runs twice with message combining disabled, then once
        with ``MAILBOX_QUIESCENT`` set to one minute, where the scenario's
        own assertions are expected to fail.
        """
        self._test_message()
        self.setUp()
        self._test_message()
        self.setUp()
        # MAILBOX_QUIESCENT is module-level state: remember the current value
        # and restore it afterwards so the override cannot leak into tests
        # that run later.
        previous = M.notification.MAILBOX_QUIESCENT
        M.notification.MAILBOX_QUIESCENT = timedelta(minutes=1)
        try:
            self.assertRaises(AssertionError, self._test_message)
        finally:
            M.notification.MAILBOX_QUIESCENT = previous

    def _test_message(self):
        """Post to the page's thread and check the resulting sendmail task."""
        self._subscribe()
        thd = M.Thread.query.get(ref_id=self.pg.index_id())
        thd.post('This is a very cool message')
        M.MonQTask.run_ready()
        ThreadLocalORMSession.flush_all()
        M.Mailbox.fire_ready()
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        msg = M.MonQTask.query.get(
            task_name='allura.tasks.mail_tasks.sendmail',
            state='ready')
        assert msg is not None
        assert 'Home@wiki.test.p' in msg.kwargs['reply_to']
        u = M.User.by_username('test-admin')
        assert str(u._id) in msg.kwargs['fromaddr'], msg.kwargs['fromaddr']

    def _clear_subscriptions(self):
        """Remove Mailbox records, then flush and close the ORM sessions.

        Nothing in this class calls this method: the bare
        ``_clear_subscriptions()`` call in ``setup_with_tools`` resolves to
        the module-level function of the same name.
        """
        M.Mailbox.query.remove({})
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def _subscribe(self, type='direct', topic=None):
        """Subscribe to ``self.pg`` with the given type and topic."""
        self.pg.subscribe(type=type, topic=topic)
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def _post_notification(self, text=None):
        """Post a 'metadata' notification for ``self.pg`` with optional text."""
        return M.Notification.post(self.pg, 'metadata', text=text)
def _clear_subscriptions():
    """Remove Mailbox records using an empty filter.

    Assumes an empty spec matches every record, so no subscription is
    left over for the next test. Sessions are not flushed here.
    """
    match_all = {}
    M.Mailbox.query.remove(match_all)
def _clear_notifications():
    """Remove Notification records using an empty filter.

    Assumes an empty spec matches every record, so no notification is
    left over for the next test. Sessions are not flushed here.
    """
    match_all = {}
    M.Notification.query.remove(match_all)