/
auth.py
379 lines (293 loc) · 11.2 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
"""
Sample SQLAlchemy-powered model definition for the repoze.what SQL plugin.
This model definition has been taken from a quickstarted TurboGears 2 project,
but it's absolutely independent of TurboGears.
"""
import bcrypt
import hashlib
import logging
import random
from datetime import (
datetime,
timedelta,
)
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import Unicode
from sqlalchemy import Boolean
from sqlalchemy.orm import relation
from sqlalchemy.orm import synonym
from bookie.models import Base
from bookie.models import DBSession
from bookie.models.social import BaseConnection
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Names of the groups known to the auth layer.
GROUPS = ['admin', 'user']

# Added to the current UTC time to compute an Activation's ``valid_until``.
ACTIVATION_AGE = timedelta(days=3)

# UserMgr.non_activated_account() only selects accounts whose activation
# ``valid_until`` is more than this far in the past.
NON_ACTIVATION_AGE = timedelta(days=30)
def get_random_word(wordLen):
    """Return a random string of ``wordLen`` characters.

    Characters are drawn from ASCII letters, digits and ``/&=``.

    :param wordLen: number of characters to generate; zero or a negative
        value yields an empty string.
    :return: the generated string.
    """
    alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs'
                'tuvwxyz0123456789/&=')
    # The result seeds User.gen_api_key(), which is also used for activation
    # codes, so draw from the OS entropy source instead of the predictable
    # default Mersenne Twister generator.
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(wordLen))
class ActivationMgr(object):
    """Static helpers for looking up and consuming Activation records."""

    @staticmethod
    def count():
        """Count how many activations are in the system."""
        return Activation.query.count()

    @staticmethod
    def _find(username, code):
        """Return the Activation matching both username and code, or None.

        The explicit join ties the activation row to its owning user
        (Activation.id is a foreign key to users.id). Without it the
        username filter would match against *any* user row rather than the
        owner of the activation.
        """
        return Activation.query.\
            join(User, User.id == Activation.id).\
            filter(Activation.code == code).\
            filter(User.username == username).\
            first()

    @staticmethod
    def get_user(username, code):
        """Get the user for this code

        :param username: username the activation must belong to
        :param code: activation code to look up
        :return: the matching User, or None when no activation matches
        """
        activation = ActivationMgr._find(username, code)
        if activation is None:
            return None
        return activation.user

    @staticmethod
    def activate_user(username, code, new_pass):
        """Activate the user owning this code and set their new password

        :param username: username the activation must belong to
        :param code: activation code to consume
        :param new_pass: clear text password to store for the user
        :return: True when the user was activated; None when the code does
            not match the user or the password is not acceptable
        """
        activation = ActivationMgr._find(username, code)
        if activation is None or not UserMgr.acceptable_password(new_pass):
            return None

        user = activation.user
        user.activated = True
        user.password = new_pass
        # The code is single use: remove the activation once consumed.
        activation.activate()

        # Log only the non-sensitive view of the user (no hash / api key).
        LOG.debug(user.safe_data())
        return True
class Activation(Base):
    """Handle activations/password reset items for users

    The id is the user's id. Each user can only have one valid activation in
    process at a time

    The code should be a random hash that is valid only one time
    After that hash is used to access the site it'll be removed

    The created by is a system: new user registration, password reset, forgot
    password, etc.

    """
    __tablename__ = u'activations'

    # Primary key doubles as the foreign key to the owning user.
    id = Column(Integer, ForeignKey('users.id'), primary_key=True)
    code = Column(Unicode(60))
    # utcnow must be *called* here; adding a timedelta to the bare method
    # object would raise TypeError whenever the column default fired.
    valid_until = Column(
        DateTime,
        default=lambda: datetime.utcnow() + ACTIVATION_AGE)
    created_by = Column('created_by', Unicode(255))

    def __init__(self, created_system):
        """Create a new activation

        :param created_system: label of the process creating the activation,
            stored in ``created_by``
        """
        self.code = Activation._gen_activation_hash()
        self.created_by = created_system
        self.valid_until = datetime.utcnow() + ACTIVATION_AGE

    @staticmethod
    def _gen_activation_hash():
        """Generate a random activation hash for this user account"""
        # for now just cheat and generate an api key, that'll work for now
        return User.gen_api_key()

    def activate(self):
        """Remove this activation"""
        DBSession.delete(self)
