Parent: [588145] (diff)

Download this file

oid_helper.py    114 lines (106 with data), 5.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from pylons import app_globals as g
from pylons import request
from tg import flash, redirect, session, config
from openid.consumer import consumer
from allura import model as M
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# openid.oidutil outputs all logging to STDERR unless otherwise configured.
# We follow the openid.oidutil instructions to install our own logging hook:
# rebinding ``oidutil.log`` sends the library's messages to this module's
# logger at INFO level.  The import sits here (after ``log`` exists) because
# the hook assignment below needs both names.
from openid import oidutil
oidutil.log = log.info
def verify_oid(oid_url, failure_redirect=None, return_to=None,
               **kw):
    '''Step 1 of OID verification -- redirect to provider site.

    :param oid_url: OpenID identifier URL to start verification for.
    :param failure_redirect: target handed to ``redirect()`` when discovery
        fails or produces no request.
    :param return_to: path fragment appended to ``<realm>/auth/`` to build
        the URL the provider sends the user back to.  It defaults to
        ``None`` only for signature compatibility; a value is required.
    :param kw: extra values merged into the returned dict.
    :returns: ``dict(kw, form=<form markup>)`` when the provider request has
        to be submitted as a form.  In the other cases the function ends by
        calling ``redirect()``.
    :raises TypeError: if ``return_to`` is ``None``.
    '''
    if return_to is None:
        # Fail before doing any work, with a message that names the real
        # problem, instead of an obscure str + None concatenation error.
        raise TypeError('verify_oid() requires a return_to path')
    log.info('Trying to login via %s', oid_url)
    realm = config.get('openid.realm', 'http://localhost:8080/')
    # Join with exactly one slash so a realm configured without its trailing
    # "/" still yields a well-formed return URL.  ``realm`` itself is passed
    # to the provider unchanged.
    return_to = realm.rstrip('/') + '/auth/' + return_to
    oidconsumer = consumer.Consumer(g.oid_session(), g.oid_store)
    try:
        req = oidconsumer.begin(oid_url)
    except consumer.DiscoveryFailure as ex:
        log.exception('Error in openid login')
        flash(str(ex.args[0]), 'error')
        # NOTE: this relies on redirect() ending the request (by raising);
        # otherwise ``req`` would be unbound below.
        redirect(failure_redirect)
    if req is None:  # pragma no cover
        # NOTE(review): oid_url is caller-supplied and is interpolated into
        # markup unescaped -- confirm how flash text is rendered.
        flash('No openid services found for <code>%s</code>' % oid_url,
              'error')
        redirect(failure_redirect)
    if req.shouldSendRedirect():
        redirect_url = req.redirectURL(
            realm, return_to, False)
        log.info('Redirecting to %r', redirect_url)
        # Save the session before leaving for the provider site.
        session.save()
        redirect(redirect_url)
    else:
        return dict(kw, form=req.formMarkup(realm, return_to=return_to))
def _oid_failure_message(info, display_identifier):
    '''Return the flash text for a verification that did not succeed.'''
    if info.status == consumer.FAILURE and display_identifier:
        # We know which identifier was being verified, so include it in
        # the message to help the user figure out what happened.
        # NOTE(review): both values are interpolated unescaped -- confirm
        # how flash text is rendered.
        return "Verification of %s failed: %s" % (
            display_identifier, info.message)
    if info.status == consumer.CANCEL:
        return 'Verification cancelled'
    if info.status == consumer.SETUP_NEEDED:
        if info.setup_url:
            # quoteattr() wraps the URL in quotes and escapes markup
            # characters, so the provider-supplied value cannot break out
            # of the href attribute.
            from xml.sax.saxutils import quoteattr
            return '<a href=%s>Setup needed</a>' % quoteattr(info.setup_url)
        # This means auth didn't succeed, but you're welcome to try
        # non-immediate mode.
        return 'Setup needed'
    # Either we don't understand the status code or there is no identifier
    # to report with the failure.  Give a generic failure message; the
    # library should supply debug information in a log.
    return 'Verification failed.'


def process_oid(failure_redirect=None):
    '''Complete an OpenID verification using the current request.

    Feeds ``request.params`` and ``request.url`` to the consumer, flashes a
    message describing the outcome, and on success saves the session and
    upserts the matching ``M.OpenId`` record.

    :param failure_redirect: target handed to ``redirect()`` for every
        outcome other than success.
    :returns: the object returned by ``M.OpenId.upsert`` on success.
    '''
    oidconsumer = consumer.Consumer(g.oid_session(), g.oid_store)
    info = oidconsumer.complete(request.params, request.url)
    display_identifier = info.getDisplayIdentifier() or info.identity_url
    if info.status != consumer.SUCCESS:
        flash(_oid_failure_message(info, display_identifier), 'error')
        redirect(failure_redirect)
        # Defensive: never fall through to the upsert below for an
        # unverified identity, even if redirect() were to return.
        return None
    # Success means that the transaction completed without error.
    fmt = "You have successfully verified %s as your identity."
    message = fmt % display_identifier
    if info.endpoint.canonicalID:
        # You should authorize i-name users by their canonicalID,
        # rather than their more human-friendly identifiers. That
        # way their account with you is not compromised if their
        # i-name registration expires and is bought by someone else.
        message += (" This is an i-name, and its persistent ID is %s"
                    % info.endpoint.canonicalID)
    flash(message, 'info')
    session.save()
    oid_obj = M.OpenId.upsert(info.identity_url,
                              display_identifier=display_identifier)
    return oid_obj