import os
from contextlib import closing
from copy import copy
from twisted.internet import defer
from twisted.conch.insults import insults
from twisted.python import log
from opennode.oms.util import exception_logger, find_nth
from opennode.oms.endpoint.ssh.terminal import CTRL_A, CTRL_C, CTRL_E, CTRL_K, CTRL_X, CTRL_Y, CTRL_S, CTRL_G, CTRL_L, CTRL_W, DARK_RED, RESET_COLOR
class Editor(object):
    """Full-screen text editor with Emacs-like key bindings, driven by terminal keystrokes."""

    # number of screen rows kept free below the text view (modeline + status line)
    MODELINE_HEIGHT = 2
    # escape sequence written in front of text that belongs to a '#' comment
    COMMENT_COLOR = DARK_RED
def __init__(self, parent):
    """Initialise editor state and the key-binding table.

    `parent` is the object owning this editor; its `terminal` attribute is
    stored and used for all screen output.
    """
    self.parent = parent
    self.terminal = parent.terminal

    self.buffer = ""
    self.old_dirty = False
    self.dirty = False
    self.status_line = ""
    self.has_quit = False
    self.kill_ring = ""
    self.last_handler = None

    # start() sets these again; defaults are provided here so that a
    # keystroke arriving before start() does not hit a missing attribute
    self.tracing = False
    self.pos = 0
    self.current_line = 0
    self.current_column = 0
    self.lines = 1

    # pending prefix key (None when no prefix is active)
    self.prefix = None
    self.prefixes = [CTRL_X]

    # tuple keys are (prefix, key) sequences, plain keys are single keystrokes
    self.key_handlers = {
        (CTRL_X, CTRL_S): self.handle_SAVE,
        (CTRL_X, CTRL_W): self.handle_DUMP_BUFFER,
        (CTRL_X, CTRL_C): self.handle_EXIT,
        (CTRL_X, '='): self.handle_WHAT_CURSOR_POSITION,
        CTRL_A: self.handle_BEGIN_LINE,
        CTRL_E: self.handle_END_LINE,
        CTRL_K: self.handle_KILL_LINE,
        CTRL_L: self.handle_REDRAW,
        CTRL_Y: self.handle_YANK,
        self.terminal.LEFT_ARROW: self.handle_LEFT,
        self.terminal.RIGHT_ARROW: self.handle_RIGHT,
        self.terminal.UP_ARROW: self.handle_UP,
        self.terminal.DOWN_ARROW: self.handle_DOWN,
        self.terminal.BACKSPACE: self.handle_BACKSPACE,
    }
@defer.inlineCallbacks
def start(self, file):
    """Edit the text `file` in full-screen mode until the user exits.

    `file` is the initial buffer content (a string). The returned deferred
    fires with the content as of the last save, or with the initial content
    if nothing was saved.
    """
    # flip to True to record keystrokes and replay /tmp/key.sim (debug aid)
    self.tracing = False

    self.buffer = file
    self.saved = copy(self.buffer)

    # position in buffer
    self.pos = 0

    # current line in buffer
    self.current_line = 0
    self.current_column = 0
    self.lines = self.buffer.count('\n') + 1

    self.parent.sub_protocol = self
    self.parent.enter_full_screen()
    self.terminal.write('\x1b[?7l')  # disable auto wrap
    self.reset_scrolling_region()
    self.parent.setInsertMode()

    # hide the modeline and status rows from the text view
    self.terminal.termSize.y -= self.MODELINE_HEIGHT

    if self.tracing:
        self.enable_tracing()

    try:
        self.redraw_buffer()
        self.terminal.cursorHome()

        if self.tracing:
            self.replay_recorded()

        yield self.wait_for_exit()
    finally:
        self.terminal.termSize.y += self.MODELINE_HEIGHT
        self.terminal.write('\x1b[?7h')  # re-enable auto wrap
        self.parent.exit_full_screen()

        if self.tracing:
            self.finalize_tracing()

    defer.returnValue(self.saved)
