GAEでアプリを作成していて、入力値への検証機構が貧弱だと思ったので、作りました。今回作成したのは、検証情報をyamlに記述し、その検証情報に沿ってモデルのプロパティを走査していくという物です。
検証情報yamlの記述方法、使い方ともにSymfonyのValidationManagerに酷似していると思います。なので、Symfonyを使ったことがある人であれば、なんとなくわかりやすいかもしれません。と言っても、SymfonyのValidationManagerほどまだ細部までエラーチェックなどをしていないので、検証情報yamlの記述によっては、予期しないエラーが発生すると思います。ですので、この辺は、もう少しリファクタリングをしないといけないかと思います。
Pythonによるリフレクション部分だったりdb.Modelの扱い方がもし間違っていたら是非教えてください。
今回、関係するソースは下記に載せておきます。
■検証情報yaml
validate:
  name:
    - validator: RequiredValidator
■ValidationManager
class ValidationManager(object):
    """Run the validators declared in a YAML file against a model.

    The parsed YAML is read as ``valid[config.VALIDATE_KEY][property_name]``,
    which must yield a list of validator entries.  Each entry is a mapping
    with a ``validator`` class name and an optional ``options`` mapping.
    """

    # Package prefixes handed to ReflectionUtils.getClazz when resolving the
    # class named by a ``validator`` entry.
    VALIDATOR_PACKAGES = [
        'org.fukata.mapshare.system.utils.validation.common.',
        'org.fukata.mapshare.system.utils.validation.extention.',
    ]

    valid = None   # parsed validation YAML, set in __init__
    errors = None  # Errors collected by the most recent validate() call

    def __init__(self, validYaml):
        """Load the validation rules from the YAML file at path *validYaml*."""
        self.valid = YamlUtils.load(validYaml)

    def validate(self, model):
        """Validate *model* against the loaded rules.

        ``self.errors`` is replaced with a fresh ``Errors`` on every call.

        Returns:
            True when no validator recorded an error, False otherwise.

        Raises:
            BadKeyError: a property that has validators configured does not
                exist on *model*.
        """
        self.errors = Errors()
        rules = self.valid[config.VALIDATE_KEY]
        # The property table does not depend on the rule being processed, so
        # look it up once instead of once per validator.
        props = model.properties()
        for item in rules:
            validInfos = rules[item]
            if not validInfos:
                # A property listed without validators (empty list or a bare
                # "name:" parsed as None) has nothing to check.
                continue
            if item not in props:
                # don't have a key.
                raise BadKeyError("Key %s is don't have property." % item)
            prop = props[item]
            for validInfo in validInfos:
                validator = self._getValidatorInstance(validInfo)
                # Re-read the value for each validator in case an earlier one
                # changed the model.
                value = getattr(model, prop._attr_name(), None)
                try:
                    validator.validate(value, prop, model)
                except ValidationError as e:
                    # Call __str__ directly (not str()) so a unicode message
                    # is passed through without an implicit ASCII encode.
                    self.errors.append(prop, e.__str__())
        return not self.errors

    def _getValidatorInstance(self, validInfo):
        """Instantiate the validator described by one YAML entry.

        The class is resolved by name through ``VALIDATOR_PACKAGES`` and is
        constructed with the entry's ``options`` mapping; a missing or empty
        ``options`` value becomes ``{}``.
        """
        clazz = ReflectionUtils.getClazz(validInfo['validator'],
                                         self.VALIDATOR_PACKAGES)
        options = validInfo.get('options') or {}
        return clazz(options)
■ValidationError
class ValidationError(Exception):
    """Raised by a validator when a value fails its check.

    The constructor argument is kept on ``value`` and is also forwarded to
    ``Exception`` so that ``args`` is populated like any other exception.
    """

    def __init__(self, value):
        super(ValidationError, self).__init__(value)
        self.value = value

    def __str__(self):
        # Format through %s so a non-string value (e.g. a number) still
        # produces a string; string values come back unchanged.
        return '%s' % (self.value,)
■BaseValidator
class BaseValidator(object):
    """Common base class for validators.

    Keeps the ``options`` object supplied at construction time and defines
    the ``validate(value, prop, model)`` signature subclasses override.
    """

    # Class-level default; each instance overwrites it in __init__.
    options = None

    def __init__(self, options=None):
        """Remember *options* exactly as given (``None`` when omitted)."""
        self.options = options

    def validate(self, value, prop, model):
        """Base implementation: performs no check and returns ``False``."""
        return False
