
oauth2.py    265 lines (210 with data), 8.8 kB

from __future__ import print_function
import base64
import requests
import os
import json
import time
import sys
# Workaround to support both python 2 & 3
import six
import six.moves.urllib.parse as urllibparse
class SpotifyOauthError(Exception):
    """Raised when client credentials are missing or a token request fails."""
def _make_authorization_headers(client_id, client_secret):
    """Build the HTTP Basic ``Authorization`` header for the token endpoint.

    The header value is ``client_id:client_secret`` base64-encoded. Base64 is
    an encoding, not encryption, so the returned dict carries the client
    secret and should be kept out of logs and debug output.
    """
    credentials = six.text_type(client_id + ':' + client_secret)
    encoded = base64.b64encode(credentials.encode('ascii')).decode('ascii')
    return {'Authorization': 'Basic %s' % encoded}
def is_token_expired(token_info):
    """Return True when the token has less than 60 seconds of life left.

    ``token_info['expires_at']`` is compared with the current Unix time; the
    60 second margin treats a token that is about to lapse as already expired.
    """
    seconds_left = token_info['expires_at'] - int(time.time())
    return seconds_left < 60
class SpotifyClientCredentials(object):
    """Client Credentials flow: obtains app-only access tokens.

    The token is kept in memory only (``self.token_info``) and is fetched
    again once it is within the expiry margin.
    """
    OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token'

    def __init__(self, client_id=None, client_secret=None, proxies=None):
        """
        Either pass client_id and client_secret to the constructor, or set
        the SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET environment
        variables. Explicit arguments win; the environment is the fallback.

        Raises SpotifyOauthError when either value is missing from both.
        """
        client_id = client_id or os.getenv('SPOTIPY_CLIENT_ID')
        client_secret = client_secret or os.getenv('SPOTIPY_CLIENT_SECRET')

        if not client_id:
            raise SpotifyOauthError('No client id')
        if not client_secret:
            raise SpotifyOauthError('No client secret')

        self.client_id = client_id
        self.client_secret = client_secret
        self.token_info = None
        self.proxies = proxies

    def get_access_token(self):
        """
        Return the in-memory access token while it is still valid;
        otherwise fetch a new token, remember it, and return it.
        """
        cached = self.token_info
        if not cached or self.is_token_expired(cached):
            fresh = self._request_access_token()
            self.token_info = self._add_custom_values_to_token_info(fresh)
        return self.token_info['access_token']

    def _request_access_token(self):
        """Request a client-credentials access token from the token endpoint.

        Returns the decoded JSON response; raises SpotifyOauthError with the
        HTTP reason phrase on any status other than 200.
        """
        response = requests.post(
            self.OAUTH_TOKEN_URL,
            data={'grant_type': 'client_credentials'},
            headers=_make_authorization_headers(self.client_id,
                                                self.client_secret),
            verify=True,  # keep TLS certificate verification explicit
            proxies=self.proxies)
        if response.status_code == 200:
            return response.json()
        raise SpotifyOauthError(response.reason)

    def is_token_expired(self, token_info):
        """Delegate to the module-level expiry check."""
        return is_token_expired(token_info)

    def _add_custom_values_to_token_info(self, token_info):
        """
        Add values that the Web API response does not carry directly:
        ``expires_at`` is the absolute Unix time computed from ``expires_in``.
        """
        lifetime = token_info['expires_in']
        token_info['expires_at'] = int(time.time()) + lifetime
        return token_info
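class SpotifyOAuth(object):
    '''
    Implements Authorization Code Flow for Spotify's OAuth implementation.

    Security notes:
    - The cache file at cache_path holds the access and refresh tokens as
      plain JSON. It is created readable and writable by the owner only.
    - state is sent in the authorize URL, but this class never checks the
      value that comes back on the redirect. Callers that set state should
      compare it themselves before exchanging the code.
    - No timeout is passed to requests.post, so a stalled connection
      blocks until the peer gives up.
    '''

    OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize'
    OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token'

    def __init__(self, client_id, client_secret, redirect_uri,
            state=None, scope=None, cache_path=None, proxies=None):
        '''
            Creates a SpotifyOAuth object

            Parameters:
                 - client_id - the client id of your app
                 - client_secret - the client secret of your app
                 - redirect_uri - the redirect URI of your app
                 - state - security state
                 - scope - the desired scope of the request
                 - cache_path - path to location to save tokens
                 - proxies - proxy mapping handed to requests
        '''
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.state = state
        self.cache_path = cache_path
        self.scope = self._normalize_scope(scope)
        self.proxies = proxies

    def get_cached_token(self):
        ''' Gets a cached auth token

            Returns the cached token info, a refreshed token info when the
            cached one has expired, or None when there is no usable cache
            (no cache_path, unreadable or malformed file, scope mismatch,
            failed refresh).
        '''
        if not self.cache_path:
            return None

        try:
            with open(self.cache_path) as f:
                token_info = json.loads(f.read())
        except IOError:
            # A missing or unreadable cache simply means "no cached token".
            return None
        except ValueError:
            self._warn("couldn't parse token cache at " + self.cache_path)
            return None

        if not isinstance(token_info, dict):
            self._warn("unexpected content in token cache at " + self.cache_path)
            return None

        # if scopes don't match, then bail
        if 'scope' not in token_info or not self._is_scope_subset(self.scope, token_info['scope']):
            return None

        if not self.is_token_expired(token_info):
            return token_info

        # requests' exceptions derive from IOError, so a handler wrapped
        # around the whole method would swallow a network failure here and
        # hand the caller the expired token. Report it and return None.
        try:
            return self.refresh_access_token(token_info['refresh_token'])
        except requests.exceptions.RequestException as err:
            self._warn("couldn't refresh token: %s" % err)
            return None

    def _save_token_info(self, token_info):
        ''' Writes token_info to the cache file, if a cache_path is set.

            The file is created with mode 0600 because it contains the
            refresh token. A cache file that already exists keeps the
            permissions it has. Failures are reported as a warning only.
        '''
        if not self.cache_path:
            return
        # Serialize first so a serialization error cannot leave a truncated cache.
        serialized = json.dumps(token_info)
        try:
            fd = os.open(self.cache_path,
                         os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as f:
                f.write(serialized)
        except (IOError, OSError):
            self._warn("couldn't write token cache to " + self.cache_path)

    def _is_scope_subset(self, needle_scope, haystack_scope):
        ''' True when every scope in needle_scope is also in haystack_scope.
            Both are space-separated strings; None or '' counts as no scopes.
        '''
        needle_scope = set(needle_scope.split()) if needle_scope else set()
        haystack_scope = set(haystack_scope.split()) if haystack_scope else set()
        return needle_scope <= haystack_scope

    def is_token_expired(self, token_info):
        ''' Delegates to the module-level expiry check. '''
        return is_token_expired(token_info)

    def get_authorize_url(self, state=None, show_dialog=False):
        """ Gets the URL to use to authorize this app

            Parameters:
                - state - overrides the state given to the constructor
                - show_dialog - adds show_dialog to the query when true
        """
        payload = {'client_id': self.client_id,
                   'response_type': 'code',
                   'redirect_uri': self.redirect_uri}
        if self.scope:
            payload['scope'] = self.scope
        if state is None:
            state = self.state
        if state is not None:
            payload['state'] = state
        if show_dialog:
            payload['show_dialog'] = True

        urlparams = urllibparse.urlencode(payload)

        return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)

    def parse_response_code(self, url):
        """ Parse the response code in the given response url

            Parameters:
                - url - the response url

            Returns the value of the code query parameter exactly as it
            appears in the url (not percent-decoded), wherever it sits in
            the query string, or None when there is none. The state
            parameter is not examined here.
        """
        query = url.partition('?')[2].partition('#')[0]
        for pair in query.split('&'):
            name, _, value = pair.partition('=')
            if name == 'code':
                return value
        return None

    def _make_authorization_headers(self):
        ''' Basic auth header built from this app's client id and secret. '''
        return _make_authorization_headers(self.client_id, self.client_secret)

    def get_access_token(self, code):
        """ Gets the access token for the app given the code

            Parameters:
                - code - the response code

            Returns the token info and writes it to the cache; raises
            SpotifyOauthError on any status other than 200.
        """
        payload = {'redirect_uri': self.redirect_uri,
                   'code': code,
                   'grant_type': 'authorization_code'}
        if self.scope:
            payload['scope'] = self.scope
        if self.state:
            payload['state'] = self.state

        headers = self._make_authorization_headers()

        response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
            headers=headers, verify=True, proxies=self.proxies)
        if response.status_code != 200:
            raise SpotifyOauthError(response.reason)
        token_info = response.json()
        token_info = self._add_custom_values_to_token_info(token_info)
        self._save_token_info(token_info)
        return token_info

    def _normalize_scope(self, scope):
        ''' Sorts the space-separated scopes so equal sets compare equal.
            Returns None for an empty or missing scope.
        '''
        if scope:
            return ' '.join(sorted(scope.split()))
        return None

    def refresh_access_token(self, refresh_token):
        ''' Exchanges refresh_token for a new access token.

            Returns the new token info (also written to the cache), or None
            with a warning when the token endpoint does not answer 200.
        '''
        payload = {'refresh_token': refresh_token,
                   'grant_type': 'refresh_token'}

        headers = self._make_authorization_headers()

        response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
            headers=headers, verify=True, proxies=self.proxies)
        if response.status_code != 200:
            # Report status and reason only: the request headers carry the
            # client secret and must not be printed.
            self._warn("couldn't refresh token: code:%d reason:%s"
                % (response.status_code, response.reason))
            return None
        token_info = response.json()
        token_info = self._add_custom_values_to_token_info(token_info)
        # Keep the old refresh token when the response does not issue a new one.
        if 'refresh_token' not in token_info:
            token_info['refresh_token'] = refresh_token
        self._save_token_info(token_info)
        return token_info

    def _add_custom_values_to_token_info(self, token_info):
        '''
        Store some values that aren't directly provided by a Web API
        response: the absolute expiry time, and the scope this object was
        configured with (the requested scope, which overwrites any scope
        value in the response).
        '''
        token_info['expires_at'] = int(time.time()) + token_info['expires_in']
        token_info['scope'] = self.scope
        return token_info

    def _warn(self, msg):
        ''' Prints a warning line to stderr. '''
        print('warning:' + msg, file=sys.stderr)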