def replay_recorded(self):
    """Debug utility useful for repeating difficult to reproduce sessions that show some bug.

    Reads /tmp/key.sim (if it exists) and feeds one arrow key per line to
    `keystrokeReceived`. Lines containing '#' and blank lines are skipped.
    """
    replay_file = '/tmp/key.sim'
    if not os.path.exists(replay_file):
        return

    arrows = {'[UP_ARROW]': self.terminal.UP_ARROW,
              '[DOWN_ARROW]': self.terminal.DOWN_ARROW,
              '[LEFT_ARROW]': self.terminal.LEFT_ARROW,
              '[RIGHT_ARROW]': self.terminal.RIGHT_ARROW,
              }

    # the with-block guarantees the file is closed even if a keystroke raises
    with open(replay_file) as recorded:
        for line in recorded:
            if '#' in line:
                continue
            token = line.strip()
            if not token:
                # a blank line carries no key
                continue
            self.keystrokeReceived(arrows[token], None)
def enable_tracing(self):
    """Switch keystroke tracing on and open /tmp/key.log for writing (truncating it)."""
    self.tracing = True
    self.key_log = open('/tmp/key.log', 'w')
def finalize_tracing(self):
    """Close the keystroke log opened by `enable_tracing`."""
    self.key_log.close()
def trace_keystroke(self, keyID, mod):
    """Append `keyID` as one line to the key log when tracing is enabled.

    `mod` is accepted for symmetry with `keystrokeReceived` and is not logged.
    """
    if self.tracing:
        # plain write() instead of the `print >>` statement so the code does
        # not depend on Python 2 print syntax; the output is the same
        self.key_log.write('%s\n' % (keyID,))
        self.key_log.flush()
def wait_for_exit(self):
    """Create, store and return the deferred that `handle_EXIT` fires."""
    self.exit = defer.Deferred()
    return self.exit
def redraw_buffer(self):
    """Clear the screen and repaint the modeline and the first screenful of the buffer.

    The cursor position is saved before and restored after the repaint.
    """
    self.terminal.saveCursor()
    self.terminal.eraseDisplay()
    self.draw_modeline('--:-- test 0 % (0,0) (Fundamental)')
    # the freshly drawn modeline shows the "clean" marker; forget the cached
    # state so that refresh_modline repaints '**' if the buffer is dirty
    self.old_dirty = False
    self.terminal.cursorHome()
    visible_rows = self.parent.height - self.MODELINE_HEIGHT
    self.write('\n'.join(self.buffer.split('\n')[0:visible_rows]))
    self.terminal.restoreCursor()
def draw_modeline(self, text):
    """Paint `text` in reverse video on the modeline row, padded to the full width."""
    self.terminal.cursorPosition(0, self.parent.height - self.MODELINE_HEIGHT)
    self.terminal.selectGraphicRendition(str(insults.REVERSE_VIDEO))
    self.terminal.write(text.ljust(self.parent.width))
    # calling without arguments switches reverse video off again
    self.terminal.selectGraphicRendition()
def refresh_modline(self):
    """Update the dirty marker and the (line,column) indicator on the modeline.

    Does nothing once the editor has quit. The cursor position and insert
    mode are restored afterwards.
    """
    if self.has_quit:
        return

    modeline_row = self.parent.height - self.MODELINE_HEIGHT

    self.terminal.saveCursor()
    self.terminal.selectGraphicRendition(str(insults.REVERSE_VIDEO))
    # overwrite the existing modeline characters instead of shifting them
    self.parent.setTypeoverMode()

    # only repaint the dirty marker when its state changed
    if self.dirty != self.old_dirty:
        self.old_dirty = self.dirty
        self.terminal.cursorPosition(3, modeline_row)
        if self.dirty:
            self.terminal.write('**')
        else:
            self.terminal.write('--')

    self.terminal.cursorPosition(20, modeline_row)
    self.terminal.write('(%s,%s) ' % (self.current_line, self.current_column))

    self.parent.setInsertMode()
    self.terminal.selectGraphicRendition()
    self.terminal.restoreCursor()
