Source code for opennode.oms.endpoint.ssh.cmd.completers

import argparse
import os

from grokcore.component import baseclass, context
from twisted.internet import defer
from zope.component import provideSubscriptionAdapter

from opennode.oms.endpoint.ssh.cmd import commands
from opennode.oms.endpoint.ssh.cmd import security
from opennode.oms.endpoint.ssh.cmd.completion import Completer
from opennode.oms.endpoint.ssh.cmdline import GroupDictAction
from opennode.oms.model.model.base import IContainer
from opennode.oms.model.model.bin import ICommand
from opennode.oms.model.model.symlink import Symlink, follow_symlinks
from opennode.oms.zodb import db


[docs]class PositionalCompleter(Completer): """Base class for positional completers.""" baseclass()
[docs] def expected_action(self, parsed, parser): """Currently expected action. It looks at the cardinalities """ for action_group in parser._action_groups: for action in action_group._group_actions: # For every positional argument: if not action.option_strings: actual = 0 maximum = 0 # Count how many of them we have already. values = getattr(parsed, action.dest, []) if values == action.default: # don't count default values values = [] if not isinstance(values, list): values = [values] actual += len(values) # And the maximum number of expected occurencies. if action.nargs is None: maximum += 1 elif isinstance(action.nargs, int): maximum += action.nargs elif action.nargs == argparse.OPTIONAL: maximum += 1 else: maximum = float('inf') if actual < maximum: return action
[docs]class PathCompleter(PositionalCompleter): """Completes a path name.""" baseclass() @db.ro_transact
[docs] def complete(self, token, parsed, parser, **kwargs): # If there is still any positional option to complete: if self.expected_action(parsed, parser): base_path = os.path.dirname(token) container = self.context.traverse(base_path) if IContainer.providedBy(container): def suffix(obj): if IContainer.providedBy(follow_symlinks(obj)): return '/' elif ICommand.providedBy(follow_symlinks(obj)): return '*' elif isinstance(obj, Symlink): return '@' else: return '' def name(obj): return os.path.join(base_path, obj.__name__) return [name(obj) + suffix(obj) for obj in container.listcontent() if name(obj).startswith(token)]
[docs]class CommandCompleter(PathCompleter): """Completes a command.""" context(commands.NoCommand) @defer.inlineCallbacks
[docs] def complete(self, token, parsed, parser, protocol=None, **kwargs): cmds = yield self._scan_search_path(protocol) # TODO: check that only 'executables' and 'directories' are returned. paths = yield super(CommandCompleter, self).complete(token, parsed, parser, **kwargs) defer.returnValue([value for value in list(cmds) + list(set(paths).difference(i + '*' for i in cmds)) if value.startswith(token)])
@db.ro_transact def _scan_search_path(self, protocol): dummy = commands.NoCommand(protocol) cmds = [] for d in protocol.environment['PATH'].split(':'): for i in dummy.traverse(d) or []: if ICommand.providedBy(i): cmds.append(i.cmd.name) return set(cmds)
[docs] def expected_action(self, parsed, parser): return True
[docs]class KeywordPathSubCompleter(PathCompleter): """ Implement a FS path completer which works as subcompleter for the keyworded arguments. """ baseclass() def __init__(self, context, base_path=''): super(KeywordPathSubCompleter, self).__init__(context) self.base_path = base_path @defer.inlineCallbacks
[docs] def complete(self, token, parsed, parser, **kwargs): self.original_context = self.context self.context = self keyword, value_prefix = token.split('=') res = yield super(KeywordPathSubCompleter, self).complete(value_prefix, parsed, parser, **kwargs) defer.returnValue([keyword + '=' + i for i in res])
[docs] def traverse(self, path): if not os.path.isabs(path): path = self.base_path + path return self.original_context.traverse(path)
[docs] def expected_action(self, parsed, parser): return True
[docs]class ArgSwitchCompleter(Completer): """Completes argument switches based on the argparse grammar exposed for a command""" baseclass()
[docs] def complete(self, token, parsed, parser, **kwargs): if token.startswith("-"): return [option for action_group in parser._action_groups for action in action_group._group_actions for option in action.option_strings if option.startswith(token) and not self.option_consumed(action, parsed)]
[docs] def option_consumed(self, action, parsed): # "count" actions can be repeated if action.nargs > 0 or isinstance(action, argparse._CountAction): return False if isinstance(action, GroupDictAction): value = getattr(parsed, action.group, {}).get(action.dest, action.default) else: value = getattr(parsed, action.dest, action.default) return value != action.default
[docs]class KeywordSwitchCompleter(ArgSwitchCompleter): """Completes key=value argument switches based on the argparse grammar exposed for a command. TODO: probably more can be shared with ArgSwitchCompleter.""" baseclass()
[docs] def complete(self, token, parsed, parser, display=False, **kwargs): return [('[%s=]' if display and not action.was_required else '%s=') % (option[1:]) for action_group in parser._action_groups for action in action_group._group_actions for option in action.option_strings if option.startswith('=' + token) and not self.option_consumed(action, parsed)]
[docs]class KeywordValueCompleter(ArgSwitchCompleter): """Completes the `value` part of key=value constructs based on the type of the keyword. Currently works only for args which declare an explicit enumeration.""" baseclass()
[docs] def complete(self, token, parsed, parser, **kwargs): if '=' in token: keyword, value_prefix = token.split('=') action = self.find_action(keyword, parsed, parser) if isinstance(action, GroupDictAction) and action.is_path: subcompleter = KeywordPathSubCompleter(self.context, action.base_path) return subcompleter.complete(token, parsed, parser, **kwargs) if action and action.choices: return [keyword + '=' + value for value in action.choices if value.startswith(value_prefix)] return []
[docs] def find_action(self, keyword, parsed, parser): for action_group in parser._action_groups: for action in action_group._group_actions: if action.dest == keyword: return action
[docs]class PositionalChoiceCompleter(PositionalCompleter): baseclass()
[docs] def complete(self, token, parsed, parser, **kwargs): action = self.expected_action(parsed, parser) if action and action.choices: return [value for value in action.choices if value.startswith(token)]
[docs]class EnvironmentCompleter(Completer): baseclass()
[docs] def complete(self, token, parsed, parser, protocol=None, **kwargs): return [value for value in protocol.environment.keys() if value.startswith(token)] # TODO: move to handler
for command in [commands.ListDirContentsCmd, commands.ChangeDirCmd, commands.CatObjectCmd, commands.SetAttrCmd, commands.RemoveCmd, commands.MoveCmd, commands.FileCmd, commands.EchoCmd, commands.LinkCmd, commands.EditCmd, security.GetAclCmd, security.SetAclCmd, security.PermCheckCmd]: provideSubscriptionAdapter(PathCompleter, adapts=(command, )) for command in [commands.ListDirContentsCmd, commands.ChangeDirCmd, commands.CatObjectCmd, commands.SetAttrCmd, commands.RemoveCmd, commands.QuitCmd, commands.FileCmd, commands.LinkCmd, commands.KillTaskCmd, security.GetAclCmd, security.SetAclCmd, security.PermCheckCmd]: provideSubscriptionAdapter(ArgSwitchCompleter, adapts=(command, )) for command in [commands.SetAttrCmd, commands.CreateObjCmd]: provideSubscriptionAdapter(KeywordSwitchCompleter, adapts=(command, )) for command in [commands.SetAttrCmd, commands.CreateObjCmd]: provideSubscriptionAdapter(KeywordValueCompleter, adapts=(command, )) for command in [commands.HelpCmd, commands.CreateObjCmd]: provideSubscriptionAdapter(PositionalChoiceCompleter, adapts=(command, )) for command in [commands.SetEnvCmd]: provideSubscriptionAdapter(EnvironmentCompleter, adapts=(command, ))

This Page