import inspect
import zope.schema
from zope.component import handle
from zope.interface import Interface, implements
from zope.schema.interfaces import IFromUnicode, WrongType, RequiredMissing
from zope.security.proxy import removeSecurityProxy
from opennode.oms.model.schema import get_schemas, get_schema_fields
from opennode.oms.util import query_adapter_for_class
__all__ = ['ApplyRawData']
class UnknownAttribute(zope.schema.ValidationError):
"""Unknown attribute"""
class NoSchemaFound(zope.schema.ValidationError):
"""No schema found for object"""
class IModelModifiedEvent(Interface):
"""Model was modified"""
class IModelCreatedEvent(Interface):
"""Model was created"""
class IModelMovedEvent(Interface):
"""Model was moved"""
class IModelDeletedEvent(Interface):
"""Model was deleted"""
class ModelModifiedEvent(object):
implements(IModelModifiedEvent)
def __init__(self, original, modified):
self.original = original
self.modified = modified
class ModelCreatedEvent(object):
implements(IModelCreatedEvent)
def __init__(self, container):
self.container = container
class ModelMovedEvent(object):
implements(IModelMovedEvent)
def __init__(self, fromContainer, toContainer):
self.fromContainer = fromContainer
self.toContainer = toContainer
class ModelDeletedEvent(object):
implements(IModelDeletedEvent)
def __init__(self, container):
self.container = container
[docs]class ApplyRawData(object):
def __init__(self, data, obj=None, model=None, marker=None):
assert isinstance(data, dict)
assert (obj or model) and not (obj and model), \
"One of either obj or model needs to be provided, but not both"
self.schemas = list(get_schemas(obj or model, marker=marker))
self.fields = list(get_schema_fields(obj or model, marker=marker))
self.data = data
self.obj = obj
self.model = model
@property
[docs] def errors(self):
if hasattr(self, '_errors'):
return self._errors
self.tmp_obj = tmp_obj = TmpObj(self.obj, self.model)
raw_data = dict(self.data)
errors = []
if self.fields:
for name, field, schema in self.fields:
if name not in raw_data:
continue
field = field.bind(self.obj or self.model)
raw_value = raw_data.pop(name)
if isinstance(raw_value, str):
raw_value = raw_value.decode('utf8')
# We don't want to accidentally swallow any adaptation TypeErrors from here:
from_unicode = IFromUnicode(field)
try:
if not raw_value and field.required:
raise RequiredMissing(name)
try:
value = from_unicode.fromUnicode(raw_value)
except (ValueError, TypeError):
raise WrongType(name)
# TODO: make this more descriptive as to which validation failed, where was it defined etc.
except zope.schema.ValidationError as exc:
errors.append((name, exc))
else:
setattr(self.adapted_tmp_obj(tmp_obj, schema), name, value)
if raw_data:
for key in raw_data:
errors.append((key, UnknownAttribute()))
if not errors:
for schema in self.schemas:
# XXX: We should not be adapting TmpObj's... I've
# fixed the issue for now with the `if` but nobody
# knows what other issues this might cause in the
# future, or what other (hidden) issues adapting
# TmpObj's will cause.
adapted = self.adapted_tmp_obj(tmp_obj, schema)
errors.extend(zope.schema.getValidationErrors(schema, adapted))
else:
errors.append((None, NoSchemaFound()))
self._errors = errors
return errors
[docs] def adapted_tmp_obj(self, tmp_obj, schema):
adapter_cls = query_adapter_for_class(self.model or type(removeSecurityProxy(self.obj)), schema)
return adapter_cls(tmp_obj) if adapter_cls else tmp_obj
[docs] def create(self):
assert self.model, "model needs to be provided to create new objects"
assert not self.errors, "There should be no validation errors"
if not inspect.ismethod(self.model.__init__):
argnames = []
else:
argnames = inspect.getargspec(self.model.__init__).args[1:]
kwargs, rest = {}, {}
for name, value in self.data.items():
(kwargs if name in argnames else rest)[name] = getattr(self.tmp_obj, name)
for argname in argnames:
if argname not in kwargs:
for name, field, _ in self.fields:
if name == argname and field.default:
kwargs[name] = field.default
obj = self.model(**kwargs)
for name, value in rest.items():
setattr(obj, name, value)
return obj
[docs] def apply(self):
assert self.obj, "obj needs to be provided to apply changes to an existing object"
assert not self.errors, "There should be no validation errors"
self.tmp_obj.apply()
[docs] def error_dict(self):
ret = {}
for key, error in self.errors:
msg = error.doc().encode('utf8')
ret[key if key is not None else '__all__'] = msg
return ret
[docs] def write_errors(self, to):
for key, msg in self.error_dict().items():
to.write("%s: %s\n" % (key, msg) if key is not '__all__' else "%s\n" % msg)
class TmpObj(object):
"""A proxy for storing and remembering temporary modifications to
objects, and later applying them to the wrapped object.
"""
__allowed_attrs__ = ['__markers__']
def __init__(self, wrapped, cls=None):
self.__dict__['obj'] = wrapped
self.__dict__['cls'] = cls
self.__dict__['modified_attrs'] = {}
def __getattr__(self, name):
if name.startswith('__') and name not in TmpObj.__allowed_attrs__:
raise AttributeError(name)
if name in self.__dict__['modified_attrs']:
return self.__dict__['modified_attrs'][name]
else:
obj = self.__dict__['obj']
if not obj:
# try to access class methods
cls_method = getattr(self.__dict__['cls'], name, None)
return cls_method if inspect.isroutine(cls_method) else None
return getattr(obj, name) if obj else None
def __setattr__(self, name, value):
if getattr(self, name, object()) != value:
self.__dict__['modified_attrs'][name] = value
def apply(self):
original_attrs = {}
for name, value in self.__dict__['modified_attrs'].items():
original_attrs[name] = getattr(self.__dict__['obj'], name, None)
setattr(self.__dict__['obj'], name, value)
# properties could alter the effective value of what we set
# so we need to read back the actual values from the object
updated = {}
for k in self.__dict__['modified_attrs'].keys():
new_value = getattr(self.__dict__['obj'], k)
if new_value != original_attrs[k]:
updated[k] = getattr(self.__dict__['obj'], k)
# we emit modification events only for objects
# that have been already added to a container (ON-412)
if updated and self.__dict__['obj'].__parent__:
handle(self.__dict__['obj'], ModelModifiedEvent(original_attrs, updated))
def alsoProvides(obj, interface):
form = ApplyRawData({'features': '+' + interface.__name__}, obj)
if not form.errors:
form.apply()
else:
raise Exception("Cannot set marker interface %s; errors: %s" % (interface, form.errors))
def noLongerProvides(obj, interface):
form = ApplyRawData({'features': '-' + interface.__name__}, obj)
if not form.errors:
form.apply()
else:
raise Exception("Cannot remove marker interface %s; erros: %s" % (interface, form.errors))