def draw_status(self, text):
    """Show `text` on the bottom row of the screen.

    Newlines are rendered as '^J'. Nothing is written when the resulting
    text equals what is already displayed.
    """
    text = text.replace('\n', '^J')
    if text == self.status_line:
        return
    self.status_line = text

    self.terminal.saveCursor()
    self.terminal.cursorPosition(0, self.parent.height - 1)
    self.terminal.eraseLine()
    self.terminal.write(text)
    self.terminal.restoreCursor()
def abort(self):
    """Cancel a pending prefix key and report "Quit" on the status line."""
    # None is the "no prefix" value used everywhere else in the editor
    self.prefix = None
    self.draw_status("Quit")
def char_at(self, pos):
    """Return the buffer character at `pos`, or '\\x00' when `pos` is outside the buffer."""
    # reject negative positions too: they would otherwise wrap around to the
    # end of the buffer (or raise IndexError on an empty buffer)
    if 0 <= pos < len(self.buffer):
        return self.buffer[pos]
    return '\x00'
def write(self, text):
    """Write `text` to the terminal, wrapping lines that start with '#' in COMMENT_COLOR."""
    # keepends=True so that newlines are written exactly as they appear in text
    for line in text.splitlines(True):
        is_comment = line.startswith('#')
        if is_comment:
            self.terminal.write(self.COMMENT_COLOR)
        self.terminal.write(line)
        if is_comment:
            self.terminal.write(RESET_COLOR)
def handle_EOF(self):
    """Ignore end-of-file; the editor takes no action on it."""
    pass
def handle_EXIT(self):
    """Mark the editor as finished and fire the deferred created by `wait_for_exit`."""
    self.has_quit = True
    self.exit.callback(None)
def handle_SAVE(self):
    """Record the current buffer as the saved content and clear the dirty flag."""
    self.draw_status("Object saved")
    self.saved = copy(self.buffer)
    self.dirty = False
def handle_DUMP_BUFFER(self):
    """Write the buffer verbatim to /tmp/dump.txt and report it on the status line."""
    # file objects are context managers themselves; write() emits the buffer
    # exactly as-is without relying on the Python 2 `print >>` statement
    with open('/tmp/dump.txt', 'w') as f:
        f.write(self.buffer)
    # report success only after the dump has actually been written
    self.draw_status("Buffer dumped to /tmp/dump.txt")
def handle_WHAT_CURSOR_POSITION(self):
    """Show debugging details about point on the status line.

    Reports the characters at, before and after the current position, the
    line boundaries, and the terminal's cursor position and size.
    """
    ch = self.char_at(self.pos)
    pch = self.char_at(self.pos - 1) if self.pos else '\x00'
    nch = self.char_at(self.pos + 1) if self.pos + 1 < len(self.buffer) else '\x00'

    bol_pos = self.bol_pos()
    eol_pos = self.eol_pos()

    # start of the previous line, computed the same way goto_prev_line does:
    # search backwards from the newline that ends the previous line.
    # (the former `bol_pos() - 2` skipped an empty previous line)
    if bol_pos:
        prev_line = self.bol_pos(bol_pos - 1)
    else:
        # already on the first line
        prev_line = 0
    next_line = eol_pos + 1

    self.draw_status('Char: %s (%s %s) point=%s of %s line=%s; Prev: %s (%s %s), Next: %s (%s %s) bol=%s eol=%s prev_line=%s next_line=%s cursor=(%s, %s) termSize=(%s, %s)' %
                     (self.show_keys((ch,)), ord(ch), hex(ord(ch)),
                      self.pos, len(self.buffer), self.current_line,
                      self.show_keys((pch,)), ord(pch), hex(ord(pch)),
                      self.show_keys((nch,)), ord(nch), hex(ord(nch)),
                      bol_pos, eol_pos, prev_line, next_line,
                      self.terminal.cursorPos.y, self.terminal.cursorPos.x,
                      self.terminal.termSize.y, self.terminal.termSize.x,
                      ))
