Source code for opennode.oms.security.checker

import inspect

from collections import defaultdict
from zope.interface import implements
from zope.security._definitions import thread_local
from zope.security._proxy import _Proxy as Proxy
from zope.security.checker import _available_by_default, getCheckerForInstancesOf, CheckerPublic, TracebackSupplement, getChecker
from zope.security.interfaces import INameBasedChecker, Unauthorized, ForbiddenAttribute
from twisted.internet.defer import Deferred
from twisted.python import log

from opennode.oms.config import get_config
from opennode.oms.security.principals import effective_principals


# Extend zope's list of names that are reachable without a declared
# permission: `Checker.check` in this module returns early (access granted)
# for any name found in `_available_by_default`.
_available_by_default.extend((
    '_p_oid',
    '__providedBy__',
    '__conform__',
))


class strong_defaultdict(defaultdict):
    """A `defaultdict` whose `get` also triggers the default factory.

    Python's `defaultdict` type doesn't invoke the default factory when
    called with `get`; we need this subclass to implement a permissive
    checker, where every lookup has to yield a value.
    """

    def get(self, name, default=None):
        """Return ``self[name]``, creating the entry through the factory.

        `default` is accepted only so that callers using the regular
        ``dict.get(key, default)`` signature don't fail with a `TypeError`;
        it is ignored, because a missing key is always resolved by item
        access (and therefore by the default factory).

        Note that, as with any `defaultdict` item access, looking up a
        missing key stores the produced value in the mapping.
        """
        return self[name]
def get_interaction(obj):
    """Extract the interaction from a proxied object.

    Returns `None` when `getChecker` rejects `obj` with a `TypeError`.
    When the object's checker is this module's `Checker`, the interaction
    stored on that checker is returned; for any other checker the
    thread-local interaction is returned instead.
    """
    try:
        obj_checker = getChecker(obj)
    except TypeError:
        return None

    if not isinstance(obj_checker, Checker):
        return thread_local.interaction
    return obj_checker.interaction
class AuditingPermissionDictionary(dict):
    """Permissive permission mapping that logs every implicit grant.

    Lookups of names that have no stored permission return `CheckerPublic`
    (access granted). Unless the name is in `_available_by_default`, the
    grant is also reported once to the twisted log under the 'security'
    system, together with the principals and the type of the accessed
    object as found in the calling checker's frame.
    """

    # Sentinel telling "key absent" apart from a stored value of None.
    marker = object()
    # Class-level, hence shared by all instances: the
    # (attribute, principal ids, object type name) triples already logged.
    seen = {}

    # Placeholder used in the audit line when the calling frame doesn't
    # expose the checker or the accessed object.
    _unknown = '<unknown>'

    def __getitem__(self, key):
        """Item access behaves exactly like `get` (never raises KeyError)."""
        return self.get(key)

    def get(self, key, default=None):
        """Return the stored permission for `key`, or `CheckerPublic`.

        `default` is ignored: a missing key always resolves to
        `CheckerPublic`, which is what makes this mapping permissive.
        """
        val = super(AuditingPermissionDictionary, self).get(key, self.marker)
        if val is not self.marker:
            return val
        if key not in _available_by_default:
            # `currentframe().f_back` is the same frame the previous
            # `getouterframes(...)[1][0]` produced, without materialising
            # the whole stack.
            self._audit(key, inspect.currentframe().f_back)
        return CheckerPublic

    def _audit(self, key, frame):
        """Log the implicit grant of `key`, once per distinct audit triple.

        `frame` is the frame that called `get`. The checker and the accessed
        object are read from the locals named `self` and `object` of the
        first frame that doesn't belong to this mapping. If they are not
        there (for instance when the lookup comes from
        `Checker.permission_id`, which has no `object` local), a placeholder
        is logged instead of failing the permission lookup.
        """
        # Step over our own frames, e.g. `__getitem__` delegating to `get`.
        while frame is not None and frame.f_locals.get('self') is self:
            frame = frame.f_back
        caller_locals = frame.f_locals if frame is not None else {}

        checker = caller_locals.get('self')
        if hasattr(checker, 'interaction'):
            principals = effective_principals(checker.interaction)
            principal_ids = ','.join(i.id for i in principals)
        else:
            principal_ids = self._unknown

        if 'object' in caller_locals:
            object_type = type(caller_locals['object']).__name__
        else:
            object_type = self._unknown

        seen_key = (key, principal_ids, object_type)
        if seen_key not in self.seen:
            log.msg("Audit: permissive mode; granting attribute=%s, principals=(%s), object=%s" % seen_key, system='security')
            self.seen[seen_key] = True