class UserMgr(object):
    """ Wrapper for static/combined operations of User object"""

    @staticmethod
    def count():
        """Number of users in the system."""
        return User.query.count()

    @staticmethod
    def non_activated_account(delete=False):
        """Find, and optionally delete, accounts that were never activated

        Selects users that are not activated, have never logged in and whose
        activation ``valid_until`` is older than NON_ACTIVATION_AGE.

        :param delete: when true the matching users are deleted and nothing
            is returned; otherwise the matching users are returned
        :return: list of User objects when ``delete`` is false, else None
        """
        test_date = datetime.utcnow() - NON_ACTIVATION_AGE
        expired_ids = DBSession.query(Activation.id).\
            filter(Activation.valid_until < test_date).\
            subquery(name="query")
        qry = DBSession.query(User).\
            filter(User.activated.is_(False)).\
            filter(User.last_login.is_(None)).\
            filter(User.id.in_(expired_ids))

        # Delete the non activated accounts only if it is asked to.
        if delete:
            for user in qry.all():
                DBSession.delete(user)
        # If the non activated accounts are not asked to be deleted,
        # return their details.
        else:
            return qry.all()

    @staticmethod
    def get_list(active=None, order=None, limit=None):
        """Get a list of all of the user accounts

        :param active: when not None, only users whose ``activated`` flag
            equals this value are returned
        :param order: name of the User attribute to sort by; when omitted
            users are sorted by username
        :param limit: maximum number of users to return
        :return: list of User objects
        """
        user_query = User.query

        if active is not None:
            user_query = user_query.filter(User.activated == active)

        if order:
            # The requested column has to be the primary sort key: sorting
            # on the unique username first would leave it without effect.
            user_query = user_query.order_by(
                getattr(User, order), User.username)
        else:
            user_query = user_query.order_by(User.username, User.signup)

        if limit:
            user_query = user_query.limit(limit)

        return user_query.all()

    @staticmethod
    def get(user_id=None, username=None, email=None, api_key=None):
        """Get the user instance for this information

        Only the first supplied criterion is used, checked in this order:
        username, user_id, email, api_key.

        :param user_id: integer id of the user in db
        :param username: string user's name
        :param email: string email address of the user
        :param api_key: string api key of the user
        :return: the matching User, or None when nothing matches or no
            criterion was supplied
        """
        user_query = User.query

        if username is not None:
            return user_query.filter(User.username == username).first()

        if user_id is not None:
            return user_query.filter(User.id == user_id).first()

        if email is not None:
            return user_query.filter(User.email == email).first()

        if api_key is not None:
            return user_query.filter(User.api_key == api_key).first()

        return None

    @staticmethod
    def auth_groupfinder(userid, request):
        """Pyramid wants to know what groups a user is in

        We need to pull this from the User object that we've stashed in the
        request object

        :param userid: unused; the user is read from ``request.user``
        :param request: request object carrying the current ``user``
        :return: 'admin' or 'user', or None when the request has no user
        """
        # NOTE(review): Pyramid groupfinder callbacks conventionally return a
        # sequence of principals; a bare string is returned here - confirm
        # this is what the configured authentication policy expects.
        user = request.user
        if user is None:
            return None
        return 'admin' if user.is_admin else 'user'

    @staticmethod
    def acceptable_password(password):
        """Verify that the password is acceptable

        Basically not None and at least 3 chars long.

        :param password: the candidate clear text password
        :return: True when acceptable, otherwise False
        """
        # The clear text password is deliberately never logged.
        if password is None:
            return False

        return len(password) >= 3

    @staticmethod
    def signup_user(email, signup_method):
        """Create a new, not yet activated user account for this email

        :param email: email address; lower cased and also used as username
        :param signup_method: stored on the user as ``invited_by``
        :return: the new User, already added to the DBSession
        """
        # Get this invite party started, create a new user acct.
        new_user = User()
        new_user.email = email.lower()
        new_user.username = email.lower()
        new_user.invited_by = signup_method
        new_user.api_key = User.gen_api_key()

        # they need to be deactivated
        new_user.reactivate(u'invite')

        DBSession.add(new_user)
        return new_user
