Parent: [67df1e] (diff)

Child: [2c5659] (diff)

Download this file

test.py    172 lines (143 with data), 5.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# -*- coding: utf-8 -*-
"""Main Controller"""
import os
import logging
from urllib import unquote
import pkg_resources
from pylons import c, request, response
from webob import exc
from tg import expose
from tg.decorators import without_trailing_slash
import ming.orm.ormsession
import allura
from allura.lib.base import WsgiDispatchController
from allura.lib.security import require, require_authenticated, has_project_access, has_artifact_access
from allura.lib import helpers as h
from allura.lib import plugin
from allura import model as M
from .root import RootController
from .project import ProjectController
from .auth import AuthController
from .static import NewForgeController
from .search import SearchController
from .error import ErrorController
from .rest import RestController
from .oembed import OEmbedController
# NOTE(review): only 'RootController' (imported from .root above) is exported;
# the TestController defined below is not listed -- confirm this is intended.
__all__ = ['RootController']
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class TestController(WsgiDispatchController, ProjectController):
    '''Root controller for testing -- it behaves just like a
    ProjectController for test/ except that all tools are mounted,
    on-demand, at the mount point that is the same as their entry point
    name.

    Also, the test-admin is perpetually logged in here.
    '''

    def __init__(self):
        '''Wire up the sub-controllers and proxied root endpoints.'''
        # 'feed.rss' / 'feed.atom' contain a dot, so they cannot be assigned
        # with ordinary attribute syntax; setattr is required.
        setattr(self, 'feed.rss', self.feed)
        setattr(self, 'feed.atom', self.feed)
        self.oembed = OEmbedController()
        for n in M.Neighborhood.query.find():
            # Neighborhoods whose url_prefix starts with '//' are not bound
            # here (NOTE(review): presumably host-based prefixes -- confirm).
            if n.url_prefix.startswith('//'):
                continue
            n.bind_controller(self)
        proxy_root = RootController()
        self.dispatch = DispatchTest()
        self.security = SecurityTests()
        # Re-expose selected endpoints of a regular RootController on this
        # test root.
        for attr in ('index', 'browse', 'auth', 'nf', 'error'):
            setattr(self, attr, getattr(proxy_root, attr))
        self.gsearch = proxy_root.search
        self.rest = RestController()
        super(TestController, self).__init__()

    def _setup_request(self):
        '''Bind ``c.project`` to the "test" project, polling until it exists.

        Raises:
            AssertionError: if the project is still missing after 20 retries
                spaced 0.5 seconds apart.
        '''
        import time  # kept function-local, as in the original code

        # This code fixes a race condition in our tests
        c.project = M.Project.query.get(shortname='test')
        retries_left = 20
        while c.project is None:
            if retries_left <= 0:
                # Raised explicitly (not via ``assert``) so the timeout still
                # fires under ``python -O``, and only when the project is
                # genuinely still missing.
                raise AssertionError(
                    'Timeout waiting for test project to appear')
            time.sleep(0.5)
            log.warning('Project "test" not found, retrying...')
            c.project = M.Project.query.get(shortname='test')
            retries_left -= 1

    @expose()
    def _lookup(self, name, *remainder):
        '''Resolve ``name`` to a subproject or to a tool mounted at ``name``.

        A subproject of the current project takes precedence.  Otherwise the
        app instance called ``name`` is used, being installed first (with
        ``name`` passed as both arguments of ``install_app``) when it does
        not exist yet.

        Returns:
            A ``(controller, remainder)`` tuple.

        Raises:
            webob.exc.HTTPNotFound: if ``name`` does not match
                ``h.re_path_portion`` or no app can be obtained for it.
        '''
        if not h.re_path_portion.match(name):
            # Call form is equivalent to the Python 2-only
            # ``raise cls, value`` and also valid on Python 3.
            raise exc.HTTPNotFound(name)
        subproject = M.Project.query.get(
            shortname=c.project.shortname + '/' + name)
        if subproject:
            c.project = subproject
            c.app = None
            return ProjectController(), remainder
        app = c.project.app_instance(name)
        if app is None:
            # Mount the tool on demand, then look it up again.
            c.project.install_app(name, name)
            app = c.project.app_instance(name)
            if app is None:
                raise exc.HTTPNotFound(name)
        c.app = app
        return app.root, remainder

    #
    # @expose('jinja:project_index.html')
    # def index(self, **kw):
    #     require(has_project_access('read'))
    #     return dict()

    def __call__(self, environ, start_response):
        '''Set up the request context, then delegate to the WSGI dispatcher.

        ``c.user`` is looked up by the ``username`` key of ``environ``,
        defaulting to ``'test-admin'``.
        '''
        c.app = None
        c.project = M.Project.query.get(shortname='test')
        c.user = plugin.AuthenticationProvider.get(request).by_username(
            environ.get('username', 'test-admin'))
        return WsgiDispatchController.__call__(self, environ, start_response)
