Child: [7ab6ff] (diff)

Download this file

schema.py    329 lines (289 with data), 11.4 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import logging
from copy import deepcopy
from datetime import datetime
from formencode.validators import Invalid
import pymongo
from .utils import LazyProperty
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class Missing(tuple):
    '''Sentinel type meaning "there is no value here".

    Stands for a key that is absent from a document or a keyword argument
    that was not passed.  None is not used for this because None is
    sometimes a meaningful value.
    '''


class NoDefault(tuple):
    '''Sentinel type meaning "this keyword argument was not specified".

    Kept separate from None and Missing, which both mean something else.
    '''


# Only one instance of each sentinel is ever needed, so each name is rebound
# from the class to its single instance.  Code in this module tests for the
# sentinels by identity (``is`` / ``is not``).
NoDefault = NoDefault()
Missing = Missing()
class SchemaItem(object):
    '''Part of a MongoDB schema.  The validate() method is called when a record
    is loaded from the DB or saved to it.  It should return a "validated" object,
    raising an Invalid exception if the object is invalid.  If it returns
    Missing, the field will be stripped from its parent object.'''

    def validate(self, d):
        '''Convert/validate an object or raise an Invalid exception.

        Subclasses must override this method; the base implementation always
        raises NotImplementedError.
        '''
        # NotImplemented is a comparison constant, not an exception class, so
        # raising it would surface as a confusing TypeError.
        raise NotImplementedError('validate')

    @classmethod
    def make(cls, field):
        '''Build a SchemaItem from a "shorthand" schema (summarized below)

        int - int or long
        str - string or unicode
        float - float, int, or long
        bool - boolean value
        datetime - datetime.datetime object
        None - Anything
        [] - Array of Anything objects
        [type] - array of objects of type "type"
        { fld: type... } - dict-like object with fields of type "type"

        A SchemaItem subclass is instantiated with no arguments; any other
        value (including an existing SchemaItem instance) is returned as is.

        Raises ValueError if a list shorthand has more than one element.
        '''
        if isinstance(field, list):
            if len(field) == 0:
                field = Array(Anything())
            elif len(field) == 1:
                field = Array(field[0])
            else:
                raise ValueError('Array must have 0-1 elements')
        elif isinstance(field, dict):
            field = Object(field)
        elif field is None:
            field = Anything()
        elif field in SHORTHAND:
            field = SHORTHAND[field]
        # Shorthand lookups (and callers) may hand us a class rather than an
        # instance; instantiate it with its defaults.
        if isinstance(field, type):
            field = field()
        return field
class Migrate(SchemaItem):
    '''Schema item for a field that is moving from one type to another.

    ``old`` and ``new`` are schema shorthands (or SchemaItems) for the two
    formats; ``migration_function`` converts a value validated against the
    old format into something the new format accepts.
    '''

    def __init__(self, old, new, migration_function):
        self.old = SchemaItem.make(old)
        self.new = SchemaItem.make(new)
        self.migration_function = migration_function

    def validate(self, value):
        '''Validate against the new schema, migrating old-format values.

        If the value is not valid under the new schema it is validated
        against the old one, passed through the migration function, and the
        result is validated against the new schema.
        '''
        try:
            return self.new.validate(value)
        except Invalid:
            # Not in the new format: treat it as old-format data and convert.
            legacy = self.old.validate(value)
            return self.new.validate(self.migration_function(legacy))

    @classmethod
    def obj_to_list(cls, key_name, value_name=None):
        '''Migration function to go from object { key: value } to
        list [ { key_name: key, value_name: value} ].  If value_name is None,
        then value must be an object and the result will be a list
        [ { key_name: key, **value } ].
        '''
        from sf.gutenberg.model import base

        if value_name is None:
            def migrate_objects(value):
                # Each value is itself a mapping; fold the key into it.
                return [base.Object(dict(v, **{key_name: k}))
                        for k, v in value.iteritems()]
            return migrate_objects

        def migrate_scalars(value):
            # Each key/value pair becomes its own two-field object.
            return [base.Object({key_name: k, value_name: v})
                    for k, v in value.iteritems()]
        return migrate_scalars
class Deprecated(SchemaItem):
    '''Used for deprecated fields -- they will be stripped from the object.
    '''

    def validate(self, value):
        '''Discard ``value`` and return Missing, whatever was passed in.'''
        # Returning Missing is the signal to drop the field from its parent.
        return Missing