def handle_BACKSPACE(self):
    """Delete the character before point, joining lines when point is at a line start.

    Nothing happens at the very beginning of the buffer or when point is at
    the end of the buffer.
    """
    if not self.pos:
        return

    # currently we have to have a trailing newline
    if self.pos == len(self.buffer):
        return

    self.dirty = True

    # backspace at the beginning of a line: join it with the previous line
    if self.pos == self.bol_pos():
        current_screen_line = self.terminal.cursorPos.y

        # special handling for last line, because some terminal (xterm and gnome term) don't support
        # correctly a scroll region of height 1
        scroll_region = self.terminal.cursorPos.y + 1 != self.parent.height - self.MODELINE_HEIGHT

        self.terminal.eraseToLineEnd()
        self.terminal.saveCursor()

        # scroll up part below deleted line
        # XXX: consider using termSize.y instead of computing view size using MODELINE_HEIGHT
        if scroll_region:
            self.terminal.setScrollRegion(current_screen_line + 1, self.parent.height - self.MODELINE_HEIGHT)
            self.terminal.cursorPosition(0, self.parent.height - self.MODELINE_HEIGHT - 1)
            self.terminal.index()

        # redraw revealed line
        self.terminal.cursorPosition(0, self.parent.height - self.MODELINE_HEIGHT - 1)
        last_line_delta = self.terminal.termSize.y - self.terminal.cursorPos.y
        # fixup for scrolling with self.terminal.index()
        if scroll_region:
            last_line_delta += 1

        last_line_pos = find_nth(self.buffer, '\n', last_line_delta, self.pos)
        if last_line_pos > 0:
            last_line_end = self.buffer.find('\n', last_line_pos + 1)
            self.write(self.buffer[last_line_pos + 1:last_line_end])

        if scroll_region:
            self.reset_scrolling_region()

        self.terminal.restoreCursor()

        # remember the extent of the line being joined before point moves
        old_bol, old_eol = self.bol_pos(), self.eol_pos()

        self.goto_prev_line()
        go_forward = self.eol_pos() - self.bol_pos()
        # twisted insults cannot move by 0 amount
        if go_forward:
            self.terminal.cursorForward(go_forward)

        # append the joined line's text after the previous line on screen
        self.write(self.buffer[old_bol:old_eol])
        self.terminal.cursorPos.x += old_eol - old_bol
        go_backward = old_eol - old_bol
        if go_backward:
            self.terminal.cursorBackward(go_backward)

        # drop the newline that separated the two lines
        self.buffer = self.buffer[:old_bol - 1] + self.buffer[old_bol:]
        self.current_column += go_forward
        self.pos += go_forward
        self.lines -= 1
        return

    self.terminal.cursorBackward()
    self.terminal.deleteCharacter()
    self.pos -= 1
    self.current_column -= 1
    self.delete_character()
def handle_BEGIN_LINE(self):
    """Move point and the screen cursor to the start of the current line."""
    line_start = self.bol_pos()
    move_backward = self.pos - line_start
    self.current_column = 0
    # twisted insults cannot move by 0 amount
    if move_backward:
        self.pos = line_start
        self.terminal.cursorBackward(move_backward)
def handle_END_LINE(self):
    """Move point and the screen cursor to the end of the current line."""
    line_end = self.eol_pos()
    move_forward = line_end - self.pos
    self.current_column = max(0, line_end - self.bol_pos())
    # twisted insults cannot move by 0 amount
    if move_forward:
        self.pos = line_end
        self.terminal.cursorForward(move_forward)