class DispatchTest(object):
    '''Lookup-only controller that hands requests to a NamedController.'''

    @expose()
    def _lookup(self, *args):
        '''Build a NamedController from the first argument.

        Returns:
            ``(NamedController(first), rest)`` where ``rest`` is the tuple of
            the remaining arguments.

        Raises:
            webob.exc.HTTPNotFound: if no arguments were supplied.
        '''
        if not args:
            raise exc.HTTPNotFound()
        first, rest = args[0], args[1:]
        return NamedController(first), rest
class NamedController(object):
    '''Controller whose responses echo the name it was created with.'''

    def __init__(self, name):
        '''Remember ``name`` for use in the response bodies.'''
        self.name = name

    @expose()
    def index(self, **kw):
        '''Return ``'index '`` followed by the stored name; ``kw`` is ignored.'''
        prefix = 'index '
        return prefix + self.name

    @expose()
    def _default(self, *args):
        '''Return a string showing the stored name and the repr of ``args``.'''
        values = (self.name, args)
        return 'default(%s)(%r)' % values
class SecurityTests(object):
    '''Lookup-only controller selecting the user for a SecurityTest.'''

    @expose()
    def _lookup(self, name, *args):
        '''Optionally switch to the anonymous user, then dispatch onward.

        ``name`` is URL-unquoted first; when the result is ``'*anonymous'``,
        ``c.user`` is replaced with ``M.User.anonymous()``.  Any other value
        leaves ``c.user`` untouched.

        Returns:
            ``(SecurityTest(), args)``.
        '''
        decoded = unquote(name)
        if decoded == '*anonymous':
            c.user = M.User.anonymous()
        return SecurityTest(), args
class SecurityTest(object):
    '''Endpoints that each exercise one ``require(...)`` security check.

    Every endpoint returns an empty string when its check passes.
    '''

    def __init__(self):
        '''Load the wiki page titled 'Home' and make 'wiki' the current app.'''
        # Imported locally, as in the original code.
        from forgewiki import model as WM
        self.page = WM.Page.query.get(title='Home')
        c.app = c.project.app_instance('wiki')

    @expose()
    def forbidden(self):
        '''Call ``require`` with a predicate that is always False.'''
        require(lambda: False, 'Never allowed')
        return ''

    @expose()
    def needs_auth(self):
        '''Call ``require_authenticated()`` for the current request.'''
        require_authenticated()
        return ''

    @expose()
    def needs_project_access_fail(self):
        '''Require project access named 'no_such_permission'.'''
        require(has_project_access('no_such_permission'))
        return ''

    @expose()
    def needs_project_access_ok(self):
        '''Require 'read' project access, logging the user if it is denied.'''
        pred = has_project_access('read')
        if not pred():
            # Lazy logger arguments: the message is only formatted when the
            # INFO level is actually enabled.
            log.info('Inside needs_project_access, c.user = %s', c.user)
        require(pred)
        return ''

    @expose()
    def needs_artifact_access_fail(self):
        '''Require 'no_such_permission' artifact access on the loaded page.'''
        require(has_artifact_access('no_such_permission', self.page))
        return ''

    @expose()
    def needs_artifact_access_ok(self):
        '''Require 'read' artifact access on the loaded page.'''
        require(has_artifact_access('read', self.page))
        return ''