class FancySchemaItem(SchemaItem):
    '''SchemaItem base that adds ``required`` and ``if_missing`` handling.

    When a value is present (and is not equal to the ``if_missing`` default)
    the result of the _validate() hook is returned.
    '''
    # Class-level defaults; __init__ overrides them per instance only when
    # the corresponding argument is actually supplied.
    required = False
    if_missing = Missing

    def __init__(self, required=NoDefault, if_missing=NoDefault):
        if required is not NoDefault:
            self.required = required
        if if_missing is not NoDefault:
            self.if_missing = if_missing

    def validate(self, value, **kw):
        '''Apply required/if_missing rules, then delegate to _validate().

        Raises Invalid when the value is Missing and the field is required.
        Extra keyword arguments are forwarded to _validate().
        '''
        if value is not Missing:
            # A value equal to the default is passed through unvalidated.
            if value == self.if_missing:
                return value
            return self._validate(value, **kw)
        if self.required:
            raise Invalid('Missing field', value, None)
        if self.if_missing is Missing:
            return self.if_missing
        return deepcopy(self.if_missing)  # handle mutable defaults

    def _validate(self, value, **kw):
        '''Validation hook for subclasses; the default accepts anything.'''
        return value
class Anything(FancySchemaItem):
    '''Anything goes - always passes validation unchanged'''
class Object(FancySchemaItem):
    '''Used for dict-like validation.  Also ensures that the incoming object does
    not have any extra keys AND performs polymorphic validation (which means that
    ParentClass._validate(...) sometimes will return an instance of ChildClass).
    '''

    def __init__(self, fields=None, required=False, if_missing=NoDefault):
        '''Build the validator from a mapping of field name -> schema shorthand.

        A field name that is not a string (e.g. the type ``str``) acts as a
        wildcard: every item of the validated dict is checked against it.
        '''
        if fields is None:
            fields = {}
        FancySchemaItem.__init__(self, required, if_missing)
        self.fields = dict((name, SchemaItem.make(field))
                           for name, field in fields.iteritems())
        # NOTE(review): FancySchemaItem.__init__ never stores NoDefault on the
        # instance (it leaves the class-level default in place), so this
        # branch looks unreachable as written -- confirm the intent before
        # relying on a per-field default dict.
        if self.if_missing is NoDefault:
            self.if_missing = dict((k, v.validate(Missing))
                                   for k, v in self.fields.iteritems())
        self.polymorphic_on = self.polymorphic_registry = None
        self.managed_class = None

    def validate(self, value, **kw):
        '''Validate ``value``; on failure, prefix the message with the
        managed class (when one is set) and re-raise the same Invalid.'''
        try:
            return FancySchemaItem.validate(self, value, **kw)
        except Invalid as inv:
            if self.managed_class:
                inv.msg = '%s:\n %s' % (
                    self.managed_class,
                    inv.msg.replace('\n', '\n '))
            raise

    def _validate(self, d, allow_extra=False, strip_extra=False):
        '''Validate the dict ``d`` field by field and return the result object.

        allow_extra -- accept keys that are not declared in the schema
        strip_extra -- drop undeclared keys instead of copying them over

        Raises Invalid if ``d`` is not a dict, if any field fails validation
        (the per-field errors are passed as ``error_dict``), or if there are
        undeclared keys and ``allow_extra`` is false.
        '''
        from sf.gutenberg.model import base
        # Check the type before touching d.get() below, so that bad input is
        # reported as Invalid rather than AttributeError.  The 1-tuple keeps
        # the % operator from unpacking d when d is itself a tuple.
        if not isinstance(d, dict):
            raise Invalid('%r is not dict-like' % (d,), d, None)
        cls = self.managed_class
        if self.polymorphic_registry:
            disc = d.get(self.polymorphic_on, Missing)
            if disc is Missing:
                # No discriminator stored yet: stamp the document with ours.
                disc = d[self.polymorphic_on] = self.managed_class.__name__
            else:
                cls = self.polymorphic_registry[disc]
        if cls is None:
            result = base.Object()
        elif cls != self.managed_class:
            # The document belongs to another class in the registry; let
            # that class's schema do the validation.
            return cls.__mongometa__.schema.validate(
                d, allow_extra=allow_extra, strip_extra=strip_extra)
        else:
            # Bypass __init__; the instance is filled in item by item below.
            result = cls.__new__(cls)
        error_dict = {}
        check_extra = True
        for name, field in self.fields.iteritems():
            if isinstance(name, basestring):
                try:
                    value = field.validate(d.get(name, Missing))
                    if value is not Missing:
                        result[name] = value
                except Invalid as inv:
                    error_dict[name] = inv
            else:
                # Wildcard field: validate all items in d against it.  Every
                # key has then been handled, so the extra-key pass below must
                # not run -- it would overwrite the validated values with the
                # raw ones and re-add values the field chose to strip.
                check_extra = False
                allow_extra = True
                name_validator = SchemaItem.make(name)
                for key, item in d.iteritems():
                    try:
                        key = name_validator.validate(key)
                        item = field.validate(item)
                        if item is not Missing:
                            result[key] = item
                    except Invalid as inv:
                        error_dict[key] = inv
        if error_dict:
            msg = '\n'.join('%s:%s' % t for t in error_dict.iteritems())
            raise Invalid(msg, d, None, error_dict=error_dict)
        if check_extra:
            try:
                extra_keys = set(d.iterkeys()) - set(self.fields.iterkeys())
            except AttributeError as ae:
                raise Invalid(str(ae), d, None)
            if extra_keys and not allow_extra:
                raise Invalid('Extra keys: %r' % extra_keys, d, None)
            if extra_keys and not strip_extra:
                for ek in extra_keys:
                    result[ek] = d[ek]
        return result

    def extend(self, other):
        '''Merge the fields of another Object schema into this one.

        Fields from ``other`` replace same-named fields here; ``None`` is
        accepted and ignored.
        '''
        if other is None:
            return
        self.fields.update(other.fields)

    def set_polymorphic(self, field, registry, identity):
        '''Configure polymorphic validation.

        field    -- name of the discriminator key in the document
        registry -- mapping of discriminator value -> class
        identity -- discriminator value under which managed_class is
                    registered (only when ``field`` is truthy)
        '''
        self.polymorphic_on = field
        self.polymorphic_registry = registry
        if self.polymorphic_on:
            registry[identity] = self.managed_class