def handle_KILL_LINE(self):
    """Kill from point to the end of the line, or remove the line if it is empty.

    Killed text is collected in `self.kill_ring`; consecutive kills append to
    it, any other preceding command starts a fresh kill ring.
    """
    if self.last_handler != self.handle_KILL_LINE:
        self.kill_ring = ""

    line_length = max(0, self.eol_pos() - self.bol_pos())
    if line_length:
        self.terminal.eraseToLineEnd()
        eol = self.eol_pos()
        killed = self.buffer[self.pos:eol]
        if killed:
            # removing text modifies the buffer just like backspace does
            self.dirty = True
        self.kill_ring += killed
        self.buffer = self.buffer[:self.pos] + self.buffer[eol:]
    else:
        # it's equivalent as handling backspace
        self.handle_BACKSPACE()
        # except that we remain at the beginning
        self.handle_BEGIN_LINE()
        # of on this line
        self.goto_next_line()
        self.kill_ring += '\n'
def handle_YANK(self):
    """Insert the kill ring at point by replaying it as individual keystrokes."""
    # slow but simple impl
    for ch in self.kill_ring:
        self.keystrokeReceived(ch, None)
def _rfind(self, string, sub, start=None, end=None):
    """Behaves like str.rfind, but returns 0 instead of -1"""
    res = string.rfind(sub, start, end)
    return res if res != -1 else 0
def _find(self, string, sub, start=None, end=None):
    """Behaves like str.find, but returns the index of the last buffer character instead of -1"""
    res = string.find(sub, start, end)
    # note: the fallback is derived from self.buffer, not from `string`
    return res if res != -1 else len(self.buffer) - 1
def bol_pos(self, from_pos=None):
    """Return the buffer index where the line containing `from_pos` starts.

    `from_pos` defaults to the current position. The result is the index just
    after the closest newline before `from_pos`, or 0 if there is none.
    """
    if from_pos is None:
        from_pos = self.pos
    return self.buffer.rfind('\n', 0, from_pos) + 1
def eol_pos(self):
    """Return the index of the first newline at or after the current position.

    When there is no such newline the fallback value of `_find` is returned.
    """
    return self._find(self.buffer, '\n', self.pos)
def handle_LEFT(self):
    """Move point one character left, wrapping to the end of the previous line."""
    if not self.pos:
        return

    if self.char_at(self.pos - 1) == '\n':
        # at the start of a line: go up, then to the end of that line
        self.goto_prev_line()
        go_forward = self.eol_pos() - self.pos
        self.current_column = go_forward
        # twisted insults cannot move by 0 amount
        if go_forward:
            self.terminal.cursorForward(go_forward)
            self.pos += go_forward
    else:
        self.terminal.cursorBackward()
        self.pos -= 1
        self.current_column -= 1
def handle_RIGHT(self):
    """Move point one character right, wrapping to the start of the next line."""
    if self.pos >= len(self.buffer):
        return

    if self.char_at(self.pos) == '\n':
        # at the end of a line: go down, then back to column 0
        go_back = self.pos - self.bol_pos()
        self.goto_next_line()
        self.current_column = 0
        # twisted insults cannot move by 0 amount
        if go_back:
            self.terminal.cursorBackward(go_back)
    else:
        self.pos += 1
        self.terminal.cursorForward()
        self.current_column += 1
def handle_UP(self):
    """Move point to the previous line, keeping the column where the line is long enough."""
    # reached the top of the file
    if self.current_line == 0:
        return

    # offset within the current line, to be re-applied on the target line
    go_forward = self.pos - self.bol_pos()
    self.goto_prev_line()
    self.fixup_for_shorter_line(go_forward)
