|
a/Allura/allura/lib/security.py |
|
b/Allura/allura/lib/security.py |
|
... |
|
... |
7 |
from webob import exc
|
7 |
from webob import exc
|
8 |
from itertools import chain
|
8 |
from itertools import chain
|
9 |
from ming.utils import LazyProperty
|
9 |
from ming.utils import LazyProperty
|
10 |
|
10 |
|
11 |
class Credentials(object):
|
11 |
class Credentials(object):
|
|
|
12 |
'''
|
|
|
13 |
Role graph logic & caching
|
|
|
14 |
'''
|
12 |
|
15 |
|
13 |
def __init__(self):
|
16 |
def __init__(self):
|
14 |
self.clear()
|
17 |
self.clear()
|
15 |
|
18 |
|
16 |
@classmethod
|
19 |
@classmethod
|
17 |
def get(cls):
|
20 |
def get(cls):
|
|
|
21 |
'get the global Credentials instance'
|
18 |
import allura
|
22 |
import allura
|
19 |
return allura.credentials
|
23 |
return allura.credentials
|
20 |
|
24 |
|
21 |
def clear(self):
|
25 |
def clear(self):
|
|
|
26 |
'clear cache'
|
22 |
self.users = {}
|
27 |
self.users = {}
|
23 |
self.projects = {}
|
28 |
self.projects = {}
|
24 |
|
29 |
|
25 |
def project_roles(self, project_id):
|
30 |
def project_roles(self, project_id):
|
|
|
31 |
'''
|
|
|
32 |
:returns: a RoleCache of ProjectRoles for project_id
|
|
|
33 |
'''
|
26 |
from allura import model as M
|
34 |
from allura import model as M
|
27 |
roles = self.projects.get(project_id)
|
35 |
roles = self.projects.get(project_id)
|
28 |
if roles is None:
|
36 |
if roles is None:
|
29 |
roles = self.projects[project_id] = RoleCache(
|
37 |
roles = self.projects[project_id] = RoleCache(
|
30 |
self, M.ProjectRole.query.find(dict(project_id=project_id)))
|
38 |
self, M.ProjectRole.query.find(dict(project_id=project_id)))
|
31 |
return roles
|
39 |
return roles
|
32 |
|
40 |
|
33 |
def user_roles(self, user_id, project_id=None):
|
41 |
def user_roles(self, user_id, project_id=None):
|
|
|
42 |
'''
|
|
|
43 |
:returns: a RoleCache of ProjectRoles for given user_id and project_id, *anonymous and *authenticated checked as appropriate
|
|
|
44 |
'''
|
34 |
from allura import model as M
|
45 |
from allura import model as M
|
35 |
roles = self.users.get((user_id, project_id))
|
46 |
roles = self.users.get((user_id, project_id))
|
36 |
if roles is None:
|
47 |
if roles is None:
|
37 |
if project_id is None:
|
48 |
if project_id is None:
|
38 |
if user_id is None:
|
49 |
if user_id is None:
|
|
... |
|
... |
159 |
@LazyProperty
|
170 |
@LazyProperty
|
160 |
def reaching_ids_set(self):
|
171 |
def reaching_ids_set(self):
|
161 |
return set(self.reaching_ids)
|
172 |
return set(self.reaching_ids)
|
162 |
|
173 |
|
163 |
def has_neighborhood_access(access_type, neighborhood, user=None):
|
174 |
def has_neighborhood_access(access_type, neighborhood, user=None):
|
|
|
175 |
'''
|
|
|
176 |
:param str access_type: permission name
|
|
|
177 |
:param Neighborhood neighborhood:
|
|
|
178 |
:param User user: a specific user, else the current user
|
|
|
179 |
:returns: _another function_ which returns True if has access
|
|
|
180 |
'''
|
164 |
from allura import model as M
|
181 |
from allura import model as M
|
165 |
def result(user=user):
|
182 |
def result(user=user):
|
166 |
if user is None: user = c.user
|
183 |
if user is None: user = c.user
|
167 |
acl = neighborhood.acl[access_type]
|
184 |
acl = neighborhood.acl[access_type]
|
168 |
anon = M.User.anonymous()
|
185 |
anon = M.User.anonymous()
|
|
... |
|
... |
171 |
if u == anon._id or u == user._id: return True
|
188 |
if u == anon._id or u == user._id: return True
|
172 |
return False
|
189 |
return False
|
173 |
return result
|
190 |
return result
|
174 |
|
191 |
|
175 |
def has_project_access(access_type, project=None, user=None):
|
192 |
def has_project_access(access_type, project=None, user=None):
|
|
|
193 |
'''
|
|
|
194 |
:param str access_type: permission name
|
|
|
195 |
:param Project project: a specific project, else the current project
|
|
|
196 |
:param User user: a specific user, else the current user
|
|
|
197 |
:returns: _another function_ which returns True if has access. Neighborhood admin access will always result in True
|
|
|
198 |
'''
|
176 |
def result(project=project, user=user):
|
199 |
def result(project=project, user=user):
|
177 |
if project is None: project = c.project
|
200 |
if project is None: project = c.project
|
178 |
if user is None: user = c.user
|
201 |
if user is None: user = c.user
|
179 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
202 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
180 |
cred = Credentials.get()
|
203 |
cred = Credentials.get()
|
|
... |
|
... |
185 |
return True
|
208 |
return True
|
186 |
return False
|
209 |
return False
|
187 |
return result
|
210 |
return result
|
188 |
|
211 |
|
189 |
def has_artifact_access(access_type, obj=None, user=None, app=None):
|
212 |
def has_artifact_access(access_type, obj=None, user=None, app=None):
|
|
|
213 |
'''
|
|
|
214 |
Check for artifact- or application-level access
|
|
|
215 |
|
|
|
216 |
:param str access_type: permission name
|
|
|
217 |
:param Artifact obj: if None, application access is checked
|
|
|
218 |
:param Project project: a specific project, else the current project
|
|
|
219 |
:param Application app: a specific user, else the current user
|
|
|
220 |
:returns: _another function_ which returns True if has access. Neighborhood admin access will always result in True
|
|
|
221 |
'''
|
190 |
def result(user=user, app=app):
|
222 |
def result(user=user, app=app):
|
191 |
if user is None: user = c.user
|
223 |
if user is None: user = c.user
|
192 |
if app is None: app = c.app
|
224 |
if app is None: app = c.app
|
193 |
project_id = app.project.root_project._id
|
225 |
project_id = app.project.root_project._id
|
194 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
226 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
|
... |
|
... |
201 |
return True
|
233 |
return True
|
202 |
return False
|
234 |
return False
|
203 |
return result
|
235 |
return result
|
204 |
|
236 |
|
205 |
def require(predicate, message=None):
|
237 |
def require(predicate, message=None):
|
|
|
238 |
'''
|
|
|
239 |
Example: require(has_artifact_access('read'))
|
|
|
240 |
|
|
|
241 |
:param callable predicate: truth function to call
|
|
|
242 |
:param str message: message to show upon failure
|
|
|
243 |
:raises: HTTPForbidden or HTTPUnauthorized
|
|
|
244 |
'''
|
|
|
245 |
|
206 |
from allura import model as M
|
246 |
from allura import model as M
|
207 |
if predicate(): return
|
247 |
if predicate(): return
|
208 |
if not message:
|
248 |
if not message:
|
209 |
message = """You don't have permission to do that.
|
249 |
message = """You don't have permission to do that.
|
210 |
You must ask a project administrator for rights to perform this task.
|
250 |
You must ask a project administrator for rights to perform this task.
|
|
... |
|
... |
214 |
raise exc.HTTPForbidden(detail=message)
|
254 |
raise exc.HTTPForbidden(detail=message)
|
215 |
else:
|
255 |
else:
|
216 |
raise exc.HTTPUnauthorized()
|
256 |
raise exc.HTTPUnauthorized()
|
217 |
|
257 |
|
218 |
def require_authenticated():
|
258 |
def require_authenticated():
|
|
|
259 |
'''
|
|
|
260 |
:raises: HTTPUnauthorized if current user is anonymous
|
|
|
261 |
'''
|
219 |
from allura import model as M
|
262 |
from allura import model as M
|
220 |
if c.user == M.User.anonymous():
|
263 |
if c.user == M.User.anonymous():
|
221 |
raise exc.HTTPUnauthorized()
|
264 |
raise exc.HTTPUnauthorized()
|