class Array(FancySchemaItem):
    '''Array/list validator.  All elements of the array must pass validation by a
    single field_type (which itself may be Anything, however).
    '''

    def __init__(self, field_type, **kw):
        '''Accepts ``required`` (default False) and ``if_missing`` (default
        an empty list) as keyword arguments.'''
        required = kw.pop('required', False)
        if_missing = kw.pop('if_missing', [])
        FancySchemaItem.__init__(self, required, if_missing)
        # Keep the shorthand as given; it is turned into a SchemaItem by the
        # field_type property below.
        self._field_type = field_type

    @LazyProperty
    def field_type(self):
        '''SchemaItem built from the element shorthand given to __init__.'''
        return SchemaItem.make(self._field_type)

    def _validate(self, d):
        '''Validate every element of ``d`` and return them as a new list.

        None is treated as an empty array.  If any element is invalid, raises
        Invalid with ``error_list`` holding one entry per element (None for
        the elements that passed).
        '''
        if d is None:
            d = []
        validated = []
        errors = []
        for item in d:
            try:
                checked = self.field_type.validate(item)
            except Invalid as inv:
                errors.append(inv)
            else:
                validated.append(checked)
                errors.append(None)
        if any(err is not None for err in errors):
            lines = ['[%s]:%s' % (i, v)
                     for i, v in enumerate(errors)
                     if v]
            raise Invalid('\n'.join(lines), d, None, error_list=errors)
        return validated
class Scalar(FancySchemaItem):
    '''Validate that a value is NOT an array or dict'''
    # Default used when the field is missing.
    if_missing = None

    def _validate(self, value):
        '''Return ``value`` unchanged, or raise Invalid if it is a tuple,
        list or dict.'''
        if isinstance(value, (tuple, list, dict)):
            # Wrap in a 1-tuple: a bare tuple on the right of % would be
            # unpacked as the format arguments and either raise TypeError or
            # print the wrong thing instead of raising Invalid.
            raise Invalid('%r is not a scalar' % (value,), value, None)
        return value
class ParticularScalar(Scalar):
    '''Validate that a value is NOT an array or dict and is a particular type
    '''
    # Type (or tuple of types) accepted by isinstance(); set by subclasses.
    type = ()

    def _validate(self, value):
        '''Return ``value`` if it is None or an instance of ``self.type``;
        raise Invalid otherwise.'''
        value = Scalar._validate(self, value)
        if value is not None and not isinstance(value, self.type):
            raise Invalid('%s is not a %r' % (value, self.type),
                          value, None)
        return value
class OneOf(ParticularScalar):
    '''Validate that a value is one of a fixed set of options.

    The options are given positionally; keyword arguments are passed on to
    ParticularScalar.__init__.
    '''

    def __init__(self, *options, **kwargs):
        self.options = options
        ParticularScalar.__init__(self, **kwargs)

    def _validate(self, value):
        '''Return ``value`` if it is among the options, else raise Invalid.'''
        if value in self.options:
            return value
        raise Invalid('%s is not in %r' % (value, self.options),
                      value, None)
class String(ParticularScalar):
    '''Scalar that must be a basestring instance.'''
    type = basestring
class Int(ParticularScalar):
    '''Scalar that must be an int or long.'''
    type = (int, long)
class Float(ParticularScalar):
    '''Scalar that must be numeric: float, int or long.'''
    type = (float, int, long)
class DateTime(ParticularScalar):
    '''Scalar that must be a datetime.datetime instance.'''
    type = datetime
class Bool(ParticularScalar):
    '''Scalar that must be a bool.'''
    type = bool
class ObjectId(ParticularScalar):
    '''Scalar that must be a pymongo.bson.ObjectId.'''
    # Unlike the other scalars (whose default is None), a missing id
    # validates to Missing.
    if_missing = Missing
    type = pymongo.bson.ObjectId
# Shorthand for various SchemaItems: maps the builtin types that may appear
# in a shorthand schema to the validator class SchemaItem.make() uses for them.
SHORTHAND = {
    int: Int,
    str: String,
    float: Float,
    bool: Bool,
    datetime: DateTime,
}