def goto_prev_line(self):
    """Move point to the start of the previous line and the cursor one row up.

    When the cursor is already on the top screen row, the line that comes
    into view is drawn there. Callers are expected to check that a previous
    line exists; no such check is made here.
    """
    self.current_line = self.current_line - 1

    if self.char_at(self.pos - 1) == '\n':
        self.pos = self.bol_pos(self.pos - 1)
    else:
        self.pos = self.bol_pos(self.bol_pos() - 1)

    should_fill = self.terminal.cursorPos.y == 0

    # move and possibly scroll
    self.terminal.reverseIndex()

    if should_fill:
        self.terminal.saveCursor()
        self.terminal.cursorHome()
        self.write(self.buffer[self.pos:self.eol_pos()])
        self.terminal.restoreCursor()
def handle_DOWN(self):
    """Move point to the next line, keeping the column where the line is long enough."""
    # reached the end of the file: goto_next_line would not move, and running
    # the column fixup anyway would advance pos without moving the cursor
    if self.current_line >= self.lines - 1:
        return

    # offset within the current line, to be re-applied on the target line
    go_forward = self.pos - self.bol_pos()
    self.goto_next_line()
    self.fixup_for_shorter_line(go_forward)
def fixup_for_shorter_line(self, go_forward):
    """Re-apply a column offset after a vertical move, clamped to the line length.

    `go_forward` is the offset point had within the line it came from. Point
    advances by at most the length of the current line; the screen cursor and
    `current_column` are moved back by whatever part of the offset did not fit.
    """
    line_length = max(0, self.eol_pos() - self.bol_pos())
    reachable = min(go_forward, line_length)
    self.pos += reachable

    # fixup screen cursor position in case of out of range
    move_backward = go_forward - reachable
    if move_backward:
        self.terminal.cursorBackward(move_backward)
        self.current_column -= move_backward
def goto_next_line(self):
    """Move point to the start of the next line and the cursor one row down.

    Does nothing on the last line. When the cursor is on the bottom row of
    the text view, the line that comes into view is drawn there.
    """
    # reached the end of the file
    if self.current_line >= self.lines - 1:
        return

    self.current_line = self.current_line + 1
    self.pos = self.eol_pos() + 1

    should_fill = self.terminal.cursorPos.y == self.terminal.termSize.y - 1

    # move and possibly scroll
    self.terminal.index()

    if should_fill:
        self.terminal.saveCursor()
        # twisted insults cannot move by 0 amount
        if self.terminal.cursorPos.x:
            self.terminal.cursorBackward(self.terminal.cursorPos.x)
        self.write(self.buffer[self.pos:self.eol_pos()])
        self.terminal.restoreCursor()
def handle_REDRAW(self):
    """Repaint the whole screen."""
    self.redraw_buffer()
def show_keys(self, keys, prefix='C-'):
    """Return a readable, '-'-joined rendering of a sequence of keys.

    Control characters are shown as `prefix` plus the matching letter, DEL
    (127) as '^H'; carriage return and ordinary characters are shown as they
    are. Keys that are not strings are rendered with str().
    """
    def show_key(key):
        if not isinstance(key, str):
            return str(key)
        # ord() only accepts exactly one character
        if len(key) != 1:
            return key
        code = ord(key)
        if code == 127:
            return '^H'
        if code < 32 and key != '\r':
            return prefix + chr(ord('A') + code - 1)
        return key

    return "-".join(show_key(key) for key in keys)
