|
a/Allura/allura/lib/security.py |
|
b/Allura/allura/lib/security.py |
1 |
"""
|
1 |
"""
|
2 |
This module provides the security predicates used in decorating various models.
|
2 |
This module provides the security predicates used in decorating various models.
|
3 |
"""
|
3 |
"""
|
|
|
4 |
import logging
|
4 |
from collections import defaultdict
|
5 |
from collections import defaultdict
|
5 |
|
6 |
|
6 |
from pylons import c, request
|
7 |
from pylons import c, request
|
7 |
from webob import exc
|
8 |
from webob import exc
|
8 |
from itertools import chain
|
9 |
from itertools import chain
|
9 |
from ming.utils import LazyProperty
|
10 |
from ming.utils import LazyProperty
|
|
|
11 |
|
|
|
12 |
from allura.lib import utils
|
|
|
13 |
|
|
|
14 |
log = logging.getLogger(__name__)
|
10 |
|
15 |
|
11 |
class Credentials(object):
|
16 |
class Credentials(object):
|
12 |
'''
|
17 |
'''
|
13 |
Role graph logic & caching
|
18 |
Role graph logic & caching
|
14 |
'''
|
19 |
'''
|
|
... |
|
... |
169 |
|
174 |
|
170 |
@LazyProperty
|
175 |
@LazyProperty
|
171 |
def reaching_ids_set(self):
|
176 |
def reaching_ids_set(self):
|
172 |
return set(self.reaching_ids)
|
177 |
return set(self.reaching_ids)
|
173 |
|
178 |
|
174 |
def has_neighborhood_access(access_type, neighborhood, user=None):
|
179 |
def has_access(obj, permission, user=None, project=None):
|
175 |
'''
|
|
|
176 |
:param str access_type: permission name
|
|
|
177 |
:param Neighborhood neighborhood:
|
|
|
178 |
:param User user: a specific user, else the current user
|
|
|
179 |
:returns: _another function_ which returns True if has access
|
|
|
180 |
'''
|
|
|
181 |
from allura import model as M
|
180 |
from allura import model as M
|
182 |
def result(user=user):
|
181 |
@utils.memoize_on_request(
|
|
|
182 |
'has_access', obj, permission,
|
|
|
183 |
include_func_in_key=False)
|
|
|
184 |
def predicate(obj=obj, user=user, project=project, roles=None):
|
|
|
185 |
if roles is None:
|
183 |
if user is None: user = c.user
|
186 |
if user is None: user = c.user
|
184 |
acl = neighborhood.acl[access_type]
|
|
|
185 |
anon = M.User.anonymous()
|
|
|
186 |
if not acl: return user != anon
|
|
|
187 |
for u in acl:
|
|
|
188 |
if u == anon._id or u == user._id: return True
|
|
|
189 |
return False
|
|
|
190 |
return result
|
|
|
191 |
|
|
|
192 |
def has_project_access(access_type, project=None, user=None):
|
|
|
193 |
'''
|
|
|
194 |
:param str access_type: permission name
|
|
|
195 |
:param Project project: a specific project, else the current project
|
|
|
196 |
:param User user: a specific user, else the current user
|
|
|
197 |
:returns: _another function_ which returns True if has access. Neighborhood admin access will always result in True
|
|
|
198 |
'''
|
|
|
199 |
def result(project=project, user=user):
|
|
|
200 |
if project is None: project = c.project
|
|
|
201 |
if user is None: user = c.user
|
|
|
202 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
187 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
203 |
cred = Credentials.get()
|
188 |
cred = Credentials.get()
|
204 |
for proj in project.parent_iter():
|
189 |
if project is None:
|
205 |
acl = set(proj.acl.get(access_type, []))
|
190 |
if isinstance(obj, M.Neighborhood):
|
206 |
if cred.user_has_any_role(user._id, project.root_project._id, acl): return True
|
191 |
project = M.Project.query.get(
|
207 |
if has_neighborhood_access('admin', project.neighborhood, user)():
|
192 |
neighborhood_id=obj._id,
|
|
|
193 |
shortname='--init--')
|
|
|
194 |
elif isinstance(obj, M.Project):
|
|
|
195 |
project = obj.root_project
|
|
|
196 |
else:
|
|
|
197 |
if project is None: project = c.project.root_project
|
|
|
198 |
roles = cred.user_roles(user_id=user._id, project_id=project._id).reaching_ids
|
|
|
199 |
chainable_roles = []
|
|
|
200 |
for rid in roles:
|
|
|
201 |
for ace in obj.acl:
|
|
|
202 |
if M.ACE.match(ace, rid, permission):
|
|
|
203 |
if ace.access == M.ACE.ALLOW:
|
|
|
204 |
# access is allowed
|
|
|
205 |
# log.info('%s: True', txt)
|
208 |
return True
|
206 |
return True
|
|
|
207 |
else:
|
|
|
208 |
# access is denied for this role
|
|
|
209 |
break
|
|
|
210 |
else:
|
|
|
211 |
# access neither allowed or denied, may chain to parent context
|
|
|
212 |
chainable_roles.append(rid)
|
|
|
213 |
parent = obj.parent_security_context()
|
|
|
214 |
if parent and chainable_roles:
|
|
|
215 |
result = has_access(parent, permission, user=user, project=project)(
|
|
|
216 |
roles=tuple(chainable_roles))
|
|
|
217 |
elif not isinstance(obj, M.Neighborhood):
|
|
|
218 |
result = has_access(project.neighborhood, 'admin', user=user)()
|
|
|
219 |
else:
|
209 |
return False
|
220 |
result = False
|
|
|
221 |
# log.info('%s: %s', txt, result)
|
210 |
return result
|
222 |
return result
|
211 |
|
|
|
212 |
def has_artifact_access(access_type, obj=None, user=None, app=None):
|
|
|
213 |
'''
|
|
|
214 |
Check for artifact- or application-level access
|
|
|
215 |
|
|
|
216 |
:param str access_type: permission name
|
|
|
217 |
:param Artifact obj: if None, application access is checked
|
|
|
218 |
:param Project project: a specific project, else the current project
|
|
|
219 |
:param Application app: a specific user, else the current user
|
|
|
220 |
:returns: _another function_ which returns True if has access. Neighborhood admin access will always result in True
|
|
|
221 |
'''
|
|
|
222 |
def result(user=user, app=app):
|
|
|
223 |
if user is None: user = c.user
|
|
|
224 |
if app is None: app = c.app
|
|
|
225 |
project_id = app.project.root_project._id
|
|
|
226 |
assert user, 'c.user should always be at least M.User.anonymous()'
|
|
|
227 |
cred = Credentials.get()
|
|
|
228 |
acl = set(app.config.acl.get(access_type, []))
|
|
|
229 |
if obj is not None:
|
|
|
230 |
acl |= set(obj.acl.get(access_type, []))
|
|
|
231 |
if cred.user_has_any_role(user._id, project_id, acl): return True
|
|
|
232 |
if has_neighborhood_access('admin', app.project.neighborhood, user)():
|
|
|
233 |
return True
|
|
|
234 |
return False
|
|
|
235 |
return result
|
223 |
return predicate
|
236 |
|
224 |
|
237 |
def require(predicate, message=None):
|
225 |
def require(predicate, message=None):
|
238 |
'''
|
226 |
'''
|
239 |
Example: require(has_artifact_access('read'))
|
227 |
Example: require(has_artifact_access('read'))
|
240 |
|
228 |
|
|
... |
|
... |
253 |
request.environ['error_message'] = message
|
241 |
request.environ['error_message'] = message
|
254 |
raise exc.HTTPForbidden(detail=message)
|
242 |
raise exc.HTTPForbidden(detail=message)
|
255 |
else:
|
243 |
else:
|
256 |
raise exc.HTTPUnauthorized()
|
244 |
raise exc.HTTPUnauthorized()
|
257 |
|
245 |
|
|
|
246 |
def require_access(obj, permission, **kwargs):
|
|
|
247 |
predicate = has_access(obj, permission, **kwargs)
|
|
|
248 |
return require(predicate, message='%s access required' % permission.capitalize())
|
|
|
249 |
|
258 |
def require_authenticated():
|
250 |
def require_authenticated():
|
259 |
'''
|
251 |
'''
|
260 |
:raises: HTTPUnauthorized if current user is anonymous
|
252 |
:raises: HTTPUnauthorized if current user is anonymous
|
261 |
'''
|
253 |
'''
|
262 |
from allura import model as M
|
254 |
from allura import model as M
|
263 |
if c.user == M.User.anonymous():
|
255 |
if c.user == M.User.anonymous():
|
264 |
raise exc.HTTPUnauthorized()
|
256 |
raise exc.HTTPUnauthorized()
|
|
|
257 |
|
|
|
258 |
def simple_grant(acl, role_id, permission):
|
|
|
259 |
from allura.model.types import ACE
|
|
|
260 |
for ace in acl:
|
|
|
261 |
if ace.role_id == role_id and ace.permission == permission: return
|
|
|
262 |
acl.append(ACE.allow(role_id, permission))
|
|
|
263 |
|
|
|
264 |
def simple_revoke(acl, role_id, permission):
|
|
|
265 |
remove = []
|
|
|
266 |
for i, ace in enumerate(acl):
|
|
|
267 |
if ace.role_id == role_id and ace.permission == permission:
|
|
|
268 |
remove.append(i)
|
|
|
269 |
for i in reversed(remove):
|
|
|
270 |
acl.pop(i)
|