Child: [bea16d] (diff)

Download this file

test_tasks.py    230 lines (198 with data), 9.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# -*- coding: utf-8 -*-
import operator
import sys
import unittest
from base64 import b64encode
import mock
import pylons
pylons.c = pylons.tmpl_context
pylons.g = pylons.app_globals
from pylons import c, g
from datadiff.tools import assert_equal
from nose.tools import assert_in
from ming.orm import FieldProperty, Mapper
from alluratest.controller import setup_basic_test, setup_global_objects
from allura import model as M
from allura.lib import helpers as h
from allura.lib import search
from allura.lib.exceptions import CompoundError
from allura.tasks import event_tasks
from allura.tasks import index_tasks
from allura.tasks import mail_tasks
from allura.tasks import notification_tasks
from allura.tasks import repo_tasks
from allura.tests import decorators as td
from allura.lib.decorators import event_handler, task
class TestRepoTasks(unittest.TestCase):
    """Tests for the tasks in ``allura.tasks.repo_tasks``."""

    @mock.patch('allura.tasks.repo_tasks.c.app')
    @mock.patch('allura.tasks.repo_tasks.g.post_event')
    def test_clone_posts_event_on_failure(self, post_event, app):
        # Make the clone blow up so the failure path is exercised.
        source_url = 'fake_source_url'
        app.repo.init_as_clone.side_effect = Exception('fake_traceback')

        repo_tasks.clone(None, None, source_url)

        # Positional arguments of the most recent post_event() call.
        posted = post_event.call_args[0]
        assert_equal(posted[0], 'repo_clone_task_failed')
        assert_equal(posted[1], source_url)
        assert_equal(posted[2], None)
        # ignore posted[3] which is a traceback string
class TestEventTasks(unittest.TestCase):
    """Tests for event dispatch and for tasks that raise ``CompoundError``."""

    def setUp(self):
        # The module-level ``_my_event`` handler appends to this list.
        self.called_with = []

    def test_fire_event(self):
        event_tasks.event('my_event', self, 1, 2, a=5)
        expected_calls = [((1, 2), {'a': 5})]
        assert self.called_with == expected_calls, self.called_with

    def test_compound_error(self):
        '''test_compound_exception -- make sure our multi-exception return works
        OK
        '''
        setup_basic_test()
        setup_global_objects()
        posted_task = raise_exc.post()
        # Calling the posted task is expected to raise the compound error.
        self.assertRaises(CompoundError, posted_task)
        # Every collected assertion message must show up in the task result.
        for n in range(10):
            expected_fragment = 'assert %d' % n
            assert expected_fragment in posted_task.result