def insert_character(self, ch):
    """Insert the single printable character `ch` at point and update the screen.

    Anything that is not exactly one printable character is ignored. A
    newline splits the current line and moves to the start of the new one.
    """
    import string

    # the length check matters: `in` on a string is a substring test, so ''
    # and runs such as 'ab' would otherwise be accepted as one character
    if len(ch) != 1 or ch not in string.printable:
        return

    self.dirty = True

    if ch == '\n':
        self.terminal.eraseToLineEnd()
        self.terminal.saveCursor()

        if self.terminal.cursorPos.y < self.terminal.termSize.y - 1:
            # scroll down the part below the current line to make room
            # XXX: consider using termSize.y instead of computing view size using MODELINE_HEIGHT
            current_screen_line = self.terminal.cursorPos.y

            # special handling for last line, because some terminal (xterm and gnome term) don't support
            # correctly a scroll region of height 1
            scroll_region = self.terminal.cursorPos.y + 2 != self.parent.height - self.MODELINE_HEIGHT

            if scroll_region:
                self.terminal.setScrollRegion(current_screen_line + 2, self.parent.height - self.MODELINE_HEIGHT)
                self.terminal.cursorPosition(0, current_screen_line + 1)
                self.terminal.reverseIndex()
            else:
                self.terminal.cursorPosition(0, current_screen_line + 1)
                self.terminal.eraseToLineEnd()

            # the rest of the split line becomes the new line's content
            self.write(self.buffer[self.pos:self.eol_pos()])

            if scroll_region:
                self.reset_scrolling_region()

            self.terminal.restoreCursor()
            self.terminal.cursorPosition(0, current_screen_line + 1)
        else:
            # on the bottom row of the view: scroll and draw the new line
            self.terminal.saveCursor()
            self.terminal.index()
            self.write(self.buffer[self.pos:self.eol_pos()])
            self.terminal.restoreCursor()

        self.lines += 1
        self.current_line += 1
        self.current_column = 0
        self.terminal.cursorPos.y += 1
        self.terminal.cursorPos.y = min(self.terminal.cursorPos.y, self.terminal.termSize.y - 1)
    else:
        self.current_column += 1

        # text typed after a '#' on the same line is drawn as a comment
        highlight = '#' in self.buffer[self.bol_pos():self.pos]
        if highlight:
            self.terminal.write(self.COMMENT_COLOR)
        self.terminal.write(ch)
        if highlight:
            self.terminal.write(RESET_COLOR)

        # insults doesn't keep track of cursor movement made by write
        self.terminal.cursorPos.x += 1

    self.buffer = self.buffer[:self.pos] + ch + self.buffer[self.pos:]
    self.pos += 1
def delete_character(self):
    """Remove the buffer character at point (the screen is not touched)."""
    self.buffer = self.buffer[:self.pos] + self.buffer[self.pos + 1:]
def _echo(self, keyID, mod):
    """Echoes characters on terminal like on unix (special chars etc)"""
    if isinstance(keyID, str):
        # compare for equality: `in ('\r')` is a substring test on a plain
        # string (the parentheses do not make a tuple) and also matched ''
        if keyID == '\r':
            keyID = '\n'
        self.insert_character(keyID)
    else:
        log.msg("GOT Special char '%s' (%s)" % (keyID, type(keyID)), system='ssh')
@exception_logger
def keystrokeReceived(self, keyID, mod):
    """Dispatch one keystroke.

    In order: CTRL_G aborts; a key following a prefix is looked up as a
    (prefix, key) binding; a prefix key is remembered; a bound key runs its
    handler; anything else is echoed into the buffer. The modeline is
    refreshed after every keystroke.
    """
    self.trace_keystroke(keyID, mod)

    if keyID == CTRL_G:
        self.abort()
    elif self.prefix:
        handler = self.key_handlers.get((self.prefix, keyID), None)
        try:
            if handler:
                self.draw_status("")
                handler()
            else:
                self.draw_status("%s is undefined" % self.show_keys((self.prefix, keyID)))
        finally:
            # the prefix is consumed even if the handler raised
            self.last_handler = handler
            self.prefix = None
    elif keyID in self.prefixes:
        self.prefix = keyID
        self.draw_status(self.show_keys((keyID,)))
    elif self.key_handlers.get(keyID, None):
        handler = self.key_handlers[keyID]
        try:
            handler()
        finally:
            self.last_handler = handler
    else:
        self.draw_status("")
        # typing text is a command of its own: it must break a sequence of
        # consecutive kills so that the next kill starts a fresh kill ring
        self.last_handler = None
        self._echo(keyID, mod)

    self.refresh_modline()

    if self.tracing and not self.has_quit:
        self.handle_WHAT_CURSOR_POSITION()