■RequiredValidator
class RequiredValidator(BaseValidator):
    """Validator that rejects a missing value."""

    def validate(self, value, prop, model):
        """Return True when *value* is present.

        Raises:
            ValidationError: *value* is ``None`` or ``StringUtils.isNone``
                reports it as empty (presumably blank strings -- confirm
                against StringUtils).
        """
        # ``or`` short-circuits, so StringUtils.isNone never receives None.
        is_missing = value is None or StringUtils.isNone(value)
        if is_missing:
            raise ValidationError('Property %s is required' % prop.name)
        return True
■BaseModel
# skip default validation of db.Property
# Assigning to a property now only stores the raw value on the instance;
# checking is deferred until BaseModel.validate() is called.
setattr(db.Property, '__set__', lambda self, model_instance, value: setattr(model_instance, self._attr_name(), value))


class BaseModel(db.Model):
    """db.Model base class whose validation is run explicitly via validate()."""

    errors = None  # Errors from the most recent validate() call

    def __init__(self, *args, **kwds):
        super(BaseModel, self).__init__(*args, **kwds)
        self.errors = Errors()

    def validate(self, valid_yaml=None):
        """Validate this model and collect problems in ``self.errors``.

        Args:
            valid_yaml: path of a validation YAML file.  When omitted, each
                property's own ``validate`` is used instead.

        Returns:
            True when no error was recorded, False otherwise.
        """
        if valid_yaml is not None:
            manager = ValidationManager(valid_yaml)
            manager.validate(self)
            self.errors = manager.errors
            return not self.errors

        # Start from a clean slate so errors from an earlier call do not make
        # a since-corrected model fail again (the YAML path above already
        # replaces the container on every call).
        self.errors = Errors()
        for prop in self.properties().values():
            attr_name = prop._attr_name()
            value = getattr(self, attr_name, None)
            try:
                value = prop.validate(value)
            except db.BadValueError as e:
                self.errors.append(prop, e.message)
            else:
                # Store whatever prop.validate returned back on the instance.
                setattr(self, attr_name, value)
        return not self.errors
■Errors
class Errors(list):
    '''Container of validation errors.

    Behaves as a list of ``Error`` objects and additionally keeps ``map``,
    a dict from property name to the list index of the latest error recorded
    under that name, so errors can be looked up by name.
    '''

    def __init__(self):
        super(Errors, self).__init__()
        self.map = dict()

    def append(self, prop, msg=None, error=True):
        '''Record an error unless *error* is false.

        *prop* may be a ``db.Property`` (indexed by its ``name``), a string
        (indexed by itself), or any other object, in which case it is stored
        as the message of an error that is not indexed by name.
        '''
        if not error:
            return
        if isinstance(prop, db.Property):
            key = prop.name
        elif isinstance(prop, (str, unicode)):
            key = prop
        else:
            super(Errors, self).append(Error(None, prop))
            return
        super(Errors, self).append(Error(prop, msg))
        self.map[key] = len(self) - 1

    def clear(self):
        '''Remove every error, including the name index.'''
        del self[:]
        # Without this, names would still be reported as present and would
        # point at indexes that no longer exist.
        self.map.clear()

    def get(self, index, default=None):
        '''Return the error at *index* (name or position), else *default*.'''
        try:
            return self[index]
        except (IndexError, TypeError):
            # Unknown name, out-of-range position or unsupported key type.
            return default

    def tostr(self, sep=u"\n", **ops):
        '''Join the messages of all errors with *sep*.

        Extra keyword arguments are forwarded to each ``Error.tostr``.
        '''
        return sep.join([error.tostr(**ops) for error in self])

    def __str__(self):
        return self.tostr()

    def __contains__(self, name):
        # Membership is defined on property names, not on Error objects.
        return name in self.map

    def __getitem__(self, index):
        if isinstance(index, (str, unicode)):
            if index not in self.map:
                raise IndexError(u"'%s' property has not error." % index)
            index = self.map[index]
        return super(Errors, self).__getitem__(index)