class TestIndexTasks(unittest.TestCase):
    """Tests for ``index_tasks.add_artifacts`` and ``index_tasks.del_artifacts``."""

    def setUp(self):
        setup_basic_test()
        setup_global_objects()

    @td.with_wiki
    def test_add_artifacts(self):
        # Baselines taken before anything is indexed.
        shortlinks_before = M.Shortlink.query.find().count()
        solr_size_before = len(g.solr.db)

        artifacts = [_TestArtifact() for _ in range(5)]
        for idx, artifact in enumerate(artifacts):
            artifact._shorthand_id = 't%d' % idx
            # Every artifact's text links to t3; the test later expects
            # t3 to have 5 backrefs.
            artifact.text = 'This is a reference to [t3]'
        arefs = [
            M.ArtifactReference.from_artifact(artifact)
            for artifact in artifacts
        ]
        ref_ids = [aref._id for aref in arefs]
        M.artifact_orm_session.flush()

        index_tasks.add_artifacts(ref_ids)

        shortlinks_after = M.Shortlink.query.find().count()
        solr_size_after = len(g.solr.db)
        assert shortlinks_before + 5 == shortlinks_after, 'Shortlinks not created'
        assert solr_size_before + 5 == solr_size_after, "Solr additions didn't happen"
        M.main_orm_session.flush()
        M.main_orm_session.clear()
        target = _TestArtifact.query.get(_shorthand_id='t3')
        assert len(target.backrefs) == 5, target.backrefs

    @td.with_wiki
    @mock.patch('allura.tasks.index_tasks.g.solr')
    def test_del_artifacts(self, solr):
        shortlinks_before = M.Shortlink.query.find().count()
        artifacts = [_TestArtifact(_shorthand_id='ta_%s' % n) for n in range(5)]
        M.artifact_orm_session.flush()
        arefs = [
            M.ArtifactReference.from_artifact(artifact)
            for artifact in artifacts
        ]
        ref_ids = [aref._id for aref in arefs]
        M.artifact_orm_session.flush()

        # Index first, so there is something to delete afterwards.
        index_tasks.add_artifacts(ref_ids)
        M.main_orm_session.flush()
        M.main_orm_session.clear()
        shortlinks_after_add = M.Shortlink.query.find().count()
        assert shortlinks_before + 5 == shortlinks_after_add, 'Shortlinks not created'
        assert solr.add.call_count == 1
        # Compare documents order-independently by sorting both sides on id.
        by_id = operator.itemgetter('id')
        indexed_docs = sorted(solr.add.call_args[0][0], key=by_id)
        expected_docs = sorted(
            [search.solarize(ref.artifact) for ref in arefs], key=by_id)
        assert_equal(indexed_docs, expected_docs)

        index_tasks.del_artifacts(ref_ids)
        M.main_orm_session.flush()
        M.main_orm_session.clear()
        shortlinks_after_del = M.Shortlink.query.find().count()
        assert shortlinks_before == shortlinks_after_del, 'Shortlinks not deleted'
        expected_query = 'id:({0})'.format(' || '.join(ref_ids))
        solr.delete.assert_called_once_with(q=expected_query)
class TestMailTasks(unittest.TestCase):
    """Tests for ``mail_tasks.sendmail`` and ``mail_tasks.route_email``."""

    def setUp(self):
        setup_basic_test()
        setup_global_objects()

    # these tests go down through the mail_util.SMTPClient.sendmail method
    # since usage is generally through the task, and not using mail_util directly

    def test_send_email_ascii_with_user_lookup(self):
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as _client:
            # Sender and recipient are passed as user ids rather than
            # e-mail addresses.
            mail_tasks.sendmail(
                fromaddr=str(c.user._id),
                destinations=[str(c.user._id)],
                text=u'This is a test',
                reply_to=u'noreply@sf.net',
                subject=u'Test subject',
                message_id=h.gen_message_id())
            assert_equal(_client.sendmail.call_count, 1)
            return_path, rcpts, body = _client.sendmail.call_args[0]
            # Split into lines so that assert_in matches whole lines.
            body = body.split('\n')

            assert_equal(rcpts, [c.user.get_pref('email_address')])
            assert_in('Reply-To: noreply@sf.net', body)
            assert_in('From: "Test Admin" <test-admin@users.localhost>', body)
            assert_in('Subject: Test subject', body)
            # plain
            assert_in('This is a test', body)
            # html
            assert_in('<div class="markdown_content"><p>This is a test</p></div>', body)

    def test_send_email_nonascii(self):
        # One literal feeds both the call and the expected base64 payload,
        # so the two cannot drift apart.
        body_text = u'Громады стройные теснятся'
        with mock.patch.object(mail_tasks.smtp_client, '_client') as _client:
            mail_tasks.sendmail(
                fromaddr=u'"По" <foo@bar.com>',
                destinations=['blah@blah.com'],
                text=body_text,
                reply_to=u'noreply@sf.net',
                subject=u'По оживлённым берегам',
                message_id=h.gen_message_id())
            assert_equal(_client.sendmail.call_count, 1)
            return_path, rcpts, body = _client.sendmail.call_args[0]
            # Split into lines so that assert_in matches whole lines.
            body = body.split('\n')

            assert_equal(rcpts, ['blah@blah.com'])
            assert_in('Reply-To: noreply@sf.net', body)

            # The address portion must not be encoded, only the name portion can be.
            # Also it is apparently not necessary to have the double-quote separators present
            # when the name portion is encoded. That is, the encoding below is just По and not "По"
            assert_in('From: =?utf-8?b?0J/Qvg==?= <foo@bar.com>', body)
            assert_in('Subject: =?utf-8?b?0J/QviDQvtC20LjQstC70ZHQvdC90YvQvCDQsdC10YDQtdCz0LDQvA==?=', body)
            assert_in('Content-Type: text/plain; charset="utf-8"', body)
            assert_in('Content-Transfer-Encoding: base64', body)
            assert_in(b64encode(body_text.encode('utf-8')), body)

    @td.with_wiki
    def test_receive_email_ok(self):
        c.user = M.User.by_username('test-admin')
        import forgewiki
        with mock.patch.object(forgewiki.wiki_main.ForgeWikiApp, 'handle_message') as f:
            mail_tasks.route_email(
                '0.0.0.0', c.user.email_addresses[0],
                ['Page@wiki.test.p.in.sf.net'],
                'This is a mail message')
            args, kwargs = f.call_args
            # The first positional argument is expected to be the local
            # part of the recipient address.
            assert args[0] == 'Page'
            assert len(args) == 2
