Source code for opennode.oms.model.model.proc

from __future__ import absolute_import

import time
from collections import OrderedDict

from grokcore.component import querySubscriptions, Adapter, context, subscribe, baseclass
from twisted.python import log
from zope import schema
from zope.component import provideSubscriptionAdapter
from zope.interface import Interface, implements, alsoProvides

from .base import ReadonlyContainer
from .actions import ActionsContainerExtension, Action, action
from opennode.oms.util import Singleton
from opennode.oms.config import get_config
from opennode.oms.core import IAfterApplicationInitializedEvent


class ITask(Interface):
    """Executable command object."""
    cmdline = schema.TextLine(title=u"command line", description=u"Command line", readonly=True, required=False)
    uptime = schema.Int(title=u"uptime", description=u"Task uptime in seconds", readonly=True, required=False)
    ptid = schema.TextLine(title=u"parent task", description=u"Parent task", readonly=True, required=False)
    stdout = schema.TextLine(title=u"stdout", description=u"Standard output", readonly=True, required=False)

    def signal(name):
        """Process a signal"""


class ISuspendableTask(Interface):
    """A task which can be suspendedD."""

    def stop():
        pass

    def cont():
        pass


class IProcess(Interface):
    def run():
        """Returns a deferred representing the background process execution"""

    def signal_handler(name):
        """Process a signal"""


[docs]class DaemonProcess(object): def __init__(self): config = get_config() self.paused = not config.getboolean('daemons', self.__name__, True)
[docs] def signal_handler(self, name): if name == 'STOP': log.msg("Stopping %s" % self.__name__, system='proc') self.paused = True elif name == 'CONT': log.msg("Continuing %s" % self.__name__, system='proc') self.paused = False
class IProcessStateRenderer(Interface): def __str__(): pass
[docs]class DaemonStateRenderer(Adapter): implements(IProcessStateRenderer) context(DaemonProcess) def __str__(self): return "[%s%s]" % (self.context.__name__, ': paused' if self.context.paused else '')
[docs]class Task(ReadonlyContainer): implements(ITask) def __init__(self, name, parent, subject, deferred, cmdline, ptid, signal_handler=None, principal=None, write_buffer=None): self.__name__ = name self.__parent__ = parent self.subject = subject self.deferred = deferred self.cmdline = cmdline self.timestamp = time.time() self.ptid = ptid self.signal_handler = signal_handler self.principal = principal self.write_buffer = write_buffer # XXX: Workaround to handle ON-425 # Refactor with adapters handling each specific signal if self.signal_handler: alsoProvides(self, ISuspendableTask) @property
[docs] def uptime(self): return time.time() - self.timestamp
@property
[docs] def nicknames(self): return [self.cmdline, ]
@property
[docs] def stdout(self): return self.write_buffer
[docs] def signal(self, name): if self.signal_handler: self.signal_handler(name)
[docs]class Proc(ReadonlyContainer): __metaclass__ = Singleton __contains__ = ITask __name__ = 'proc' def __init__(self): super(Proc, self).__init__() # represents the init process, just for fun. self.tasks = OrderedDict({'1': Task('1', self, self, None, '/bin/init', '0')}) self.dead_tasks = OrderedDict() self.next_id = 1
[docs] def start_daemons(self): for i in querySubscriptions(self, IProcess): self.spawn(i)
[docs] def spawn(self, process): self._register(process.run(), process, IProcessStateRenderer(process), signal_handler=process.signal_handler)
def __str__(self): return 'Tasks'
[docs] def content(self): res = dict(self.tasks) res['completed'] = CompletedProc(self, self.dead_tasks) return res
@classmethod
[docs] def register(cls, deferred, subject, cmdline=None, ptid='1', principal=None, write_buffer=None): pid = Proc()._register(deferred, subject, cmdline, ptid, principal=principal, write_buffer=write_buffer) log.msg('Registered as process %s: %s' % (pid, cmdline), system='proc') return pid
def _register(self, deferred, subject, cmdline, ptid='1', signal_handler=None, principal=None, write_buffer=None): self.next_id += 1 new_id = str(self.next_id) self.tasks[new_id] = Task(new_id, self, subject, deferred, cmdline, ptid, signal_handler, principal, write_buffer) if deferred: deferred.addBoth(self._unregister, new_id) return new_id @classmethod
[docs] def unregister(cls, id_): self = Proc() self.dead_tasks[id_] = self.tasks[id_] del self.tasks[id_] log.msg('Unregistered process %s: %s' % (id_, self.dead_tasks[id_].cmdline), system='proc')
@classmethod def _unregister(cls, res, id_): cls.unregister(id_) return res
[docs]class CompletedProc(ReadonlyContainer): __name__ = 'completed' def __init__(self, parent, tasks): self.__parent__ = parent self.tasks = tasks
[docs] def content(self): return self.tasks
[docs]class SignalAction(Action): """Send a given signal""" baseclass()
[docs] def execute(self, cmd, args): from opennode.oms.zodb import db @db.ro_transact def execute(): self.context.signal(self.__signal__) execute()
[docs]class StopTaskAction(SignalAction): """Send STOP signal""" context(ISuspendableTask) action('stop') __signal__ = 'STOP'
[docs]class ContinueTaskAction(SignalAction): """Send CONT signal""" context(ISuspendableTask) action('continue') __signal__ = 'CONT'
[docs]class TerminateTaskAction(SignalAction): """Send TERM signal""" context(ITask) action('terminate') __signal__ = 'TERM'
provideSubscriptionAdapter(ActionsContainerExtension, adapts=(Task, )) @subscribe(IAfterApplicationInitializedEvent)
[docs]def start_daemons(event): try: Proc().start_daemons() except Exception as e: log.msg("Got exception while starting daemons", system='proc') if get_config().get_boolean('debug', 'print_exceptions'): log.err(e, system='proc')

This Page