■Error
class Error(object):
    '''Container of a single validation error.

    ``prop`` is the property (or name, or None) the error belongs to and
    ``msg`` is the message text.
    '''

    def __init__(self, prop, msg=None):
        self.prop = prop
        self.msg = msg

    def __str__(self):
        return self.tostr()

    def tostr(self, format=u"%(msg)s"):
        '''Return the translated message.

        For a ``db.Property`` the property name inside the message is swapped
        for a ``%(name)s`` placeholder before translation, so the message and
        the name are translated separately.  *format* wraps the message and
        must contain ``%(msg)s``.  For anything else (or an empty *format*)
        the message is translated as is.
        '''
        if not (isinstance(self.prop, db.Property) and format):
            return _(self.msg)

        name = self.prop.verbose_name or self.prop.name
        # Plain substring replacement: the name is literal text, so regex
        # metacharacters in a verbose_name must not be interpreted.
        msg = self.msg.replace(' ' + name + ' ', ' %(name)s ')
        template = format % {'msg': msg}
        # Translate the name that was actually chosen above; this avoids
        # handing None to the translation function when verbose_name is unset.
        return _(template) % {'name': _(name)}
■ReflectionUtils
class ReflectionUtils(object):
    """Helpers for locating and instantiating classes by name.

    A class ``FooBar`` is expected to live in a module named ``foo_bar``,
    optionally below one of several base packages.
    """

    # Matches every upper-case ASCII letter, i.e. each word start in CamelCase.
    BIG_PATTERN = re.compile('[A-Z]')

    @classmethod
    def getClazz(cls, className, basePackageList=None):
        """Return the class *className* from its conventionally named module.

        Args:
            className: CamelCase class name, e.g. ``'RequiredValidator'``.
            basePackageList: package prefixes (each ending with ``'.'``) that
                are tried in order.  When empty or omitted the module is
                imported by its bare name.

        Returns:
            The first matching class found.

        Raises:
            AttributeError: no candidate module provides the class.
        """
        moduleName = cls.generatePackageName(className)
        prefixes = basePackageList or ['']
        for prefix in prefixes:
            # Build each candidate from the bare module name; prefixes must
            # not pile up from one iteration to the next.
            try:
                module = __import__(prefix + moduleName, globals(), locals(),
                                    [className])
            except (ImportError, AttributeError):
                continue
            clazz = getattr(module, className, None)
            if clazz is not None:
                return clazz
        raise AttributeError('%s class is not exists.' % className)

    @classmethod
    def newInstance(cls, className, basePackageList=None):
        """Instantiate *className* with no constructor arguments.

        Raises:
            AttributeError: the class cannot be located.
        """
        try:
            clazz = cls.getClazz(className, basePackageList)
        except (ImportError, AttributeError):
            raise AttributeError('%s is not exists.' % className)
        return clazz()

    @classmethod
    def generatePackageName(cls, className):
        """Convert a CamelCase class name to its snake_case module name.

        Every upper-case letter starts a new word, so ``'RequiredValidator'``
        becomes ``'required_validator'``, ``'fooBar'`` becomes ``'foo_bar'``
        and an acronym such as ``'URLValidator'`` becomes
        ``'u_r_l_validator'``.
        """
        def lowerWord(match):
            # No separator in front of the very first character.
            separator = '_' if match.start() > 0 else ''
            return separator + match.group(0).lower()

        return cls.BIG_PATTERN.sub(lowerWord, className)
■YamlUtils
class YamlUtils(object):
    """Convenience wrappers for reading YAML files by path."""

    # NOTE(review): yaml.load / yaml.load_all can construct arbitrary Python
    # objects.  That is acceptable for configuration shipped with the
    # application; switch to yaml.safe_load / yaml.safe_load_all if a path
    # can ever point at untrusted content.

    @classmethod
    def load(cls, path):
        """Parse the first YAML document in the file at *path*."""
        return yaml.load(cls._get_stream(path))

    @classmethod
    def load_all(cls, path):
        """Parse every YAML document in the file at *path*."""
        return yaml.load_all(cls._get_stream(path))

    @classmethod
    def _get_stream(cls, path):
        """Return the whole content of the file at *path*."""
        # The context manager closes the file even when read() raises.
        with open(path, 'r') as fp:
            return fp.read()