class TestNotificationTasks(unittest.TestCase):
    """Tests for ``notification_tasks.notify``."""

    def setUp(self):
        setup_basic_test()
        setup_global_objects()

    def test_delivers_messages(self):
        with mock.patch.object(M.Mailbox, 'deliver') as deliver:
            with mock.patch.object(M.Mailbox, 'fire_ready') as fire_ready:
                notification_tasks.notify('42', '52', 'none')
                # Use the real assertion helpers.  ``called_with`` is not
                # part of the mock API: accessing it auto-creates a child
                # mock whose call result is always truthy, so
                # ``assert m.called_with(...)`` can never fail.
                deliver.assert_called_with('42', '52', 'none')
                fire_ready.assert_called_with()
@event_handler('my_event')
def _my_event(event_type, testcase, *args, **kwargs):
    """Record the extra arguments of a ``my_event`` on the given test case.

    The ``(args, kwargs)`` pair is appended to ``testcase.called_with`` so
    the test can inspect what the handler received.
    """
    call_record = (args, kwargs)
    testcase.called_with.append(call_record)
@task
def raise_exc():
    """Collect ten assertion failures and raise them as one ``CompoundError``.

    Each failure carries the message ``'assert <n>'`` for n in 0..9; the
    ``sys.exc_info()`` triple of every failure is passed to
    ``CompoundError``.

    Raises:
        CompoundError: always.
    """
    errs = []
    for x in range(10):
        try:
            assert False, 'assert %d' % x
        except AssertionError:
            # Catch only the deliberate failure; a bare ``except:`` would
            # also swallow KeyboardInterrupt and SystemExit.
            errs.append(sys.exc_info())
    raise CompoundError(*errs)
class _TestArtifact(M.Artifact):
    """Minimal artifact subclass used by the indexing tests in this module."""

    _shorthand_id = FieldProperty(str)
    text = FieldProperty(str)

    def url(self):
        # This artifact always reports an empty URL.
        return ''

    def shorthand_id(self):
        # Use ``_id`` as the fallback when there is no ``_shorthand_id``
        # attribute.
        fallback = self._id
        return getattr(self, '_shorthand_id', fallback)

    def index(self):
        # Parent index fields plus this artifact's text, as a new dict.
        parent_fields = super(_TestArtifact, self).index()
        return dict(parent_fields, text=self.text)
# Runs at import time, after _TestArtifact is declared.
# NOTE(review): presumably this finalizes the Ming mapping for the class
# above -- confirm against the Ming documentation.
Mapper.compile_all()