def _select_checker(value, interaction):
    """Pick the checker used to proxy `value` within `interaction`.

    If a checker is registered for the type of `value`, its permission
    mappings are wrapped in a `Checker` bound to `interaction` (a bare
    `object()` checker is returned unchanged). Otherwise the permission
    mapping depends on the `auth` configuration section:

    * `enforce_attribute_rights_definition`: an empty dict;
    * `audit_all_missing_attribute_rights_definitions`: an
      `AuditingPermissionDictionary`;
    * neither: a `strong_defaultdict` yielding `CheckerPublic`.

    The same mapping is used for both get and set permissions.
    """
    type_checker = getCheckerForInstancesOf(type(value))

    if type_checker:
        # handle checkers for "primitive" types like str
        if type(type_checker) is object:
            return type_checker
        return Checker(type_checker.get_permissions,
                       type_checker.set_permissions,
                       interaction=interaction)

    # No checker registered for this type: build one from configuration.
    if get_config().getboolean('auth', 'enforce_attribute_rights_definition'):
        permissions = {}
    elif get_config().getboolean('auth', 'audit_all_missing_attribute_rights_definitions'):
        permissions = AuditingPermissionDictionary()
    else:
        permissions = strong_defaultdict(lambda: CheckerPublic)
    return Checker(permissions, permissions, interaction=interaction)
def proxy_factory(value, interaction):
    """Wrap `value` in a security proxy bound to `interaction`.

    Values that already are a `Proxy`, and deferreds, are returned as they
    are; everything else is proxied with the checker chosen by
    `_select_checker`.
    """
    # Already proxied, or a deferred (proxies on deferreds are ignored).
    if type(value) is Proxy or isinstance(value, Deferred):
        return value

    return Proxy(value, _select_checker(value, interaction))
class Checker(object):
    """Name-based security checker that carries its own interaction.

    Permission checks are made against `self.interaction` rather than a
    thread-local one.

    The permission lookups in `check` and `check_setattr` are deliberately
    kept inline: `AuditingPermissionDictionary.get` reads the locals named
    `self` and `object` from the frame that calls it.
    """
    implements(INameBasedChecker)

    def __init__(self, get_permissions, set_permissions=None, interaction=None):
        """Create a checker

        A dictionary must be provided for computing permissions for
        names. The dictionary get will be called with attribute names
        and must return a permission id, None, or the special marker,
        CheckerPublic. If None is returned, then access to the name is
        forbidden. If CheckerPublic is returned, then access will be
        granted without checking a permission.

        An optional setattr dictionary may be provided for checking
        set attribute access.

        `interaction` is the object whose `checkPermission` is consulted
        by `check` and `check_setattr`.
        """
        assert isinstance(get_permissions, dict)
        self.get_permissions = get_permissions
        assert set_permissions is None or isinstance(set_permissions, dict)
        self.set_permissions = set_permissions
        self.interaction = interaction

    def permission_id(self, name):
        'See INameBasedChecker'
        return self.get_permissions.get(name)

    def setattr_permission_id(self, name):
        """See INameBasedChecker

        NOTE(review): this tests the truthiness of `set_permissions`, while
        `check_setattr` tests `is not None`. For an empty permissive mapping
        this method returns None although `check_setattr` would grant
        access -- confirm whether that difference is intended.
        """
        if not self.set_permissions:
            return None
        return self.set_permissions.get(name)

    def check_getattr(self, object, name):
        'See IChecker'
        self.check(object, name)

    def check_setattr(self, object, name):
        """See IChecker

        Raises `Unauthorized` when a permission is declared for `name` but
        the interaction denies it, and `ForbiddenAttribute` when no
        permission is declared at all.
        """
        # handle defaultdict: compare with None instead of using truthiness
        setters = self.set_permissions
        required = None if setters is None else setters.get(name)

        if required is None:
            # `__traceback_supplement__` is kept as a local on purpose;
            # presumably zope's traceback formatting picks it up.
            __traceback_supplement__ = (TracebackSupplement, object)
            raise ForbiddenAttribute(name, object)

        if required is CheckerPublic:
            return  # Public
        if self.interaction.checkPermission(required, object):  # use local interaction
            return  # allowed

        __traceback_supplement__ = (TracebackSupplement, object)
        raise Unauthorized(object, name, required)

    def check(self, object, name):
        """See IChecker

        Raises `Unauthorized` when a permission is declared for `name` but
        the interaction denies it. When no permission is declared, names in
        `_available_by_default` are allowed, a `__iter__` the object doesn't
        have is let through, and anything else raises `ForbiddenAttribute`.
        """
        required = self.get_permissions.get(name)

        if required is None:
            if name in _available_by_default:
                return
            if name == '__iter__' and not hasattr(object, name):
                return
            __traceback_supplement__ = (TracebackSupplement, object)
            raise ForbiddenAttribute(name, object)

        if required is CheckerPublic:
            return  # Public
        if self.interaction.checkPermission(required, object):  # use local interaction
            return

        __traceback_supplement__ = (TracebackSupplement, object)
        raise Unauthorized(object, name, required)

    def proxy(self, value):
        """See IChecker

        Returns `value` unproxied when it already is a `Proxy`, is a
        deferred, is a class, or when the selected checker is None or a
        bare `object()`. Otherwise returns a `Proxy` using the value's
        `__Security_checker__` if present, else the checker selected for
        this checker's interaction.
        """
        # Existing proxies, deferreds and classes are never (re)proxied.
        if type(value) is Proxy or isinstance(value, (Deferred, type)):
            return value

        value_checker = getattr(value, '__Security_checker__', None)
        if value_checker is None:
            value_checker = _select_checker(value, self.interaction)  # pass interaction
            if value_checker is None or type(value_checker) is object:
                return value

        return Proxy(value, value_checker)

This Page