Source code for opennode.oms.backend.indexer
from collections import deque
from twisted.internet import defer
from twisted.python import log
from zope.component import provideSubscriptionAdapter
from zope.interface import implements
from zope.keyreference.interfaces import NotYet
from opennode.oms.endpoint.ssh.detached import DetachedProtocol
from opennode.oms.model.model.proc import IProcess, Proc, DaemonProcess
from opennode.oms.model.model.search import ReindexAction
from opennode.oms.util import subscription_factory, async_sleep
from opennode.oms.zodb import db
from opennode.oms.model.form import IModelDeletedEvent
from opennode.oms.model.traversal import canonical_path, traverse_path
[docs]class BlackHoleQueue(object):
[docs] def append(self, val):
pass
[docs]class IndexerDaemonProcess(DaemonProcess):
implements(IProcess)
__name__ = "indexer"
queue = deque()
black_hole = BlackHoleQueue()
@defer.inlineCallbacks
[docs] def run(self):
while True:
try:
if not self.paused:
if IndexerDaemonProcess.queue == self.black_hole:
IndexerDaemonProcess.queue = deque()
self.reindex()
yield self.process()
else:
IndexerDaemonProcess.queue = self.black_hole
except Exception:
import traceback
traceback.print_exc()
pass
yield async_sleep(1)
@classmethod
[docs] def enqueue(cls, model, event):
cls.queue.append((model, event))
@defer.inlineCallbacks
[docs] def process(self):
if self.queue:
yield self._process()
@db.transact
def _process(self):
log.msg("indexing a batch of objects", system="indexer")
searcher = db.get_root()['oms_root']['search']
def currently_queued():
while self.queue:
yield self.queue.popleft()
for model, event in currently_queued():
self.index(searcher, model, event)
log.msg("done", system="indexer")
[docs] def index(self, searcher, model, event):
if not self.try_index(searcher, model, event):
log.msg("cannot (un)index %s %s" % (model, type(event).__name__), system="indexer")
[docs] def try_index(self, searcher, model, event):
path = canonical_path(model)
op = 'un' if IModelDeletedEvent.providedBy(event) else ''
log.msg("%sindexing %s %s" % (op, path, type(event).__name__), system="indexer")
objs, unresolved_path = traverse_path(db.get_root()['oms_root'], path)
if unresolved_path and not IModelDeletedEvent.providedBy(event):
return False
obj = objs[-1]
try:
if IModelDeletedEvent.providedBy(event):
searcher.unindex_object(obj)
else:
searcher._index_object(obj)
except NotYet:
return False
log.msg("%sindexed %s %s" % (op, path, type(event).__name__), system="indexer")
return True
[docs] def reindex(self):
ReindexAction(None).execute(DetachedProtocol(), object())
provideSubscriptionAdapter(subscription_factory(IndexerDaemonProcess), adapts=(Proc,))