class User(Base):
    """Basic User def"""
    __tablename__ = 'users'

    id = Column(Integer, autoincrement=True, primary_key=True)
    username = Column(Unicode(255), unique=True)
    name = Column(Unicode(255))
    # Stores the bcrypt hash; exposed through the ``password`` synonym below.
    _password = Column('password', Unicode(60))
    email = Column(Unicode(255), unique=True)
    activated = Column(Boolean, default=False)
    is_admin = Column(Boolean, default=False)
    last_login = Column(DateTime)
    signup = Column(DateTime, default=datetime.utcnow)
    api_key = Column(Unicode(12))
    invite_ct = Column(Integer, default=0)
    invited_by = Column('invited_by', Unicode(255))

    BaseConnection = relation(BaseConnection,
                              backref="users")
    # One activation per user; it is removed together with the user.
    activation = relation(
        Activation,
        cascade="all, delete, delete-orphan",
        uselist=False,
        backref='user')

    def __init__(self):
        """By default a user starts out deactivated"""
        self.activation = Activation(u'signup')
        self.activated = False

    def _set_password(self, password):
        """Hash password on the fly.

        :param password: the clear text password, unicode or byte string
        """
        if isinstance(password, unicode):
            password_8bit = password.encode('UTF-8')
        else:
            password_8bit = password

        # Hash a password for the first time, with a randomly-generated salt
        salt = bcrypt.gensalt(10)
        hashed_password = bcrypt.hashpw(password_8bit, salt)

        # Make sure the hashed password is a unicode object at the end of
        # the process because SQLAlchemy _wants_ a unicode object for
        # Unicode fields
        if not isinstance(hashed_password, unicode):
            hashed_password = hashed_password.decode('UTF-8')

        self._password = hashed_password

    def _get_password(self):
        """Return the password hashed"""
        return self._password

    password = synonym('_password', descriptor=property(_get_password,
                                                        _set_password))

    def validate_password(self, password):
        """
        Check the password against existing credentials.

        :param password: the password that was provided by the user to
            try and authenticate. This is the clear text version that we will
            need to match against the hashed one in the database.
        :type password: unicode object.
        :return: Whether the password is valid.

        """
        stored = self.password
        # the password might be null as in the case of morpace employees
        # logging in via ldap. We check for that here and return them as an
        # incorrect login
        if not stored or password is None:
            return False

        # Hand bcrypt the same UTF-8 byte strings that _set_password used;
        # otherwise a non-ASCII password could be stored but never verified.
        if isinstance(password, unicode):
            password = password.encode('UTF-8')
        if isinstance(stored, unicode):
            stored = stored.encode('UTF-8')

        # The first 29 characters of the stored hash are reused as the salt.
        salt = stored[:29]
        return bcrypt.hashpw(password, salt) == stored

    def safe_data(self):
        """Return safe data to be sharing around

        :return: dict of this user's data without the password, admin flag
            and api key
        """
        hide = ['_password', 'password', 'is_admin', 'api_key']
        # NOTE(review): relies on dict(self) working, i.e. on Base making
        # instances iterable as (key, value) pairs - confirm in Base.
        return dict(
            [(k, v) for k, v in dict(self).iteritems() if k not in hide]
        )

    def deactivate(self):
        """In case we need to disable the login"""
        self.activated = False

    def reactivate(self, creator):
        """Put the account through the reactivation process

        This can come about via a signup or from forgotten password link

        :param creator: label of the process requesting the reactivation,
            passed on to the new Activation
        """
        # if we reactivate then reinit this
        self.activation = Activation(creator)
        self.activated = False

    def has_invites(self):
        """Does the user have any invitations left"""
        return self.invite_ct > 0

    def invite(self, email):
        """Invite a user

        :param email: email address of the person to invite
        :return: the newly created User, or False when this user has no
            invites left
        :raises ValueError: when invites are left but no email is supplied
        """
        if not self.has_invites():
            return False
        if not email:
            raise ValueError('You must supply an email address to invite')

        # get this invite party started, create a new useracct
        new_user = UserMgr.signup_user(email, self.username)

        # decrement the invite counter
        self.invite_ct = self.invite_ct - 1
        DBSession.add(new_user)
        return new_user

    @staticmethod
    def gen_api_key():
        """Generate a 12 char api key for the user to use

        :return: unicode string holding the first 12 hex digits of the
            SHA-256 digest of a random word
        """
        m = hashlib.sha256()
        m.update(get_random_word(12))
        return unicode(m.hexdigest()[:12])