Source code for opennode.oms.endpoint.ssh.terminal

import os
import string

from twisted.conch import recvline
from twisted.python import log

# Control-key byte values as delivered by the terminal (CTRL-<letter>).
CTRL_A = '\x01'
CTRL_C = '\x03'
CTRL_E = '\x05'
CTRL_D = '\x04'
CTRL_K = '\x0b'
CTRL_Y = '\x19'
CTRL_L = '\x0c'
CTRL_S = '\x13'
CTRL_T = '\x14'
CTRL_R = '\x12'
CTRL_G = '\x07'
CTRL_W = '\x17'
CTRL_X = '\x18'
# CTRL-\ (ASCII FS). InteractiveTerminal.connectionMade binds it to
# handle_QUIT, so it has to exist at module level.
CTRL_BACKSLASH = '\x1c'

# ANSI SGR escape sequences: bold/bright foreground colours.
BLUE = '\x1b[1;34m'
CYAN = '\x1b[1;36m'
GREEN = '\x1b[1;32m'
RED = '\x1b[1;31m'

# Normal-intensity red.
DARK_RED = '\x1b[0;31m'

# Resets all SGR attributes back to the terminal default.
RESET_COLOR = '\x1b[0m'

class InteractiveTerminal(recvline.HistoricRecvLine):
    """Advanced interactive terminal.

    Handles history, line editing, killing/yanking, line movement.
    Prompt handling is delegated to subclasses.

    Subclasses must provide the ``hist_file_name`` property (path of the
    file used to persist the command history).
    """

    # When true the session is treated as non-interactive: no insert-mode
    # escape sequence is emitted and the input line is not redrawn after login.
    batch = False

    def connectionMade(self):
        """Set up per-connection state and register the extra key bindings."""
        super(InteractiveTerminal, self).connectionMade()

        self.enable_colors = True
        self.history_save_enabled = True
        self.restore_history()

        # Most recently killed text (a list of characters), or None when
        # nothing has been killed yet.
        self.kill_ring = None

        self.keyHandlers[CTRL_A] = self.handle_HOME
        self.keyHandlers[CTRL_E] = self.handle_END
        self.keyHandlers[CTRL_D] = self.handle_EOF
        self.keyHandlers[CTRL_L] = self.handle_FF
        self.keyHandlers[CTRL_K] = self.handle_KILL_LINE
        self.keyHandlers[CTRL_Y] = self.handle_YANK
        self.keyHandlers[CTRL_T] = self.handle_TRANSPOSE
        # NOTE(review): handle_SEARCH (and handle_UPDATE_SEARCH /
        # handle_EXIT_SEARCH used below) are not defined in the visible part
        # of this class -- confirm they are provided elsewhere.
        self.keyHandlers[CTRL_R] = self.handle_SEARCH
        self.keyHandlers[CTRL_G] = self.handle_ABORT
        self.keyHandlers[CTRL_BACKSLASH] = self.handle_QUIT

        # Handlers looked up only when the ALT modifier is active.
        self.altKeyHandlers = {self.terminal.BACKSPACE: self.handle_BACKWARD_KILL_WORD}

        # Incremental history search state.
        self.search_mode = False
        self.found_index = -1
        self.search_skip = 0

    def initializeScreen(self):
        """Switch to insert mode, unless running in batch mode."""
        # don't output "insert mode" escape chars if not in interactive mode
        if not self.batch:
            self.setInsertMode()

    def enter_full_screen(self):
        """Switch to the full screen plane, used for editors."""
        self.terminal.write('\x1b[?1049h')

    def exit_full_screen(self):
        """Switch back to the normal plane."""
        self.terminal.write('\x1b[?1049l')

    def terminalSize(self, width, height):
        """Avoid clearing the whole screen.

        Only records the new dimensions on this object and on the
        terminal's ``termSize``; nothing is redrawn here.
        """
        self.width = width
        self.height = height
        self.terminal.termSize.x = width
        self.terminal.termSize.y = height

    def terminalSizeAfterLogin(self):
        """Part of terminalSize executed after login."""
        if not self.batch:
            self.drawInputLine()

    def set_terminal(self, terminal):
        """Attach ``terminal`` to this protocol and this protocol to it."""
        self.terminal = terminal
        terminal.terminalProtocol = self

    def keystrokeReceived(self, keyID, modifier):
        """Dispatch a keystroke, giving search mode and ALT bindings priority."""
        if self.search_mode:
            if keyID == '\n' or keyID == '\r':
                return self.handle_SEARCH_RETURN()
            if keyID == CTRL_R:
                return self.handle_SEARCH_NEXT()

            self.search_skip = 0

            # Abort, backspace and printable characters keep the search
            # going; any other key leaves search mode first.
            stays_in_search = (
                keyID == CTRL_G
                or keyID == self.terminal.BACKSPACE
                or (isinstance(keyID, str) and keyID in string.printable)
            )
            if not stays_in_search:
                self.handle_EXIT_SEARCH()
                # Fall through, continue processing

        if modifier == self.terminal.ALT:
            m = self.altKeyHandlers.get(keyID)
            if m is not None:
                m()
                return

        super(InteractiveTerminal, self).keystrokeReceived(keyID, modifier)

        if self.search_mode:
            self.handle_UPDATE_SEARCH()

    def restore_history(self):
        """Load the history from ``hist_file_name`` if that file exists.

        Any failure is logged and otherwise ignored, so a missing or
        unreadable history never prevents the session from starting.
        """
        try:
            if os.path.exists(self.hist_file_name):
                # The context manager closes the file even if reading fails.
                with open(self.hist_file_name, 'r') as f:
                    self.historyLines = [line.strip() for line in f]
                self.historyPosition = len(self.historyLines)
        except Exception as e:
            log.msg("cannot restore history: %s" % e, system='ssh')
            log.err(e, system='ssh')

    def save_history(self):
        """Write the history to ``hist_file_name``, one entry per line.

        Does nothing when ``history_save_enabled`` is false. Failures are
        logged and otherwise ignored.
        """
        if not self.history_save_enabled:
            return

        try:
            concat = [line + '\n' for line in self.historyLines]
            with open(self.hist_file_name, 'w') as f:
                f.writelines(concat)
        except Exception as e:
            log.msg("cannot save history: %s" % e, system='ssh')
            log.err(e, system='ssh')

    @property
    def hist_file_name(self):
        """Path of the history file; must be provided by subclasses.

        Exposed as a property because the rest of the class reads it as a
        plain attribute (``open(self.hist_file_name, ...)``).
        """
        raise NotImplementedError

    def print_prompt(self):
        """Write the prompt to the terminal."""
        # NOTE(review): this writes an empty list rather than any prompt
        # text; presumably subclasses override it -- confirm.
        self.terminal.write([])

    def insert_buffer(self, buf):
        """Inserts some chars in the buffer at the current cursor position."""
        lead, rest = self.lineBuffer[0:self.lineBufferIndex], self.lineBuffer[self.lineBufferIndex:]
        self.lineBuffer = lead + buf + rest
        self.lineBufferIndex += len(buf)

    def insert_text(self, text):
        """Inserts some text at the current cursor position and renders it."""
        if isinstance(text, unicode):
            text = text.encode('utf-8')
        self.terminal.write(text)
        self.insert_buffer(list(text))

    def handle_EOF(self):
        """Exits the shell on CTRL-D.

        With a non-empty input line the terminal bell is written instead.
        """
        if self.lineBuffer:
            self.terminal.write('\a')
        else:
            self.handle_QUIT()

    def handle_FF(self):
        """Handles a 'form feed' byte - generally used to request a screen refresh/redraw."""
        self.terminal.eraseDisplay()
        self.terminal.cursorHome()
        self.drawInputLine()

    def handle_KILL_LINE(self):
        """Deletes the rest of the line (from the cursor right), and keeps the content
        in the kill ring for future pastes.
        """
        self.terminal.eraseToLineEnd()
        self.kill_ring = self.lineBuffer[self.lineBufferIndex:]
        self.lineBuffer = self.lineBuffer[0:self.lineBufferIndex]

    def handle_YANK(self):
        """Pastes the content of the kill ring."""
        if self.kill_ring:
            self.terminal.write("".join(self.kill_ring))
            self.insert_buffer(self.kill_ring)

    def handle_BACKWARD_KILL_WORD(self):
        """Provides the ALT-BACKSPACE behaviour like in emacs/bash.

        Removes the word left of the cursor (plus any whitespace between
        that word and the cursor) and stores the removed characters in
        the kill ring.
        """
        line = ''.join(self.lineBuffer[:self.lineBufferIndex])
        stripped = line.rstrip()

        # trailing whitespace directly before the cursor ...
        back_positions = len(line) - len(stripped)
        # ... plus everything back to the previous space (not included);
        # the prepended ' ' makes rfind return 0 when there is no space.
        back_positions += len(stripped) - (' ' + stripped).rfind(' ')

        if not back_positions:
            # Cursor is at the start of the line: nothing to kill. Returning
            # here avoids sending escape sequences with a zero count (which
            # terminals commonly treat as 1) and keeps the previous kill ring.
            return

        start = self.lineBufferIndex - back_positions

        self.terminal.cursorBackward(back_positions)
        self.terminal.deleteCharacter(back_positions)

        self.kill_ring = self.lineBuffer[start:self.lineBufferIndex]
        del self.lineBuffer[start:self.lineBufferIndex]
        self.lineBufferIndex = start

    def handle_TRANSPOSE(self):
        """Provides the CTRL-T behaviour like on emacs/bash.

        Swaps the character before the cursor with the one under it and
        advances the cursor by one. At either end of the line the two
        outermost characters are swapped instead.
        """
        if len(self.lineBuffer) < 2:
            # Nothing to swap; bail out before the cursor or buffer index
            # is moved (on an empty line the index would end up past the end).
            return

        if self.lineBufferIndex == 0:
            self.terminal.cursorForward()
            self.lineBufferIndex += 1

        if self.lineBufferIndex == len(self.lineBuffer):
            self.terminal.cursorBackward()
            self.lineBufferIndex -= 1

        # With at least two characters the adjustments above guarantee
        # 0 < index < len(lineBuffer), so both neighbours exist.
        index = self.lineBufferIndex
        l, r = self.lineBuffer[index - 1], self.lineBuffer[index]
        self.lineBuffer[index - 1] = r
        self.lineBuffer[index] = l

        # Redraw the swapped pair in place.
        self.terminal.cursorBackward()
        self.terminal.deleteCharacter(2)
        self.terminal.write(r + l)
        self.lineBufferIndex += 1

    @property
    def search_ps(self):
        """Prompt text shown during a backward incremental search.

        Exposed as a property because handle_ABORT takes ``len()`` of it.
        """
        return "bck-i-search: "

    def handle_SEARCH_NEXT(self):
        """Skip one more match and refresh the search result."""
        self.search_skip += 1
        self.handle_UPDATE_SEARCH(self.search_skip)

    def handle_SEARCH_RETURN(self):
        """Leave search mode and execute the history line that was found.

        When nothing was found only the prompt is printed again.
        """
        self.search_mode = False
        self.terminal.write('\n')

        if self.found_index < 0:
            self.print_prompt()
            return

        self.lineBuffer = []
        self.lineBufferIndex = 0

        # record it in the history
        self.historyLines.append(self.historyLines[self.found_index])
        self.historyPosition = len(self.historyLines)

        self.lineReceived(self.historyLines[self.found_index])

    def handle_ABORT(self):
        """Abort a search."""
        if not self.search_mode:
            return

        self.search_mode = False

        # Erase the search line, then the line above it, and start over
        # with an empty input line.
        self.terminal.cursorBackward(self.lineBufferIndex + len(self.search_ps))
        self.terminal.eraseToLineEnd()
        self.terminal.cursorUp(1)
        self.terminal.eraseToLineEnd()

        self.lineBuffer = []
        self.lineBufferIndex = 0
        self.drawInputLine()

    def handle_QUIT(self):
        """Just copied from conch Manhole, no idea why it would be useful
        to differentiate it from CTRL-D, but I guess it's here for a reason.
        """
        self.close_connection()

    def colorize(self, color, text):
        """Wrap ``text`` in ``color`` and a reset sequence.

        ``text`` is returned unchanged when ``color`` is falsy or colors
        are disabled via ``enable_colors``.
        """
        if color and self.enable_colors:
            return color + text + RESET_COLOR
        return text

    def close_connection(self):
        """Closes the connection and saves history."""
        # we have to disable insert mode, because bash etc don't use
        # the terminal builtin insert mode but they emulate it
        self.setTypeoverMode()

        self.save_history()

        # avoid performing a terminal reset
        self.terminal.transport.loseConnection()

This Page