# -*- coding: utf-8 -*-
#
#       Copyright 2011 Liftoff Software Corporation (http://liftoffsoftware.com)
#
# NOTE:  Commercial licenses for this software are available!
#

# TODO: See if we can spin off termio.py into its own little program that sits between Gate One and ssh_connect.py.  That way we can take advantage of multiple cores/processors (for terminal-to-HTML processing).  There's no reason why we can't write something that does what dtach does.  Just need to redirect the fd of self.cmd to a unix domain socket and os.setsid() somewhere after forking (twice maybe?).
# TODO: Make the environment variables used before launching self.cmd configurable

# Meta
__version__ = '1.1'
__license__ = "AGPLv3 or Proprietary (see LICENSE.txt)"
__version_info__ = (1, 1)
__author__ = 'Dan McDougall <daniel.mcdougall@liftoffsoftware.com>'

__doc__ = """\
About termio
============
This module provides a Multiplex class that can perform the following:

 * Fork a child process that opens a given terminal program.
 * Read and write data to and from the child process (synchronously or asynchronously).
 * Examine the output of the child process in real-time and perform actions (also asynchronously!) based on what is "expected" (aka non-blocking, pexpect-like functionality).
 * Log the output of the child process to a file and/or syslog.

The Multiplex class was built for asynchronous use in conjunction with a running
:class:`tornado.ioloop.IOLoop` instance but it can be used in a synchronous
(blocking) manner as well.  Synchronous use of this module is most likely to
be useful in an interactive Python session, but if blocking doesn't matter for
your program see the section titled "Blocking" for tips & tricks.

Here's an example instantiating a Multiplex class::

    multiplexer = termio.Multiplex(
        'nethack',
        log_path='/var/log/myapp',
        user='bsmith@CORP',
        term_id=1,
        syslog=True
    )

.. note:: Support for event loops other than Tornado is in the works!

Then *multiplexer* can create and launch a new controlling terminal (tty)
running the given command (e.g. 'nethack')::

    env = {
        'PATH': os.environ['PATH'],
        'MYVAR': 'foo'
    }
    fd = multiplexer.spawn(80, 24, env=env)
    # The fd is returned from spawn() in case you want more low-level control.

Asynchronous input and output from the controlled program is handled via the
IOLoop.  The multiplexer will automatically write all output from the terminal
program to an instance
of self.terminal_emulator (which defaults to Gate One's `terminal.Terminal`).
So if you want to perform an action whenever the running terminal application
has output (like, say, sending a message to a client) you'll need to attach a
callback::

    def screen_update():
        'Called when new output is ready to send to the client'
        output = multiplexer.dump_html()
        socket_or_something.write(output)
    multiplexer.add_callback(multiplexer.CALLBACK_UPDATE, screen_update)

In this example, `screen_update()` will `write()` the output of
`multiplexer.dump_html()` to *socket_or_something* whenever the terminal program
has some sort of output.  You can also make calls directly to the terminal
emulator (if you're using a custom one)::

    def screen_update():
        output = multiplexer.term.my_custom_func()
        whatever.write(output)

Writing characters to the controlled terminal application is pretty
straightforward::

    multiplexer.write(u'some text')

Typically you'd pass in keystrokes or commands from your application to the
underlying program this way and the screen/terminal emulator would get updated
automatically.  If using Gate One's `terminal.Terminal()` you can also attach
callbacks to perform further actions when more specific situations are
encountered (e.g. when the window title is set via its respective escape
sequence)::

    def set_title():
        'Hypothetical title-setting function'
        print("Window title was just set to: %s" % multiplexer.term.title)
    multiplexer.term.callbacks[multiplexer.term.CALLBACK_TITLE] = set_title

Module Functions and Classes
============================
"""

# Stdlib imports
import os, sys, time, struct, io, gzip, re, logging
from copy import copy
from datetime import timedelta, datetime
from functools import partial
from itertools import izip
from multiprocessing import Process
from json import loads as json_decode
from json import dumps as json_encode

# Internationalization support
import gettext
gettext.install('termio')

# Globals
SEPARATOR = u"\U000f0f0f" # The character used to separate frames in the log
# NOTE: That unicode character was carefully selected from only the finest
# of the PUA.  I hereby dub thee, "U+F0F0F, The Separator."
CALLBACK_THREAD = None # Used by add_callback()
POSIX = 'posix' in sys.builtin_module_names
MACOS = POSIX and os.uname()[0] == 'Darwin' # os.uname() is POSIX-only
# Matches Gate One's special optional escape sequence (ssh plugin only)
RE_OPT_SSH_SEQ = re.compile(
    r'.*\x1b\]_\;(ssh\|.+?)(\x07|\x1b\\)', re.MULTILINE|re.DOTALL)
# Matches an xterm title sequence
RE_TITLE_SEQ = re.compile(
    r'.*\x1b\][0-2]\;(.+?)(\x07|\x1b\\)', re.DOTALL|re.MULTILINE)
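# For example (hypothetical values), the ssh plugin's optional sequence looks
# like '\x1b]_;ssh|someuser@somehost:22\x07', from which RE_OPT_SSH_SEQ
# captures group(1) == 'ssh|someuser@somehost:22'.  Likewise an xterm title
# sequence such as '\x1b]0;Some Title\x07' is captured by RE_TITLE_SEQ as
# 'Some Title'.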

# Helper functions
def debug_expect(m_instance, match, pattern):
    """
    This function is used by :meth:`BaseMultiplex.expect` if :attr:`self.debug`
    is True.  It facilitates easy debugging of regular expressions.  It will
    print out precisely what was matched and where.

    .. note:: This function only works with post-process patterns.
    """
    print("%s was matched..." % repr(match))
    out = ""
    for line in m_instance.dump():
        match_obj = pattern.search(line)
        if match_obj:
            out += "--->%s\n" % repr(line)
            break
        else:
            out += "    %s\n" % repr(line)
    print(out)

def retrieve_first_frame(golog_path):
    """
    Retrieves the first frame from the given *golog_path*.
    """
    found_first_frame = None
    frame = b""
    f = gzip.open(golog_path)
    while not found_first_frame:
        frame += f.read(1) # One byte at a time
        if frame.decode('UTF-8', "ignore").endswith(SEPARATOR):
            # That's it; wrap this up
            found_first_frame = True
    distance = f.tell()
    f.close()
    return (frame.decode('UTF-8', "ignore").rstrip(SEPARATOR), distance)

def retrieve_last_frame(golog_path):
    """
    Retrieves the last frame from the given *golog_path*.  It does this by
    iterating over the log in reverse.
    """
    encoded_separator = SEPARATOR.encode('UTF-8')
    golog = gzip.open(golog_path)
    chunk_size = 1024*128
    # Seek to the end of the file (gzip objects don't support negative
    # seeking)
    distance = chunk_size
    prev_tell = None
    while golog.tell() != prev_tell:
        prev_tell = golog.tell()
        try:
            golog.seek(distance)
        except IOError:
            return # Something wrong with the file
        distance += distance
    # Now that we're at the end, go back a bit and split from there
    golog.seek(golog.tell() - chunk_size)
    end_frames = golog.read().split(encoded_separator)
    if len(end_frames) > 1:
        # Very last item will be empty
        return end_frames[-2].decode('UTF-8', 'ignore')
    else:
        # Just a single frame here, return it as-is
        return end_frames[0].decode('UTF-8', 'ignore')

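# A quick sketch of using the two retrieval functions above (the path is
# hypothetical):
#
#   first_frame, distance = retrieve_first_frame('/tmp/session.golog')
#   last_frame = retrieve_last_frame('/tmp/session.golog')
#   print(first_frame[:13]) # The session's start time in ms
#   print(last_frame[:13])  # The timestamp of the final frame
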
def get_or_update_metadata(golog_path, user, force_update=False):
    """
    Retrieves or creates/updates the metadata inside of *golog_path*.

    If *force_update* the metadata inside the golog will be updated even if
    it already exists.

    .. note::

        All logs will need "fixing" the first time they're enumerated like
        this since they won't have an end_date.  Fortunately we only need to
        do this once per golog.
    """
    logging.debug('get_or_update_metadata(%s, %s, %s)' % (
        golog_path, user, force_update))
    if not os.path.getsize(golog_path): # 0 bytes
        return # Nothing to do
    try:
        first_frame, distance = retrieve_first_frame(golog_path)
    except IOError:
        # Something wrong with the log... Probably still being written to
        return
    metadata = {}
    if first_frame[14:].startswith('{'):
        # This is JSON, capture existing metadata
        metadata = json_decode(first_frame[14:])
        # end_date gets added by this function
        if not force_update and 'end_date' in metadata:
            return metadata # All done
    # '\xf3\xb0\xbc\x8f' <--UTF-8 encoded SEPARATOR (for reference)
    encoded_separator = SEPARATOR.encode('UTF-8')
    golog = gzip.open(golog_path)
    # Loop over the file in big chunks (which is faster than read() by an
    # order of magnitude)
    chunk_size = 1024*128 # 128k should be enough for a 100x300 terminal
    # full of 4-byte unicode characters.  That would be one BIG frame
    # (i.e. unlikely).
    # Sadly, we have to read the whole thing into memory (log_data) in order
    # to perform this important work (creating proper metadata).
    # On the plus side re-compressing the log can save a _lot_ of disk
    # space.  Why?  Because termio.py writes the frames using gzip.open() in
    # append mode which is a lot less efficient than compressing all the
    # data in one go.
    log_data = b''
    total_frames = 0
    while True:
        try:
            chunk = golog.read(chunk_size)
        except IOError:
            return # Something wrong with the file
        total_frames += chunk.count(encoded_separator)
        log_data += chunk
        if len(chunk) < chunk_size:
            break
    log_data = log_data.decode('UTF-8', 'ignore')
    start_date = first_frame[:13] # Getting the start date is easy
    last_frame = retrieve_last_frame(golog_path) # This takes some work
    end_date = last_frame[:13]
    version = u"1.0"
    connect_string = None
    from gateone import PLUGINS
    if 'ssh' in PLUGINS['py']:
        # Try to find the host that was connected to by looking for the SSH
        # plugin's special optional escape sequence.  It looks like this:
        #   "\x1b]_;ssh|%s@%s:%s\007"
        match_obj = RE_OPT_SSH_SEQ.match(log_data[:(chunk_size*10)])
        if match_obj:
            connect_string = match_obj.group(1).split('|')[1]
    if not connect_string:
        # Try guessing it by looking for a title escape sequence
        match_obj = RE_TITLE_SEQ.match(log_data[:(chunk_size*10)])
        if match_obj:
            # The split() here is an attempt to remove the tail end of
            # titles like this:  'someuser@somehost: ~'
            connect_string = match_obj.group(1)
    # TODO: Add some hooks here for plugins to add their own metadata
    metadata.update({
        u'user': user,
        u'start_date': start_date,
        u'end_date': end_date,
        u'frames': total_frames,
        u'version': version,
        u'connect_string': connect_string,
        u'filename': os.path.split(golog_path)[1]
    })
    # Make a *new* first_frame
    first_frame = u"%s:" % start_date
    first_frame += json_encode(metadata)
    # Replace the first frame and re-save the log
    log_data = log_data.encode('UTF-8') # Encode this first to ensure
    # 'distance' (a byte offset) slices at the right spot
    log_data = (first_frame + SEPARATOR).encode('UTF-8') + log_data[distance:]
    gzip.open(golog_path, 'w').write(log_data)
    return metadata

# Exceptions
class Timeout(Exception):
    """
    Used by :meth:`BaseMultiplex.expect` and :meth:`BaseMultiplex.await`;
    raised when a timeout is reached.
    """
    pass

class ProgramTerminated(Exception):
    """
    Raised when we try to write to a process that's no longer running.
    """
    pass

# Classes
class Pattern(object):
    """
    Used by :meth:`BaseMultiplex.expect`, an object to store patterns
    (regular expressions) and their associated properties.

    .. note::

        The variable *m_instance* is used below to mean the current instance
        of BaseMultiplex (or a subclass thereof).

    :pattern: A regular expression or iterable of regular expressions that
        will be checked against the output stream.
    :callback: A function that will be called when the pattern is matched.
        Callbacks are called like so:

        >>> callback(m_instance, matched_string)

        .. tip::

            If you provide a string instead of a function for your
            *callback* it will automatically be converted into a function
            that writes the string to the child process.  Example::

                >>> p = Pattern('(?i)password:', 'mypassword\\n')

    :optional: Indicates that this pattern is optional, meaning that it
        isn't required to match before the next pattern in
        :attr:`BaseMultiplex._patterns` is checked.
    :sticky: Indicates that the pattern will not time out and won't be
        automatically removed from self._patterns when it is matched.
    :errorback: A function to call in the event of a timeout or if an
        exception is encountered.  Errorback functions are called like so:

        >>> errorback(m_instance)

    :preprocess: Indicates that this pattern is to be checked against the
        incoming stream before it is processed by the terminal emulator.
        Useful if you need to match non-printable characters like control
        codes and escape sequences.
    :timeout: A :obj:`datetime.timedelta` object indicating how long we
        should wait before calling :meth:`errorback`.
    :created: A :obj:`datetime.datetime` object that gets set when the
        Pattern is instantiated by :meth:`BaseMultiplex.expect`.  It is used
        to determine if and when a timeout has been reached.
    """
    def __init__(self, pattern, callback,
            optional=False, sticky=False, errorback=None, preprocess=False,
            timeout=30):
        self.pattern = pattern
        if isinstance(callback, (str, unicode)):
            # Convert the string to a write() call
            self.callback = lambda m, match: m.write(unicode(callback))
        else:
            self.callback = callback
        self.errorback = errorback
        self.optional = optional
        self.sticky = sticky
        self.preprocess = preprocess
        self.timeout = timeout
        self.created = datetime.now()

class BaseMultiplex(object):
    """
    A base class that all Multiplex types will inherit from.

    :cmd: *string* - The command to execute when calling :meth:`spawn`.
    :terminal_emulator: *terminal.Terminal or similar* - The terminal
        emulator to write to when capturing the incoming output stream from
        *cmd*.
    :log_path: *string* - The absolute path to the log file where the output
        from *cmd* will be saved.
    :term_id: *string* - The terminal identifier to associate with this
        instance (only used in the logs to identify terminals).
    :syslog: *boolean* - Whether or not the session should be logged using
        the local syslog daemon.
    :syslog_host: *string* - An optional syslog host to send session log
        information to (this is independent of the *syslog* option above--it
        does not require a syslog daemon be present on the host running Gate
        One).
    :syslog_facility: *integer* - The syslog facility to use when logging
        messages.  All possible facilities can be found in
        `utils.FACILITIES` (if you need a reference other than the syslog
        module).
    :debug: *boolean* - Used by the `expect` methods...  If set, extra
        debugging information will be output whenever a regular expression
        is matched.
    """
    CALLBACK_UPDATE = 1 # Screen update
    CALLBACK_EXIT = 2   # When the underlying program exits

    def __init__(self, cmd,
            terminal_emulator=None, # Defaults to Gate One's terminal.Terminal
            log_path=None,
            user=None, # Only used by log output (to differentiate who's who)
            term_id=None, # Also only for syslog output for the same reason
            syslog=False,
            syslog_host=None,
            syslog_facility=None,
            debug=False):
        self.debug = debug
        self.exitfunc = None
        self.cmd = cmd
        if not terminal_emulator:
            # Why do this?  So you could use/write your own specialty
            # emulator.  Whatever you use just has to accept 'rows' and
            # 'cols' as keyword arguments in __init__()
            from terminal import Terminal # Dynamic import to cut down waste
            self.terminal_emulator = Terminal
        else:
            self.terminal_emulator = terminal_emulator
        self.log_path = log_path # Logs of the terminal output wind up here
        self.log = None # Just a placeholder until it is opened
        self.syslog = syslog # See "if self.syslog:" below
        self.syslog_host = syslog_host
        self._alive = False
        self.ratelimiter_engaged = False
        self.rows = 24
        self.cols = 80
        self.pid = -1 # Means "no pid yet"
        self.started = "Never"
        self._patterns = []
        self._handling_match = False
        # Setup our callbacks
        self.callbacks = { # Defaults do nothing which saves conditionals
            self.CALLBACK_UPDATE: {},
            self.CALLBACK_EXIT: {},
        }
        # Configure syslog logging
        self.user = user
        self.term_id = term_id
        self.syslog_buffer = ''
        if self.syslog and not self.syslog_host:
            try:
                import syslog
            except ImportError:
                logging.error(_(
                    "The syslog module is required to log terminal sessions"
                    " to syslog if no syslog_host is set.  The syslog module"
                    " is not required if you want to send syslog messages to"
                    " a remote syslog server but for this to work you must"
                    " set the syslog_host variable either via the"
                    " command-line switch or in your server.conf."))
                sys.exit(1)
            if not syslog_facility:
                syslog_facility = syslog.LOG_DAEMON
            self.syslog_facility = syslog_facility
            # Sets up syslog messages to show up like this:
            #   Sep 28 19:45:02 <hostname> gateone: <log message>
            syslog.openlog('gateone', 0, syslog_facility)

    def __repr__(self):
        """
        Returns self.__str__()
        """
        return "<%s>" % self.__str__()

    def __str__(self):
        """
        Returns a string representation of this Multiplex instance and the
        current state of things.
        """
        started = self.started
        if started != "Never":
            started = self.started.isoformat()
        out = (
            "%s.%s: "
            "term_id: %s, "
            "alive: %s, "
            "command: %s, "
            "started: %s" % (
                self.__module__,
                self.__class__.__name__,
                self.term_id,
                self._alive,
                repr(self.cmd),
                started
            )
        )
        return out

    def add_callback(self, event, callback, identifier=None):
        """
        Attaches the given *callback* to the given *event*.  If given,
        *identifier* can be used to reference this callback later (e.g. when
        you want to remove it).  Otherwise an identifier will be generated
        automatically.  If the given *identifier* is already attached to a
        callback at the given event, that callback will be replaced with
        *callback*.

        *event* - The numeric ID of the event you're attaching *callback*
        to (e.g. Multiplex.CALLBACK_UPDATE).

        *callback* - The function you're attaching to the *event*.

        *identifier* - A string or number to be used as a reference point
        should you wish to remove or update this callback later.

        Returns the identifier of the callback.

        Example:

        >>> m = Multiplex()
        >>> def somefunc(): pass
        >>> id = "myref"
        >>> ref = m.add_callback(m.CALLBACK_UPDATE, somefunc, id)

        .. note::

            This allows the controlling program to have multiple callbacks
            for the same event.
        """
        if not identifier:
            identifier = callback.__hash__()
        self.callbacks[event][identifier] = callback
        return identifier

    def remove_callback(self, event, identifier):
        """
        Removes the callback referenced by *identifier* that is attached to
        the given *event*.

        Example:

        >>> m.remove_callback(m.CALLBACK_BELL, "myref")
        """
        try:
            del self.callbacks[event][identifier]
        except KeyError:
            pass # Doesn't exist anymore--nothing to do

    def remove_all_callbacks(self, identifier):
        """
        Removes all callbacks associated with *identifier*.
        """
        for event, identifiers in self.callbacks.items():
            try:
                del self.callbacks[event][identifier]
            except KeyError:
                pass # Doesn't exist--nothing to worry about

    def _call_callback(self, callback):
        """
        This method is here in the event that subclasses of `BaseMultiplex`
        need to call callbacks in an implementation-specific way.  It just
        calls *callback*.
        """
        callback()

    def spawn(self, rows=24, cols=80, env=None, em_dimensions=None):
        """
        This method must be overridden by subclasses of `BaseMultiplex`.  It
        is expected to execute a child process in a way that allows
        non-blocking reads to be performed.
        """
        raise NotImplementedError(_(
            "spawn() *must* be overridden by subclasses."))

    def isalive(self):
        """
        This method must be overridden by subclasses of `BaseMultiplex`.  It
        is expected to return True if the child process is still alive and
        False otherwise.
        """
        raise NotImplementedError(_(
            "isalive() *must* be overridden by subclasses."))

    def term_write(self, stream):
        """
        Writes *stream* to `BaseMultiplex.term` and also takes care of
        logging to :attr:`log_path` (if set) and/or syslog (if
        :attr:`syslog` is `True`).  When complete, will call any callbacks
        registered in :obj:`CALLBACK_UPDATE`.

        :stream: A string or bytes containing the incoming output stream
            from the underlying terminal program.

        .. note::

            This kind of logging doesn't capture user keystrokes.  This is
            intentional as we don't want passwords winding up in the logs.
        """
        #logging.debug('term_write() stream: %s' % repr(stream))
        # Write to the log (if configured)
        separator = b"\xf3\xb0\xbc\x8f"
        if self.log_path:
            # Using .encode() below ensures the result will be bytes
            now = str(int(round(time.time() * 1000))).encode('UTF-8')
            if not os.path.exists(self.log_path):
                # Write the first frame as metadata
                metadata = {
                    'version': '1.0', # Log format version
                    'rows': self.rows,
                    'cols': self.cols,
                    'start_date': now.decode('UTF-8') # JSON needs strings
                    # NOTE: end_date should be added later when the log is
                    # read for the first time by either the logviewer or
                    # the logging plugin.
                }
                # The hope is that we can use the first-frame-metadata
                # paradigm to store all sorts of useful information about
                # a log.
                # NOTE: Using .encode() below to ensure it is bytes in
                # Python 3
                metadata_frame = json_encode(metadata).encode('UTF-8')
                # Using concatenation of bytes below to ensure compatibility
                # with both Python 2 and Python 3.
                metadata_frame = now + b":" + metadata_frame + separator
                self.log = gzip.open(self.log_path, mode='a')
                self.log.write(metadata_frame)
            if not self.log: # Only matters if the file already exists
                self.log = gzip.open(self.log_path, mode='a')
            # NOTE: I'm using an obscure unicode symbol in order to avoid
            # conflicts.  We need to do our best to ensure that we can
            # differentiate between terminal output and our log format...
            # This should do the trick because it is highly unlikely that
            # someone would be displaying this obscure unicode symbol on an
            # actual terminal unless they were using Gate One to view a
            # Gate One log file in vim or something =)
            # "\xf3\xb0\xbc\x8f" == \U000f0f0f == U+F0F0F (Private Use)
            output = now + b":" + stream + separator
            self.log.write(output)
        # NOTE: Gate One's log format is special in that it can be used for
        # both playing back recorded sessions *or* generating syslog-like
        # output.
        if self.syslog:
            # NOTE: The import below is cheap--__init__ already verified
            # the module is importable and repeat imports just hit
            # sys.modules.
            import syslog
            # Try to keep it line-based so we don't end up with a log line
            # per character.
            if '\n' in stream:
                for line in stream.splitlines():
                    if self.syslog_buffer:
                        line = self.syslog_buffer + line
                        self.syslog_buffer = ''
                    # Syslog really doesn't like any fancy encodings
                    line = line.encode('ascii', 'xmlcharrefreplace')
                    syslog.syslog("%s %s: %s" % (
                        self.user, self.term_id, line))
            else:
                self.syslog_buffer += stream
        # Handle preprocess patterns (for expect())
        if self._patterns:
            self.preprocess(stream)
        self.term.write(stream)
        # Handle post-process patterns (for expect())
        if self._patterns:
            self.postprocess()
        if self.CALLBACK_UPDATE in self.callbacks:
            for callback in self.callbacks[self.CALLBACK_UPDATE].values():
                self._call_callback(callback)

    def preprocess(self, stream):
        """
        Handles preprocess patterns registered by :meth:`expect`.  That is,
        those patterns which have been marked with `preprocess = True`.
        Patterns marked in this way get handled *before* the terminal
        emulator processes the *stream*.

        :stream: A string or bytes containing the incoming output stream
            from the underlying terminal program.
        """
        preprocess_patterns = (a for a in self._patterns if a.preprocess)
        finished_non_sticky = False
        # If there aren't any preprocess patterns this won't do anything:
        for pattern_obj in preprocess_patterns:
            if finished_non_sticky and not pattern_obj.sticky:
                # We only want sticky patterns if we've already matched once
                continue
            if isinstance(pattern_obj.pattern, (list, tuple)):
                for pat in pattern_obj.pattern:
                    match = pat.search(stream)
                    if match:
                        callback = partial(
                            pattern_obj.callback, self, match.group())
                        self._call_callback(callback)
                        if not pattern_obj.sticky:
                            self.unexpect(hash(pattern_obj)) # Remove it
                        break
            else:
                match = pattern_obj.pattern.search(stream)
                if match:
                    callback = partial(
                        pattern_obj.callback, self, match.group())
                    self._call_callback(callback)
                    if not pattern_obj.sticky:
                        self.unexpect(hash(pattern_obj)) # Remove it
            if not pattern_obj.optional:
                # We only match the first non-optional pattern
                finished_non_sticky = True

    def postprocess(self):
        """
        Handles post-process patterns registered by :meth:`expect`.
        """
        # Check the terminal emulator screen for any matching patterns.
        post_patterns = (a for a in self._patterns if not a.preprocess)
        finished_non_sticky = False
        for pattern_obj in post_patterns:
            # For post-processing matches we search the terminal emulator's
            # screen as a single string.  This allows for full-screen screen
            # scraping in addition to typical 'expect-like' functionality.
            # The big difference being that with traditional expect (and
            # pexpect) you don't get to examine the program's output as it
            # would be rendered in an actual terminal.
            # By using post-processing of the text after it has been handled
            # by a terminal emulator we don't have to worry about hidden
            # characters and escape sequences that we may not be aware of or
            # could make our regular expressions much more complicated than
            # they should be.
            if finished_non_sticky and not pattern_obj.sticky:
                continue # We only want sticky patterns at this point
            # For convenience, trailing whitespace is removed from the lines
            # output from the terminal emulator.  This is so we don't have
            # to put '\w*' before every '$' to match the end of a line.
            term_lines = "\n".join(
                [a.rstrip() for a in self.term.dump()]).rstrip()
            if isinstance(pattern_obj.pattern, (list, tuple)):
                for pat in pattern_obj.pattern:
                    match = pat.search(term_lines)
                    if match:
                        self._handle_match(pattern_obj, match)
                        break
            else:
                match = pattern_obj.pattern.search(term_lines)
                if match:
                    self._handle_match(pattern_obj, match)
            if not pattern_obj.optional and not pattern_obj.sticky:
                # We only match the first non-optional pattern
                finished_non_sticky = True

    def _handle_match(self, pattern_obj, match):
        """
        Handles a matched regex detected by :meth:`postprocess`.  It calls
        :obj:`Pattern.callback` and takes care of removing it from
        :attr:`_patterns` (if it isn't sticky).
        """
        if self._handling_match:
            # Don't process anything if we're in the middle of handling a
            # match.
            # NOTE: This can happen when there's more than one thread,
            # processes, or PeriodicCallback going on simultaneously.  It
            # seems to work better than threading.Lock()
            return
        self._handling_match = True
        callback = partial(pattern_obj.callback, self, match.group())
        self._call_callback(callback)
        if self.debug:
            # Turn on the fancy regex debugger/pretty printer
            debug_callback = partial(
                debug_expect, self, match.group(), pattern_obj.pattern)
            self._call_callback(debug_callback)
        if not pattern_obj.sticky:
            self.unexpect(hash(pattern_obj)) # Remove it
        self._handling_match = False

    def writeline(self, line=''):
        """
        Just like :meth:`write` but it writes a newline after writing
        *line*.  If no *line* is given a newline will be written.
        """
        self.write(line + u'\r\n')

    def writelines(self, lines):
        """
        Writes *lines* (a list of strings) to the underlying program,
        appending a newline after each line.
        """
        if getattr(lines, '__iter__', False):
            for line in lines:
                self.write(line + u'\r\n')
        else:
            raise TypeError(_(
                "%s is not iterable (strings don't count :)" % type(lines)))

    def dump_html(self, full=False, client_id='0'):
        """
        Returns the difference of terminal lines (a list of lines, to be
        specific) and its scrollback buffer (which is also a list of lines)
        as a tuple::

            (scrollback, screen)

        If a line hasn't changed since the last dump said line will be
        replaced with an empty string in the output.

        If *full*, will return the entire screen (not just the diff).

        If *client_id* is given (string), it will be used as a unique client
        identifier for keeping track of screen differences (so you can have
        multiple clients getting their own unique diff output for the same
        Multiplex instance).
        """
        if client_id not in self.prev_output:
            self.prev_output[client_id] = [None for a in xrange(self.rows-1)]
        try:
            scrollback, html = ([], [])
            if self.term:
                try:
                    result = self.term.dump_html()
                    if result:
                        scrollback, html = result
                        # Make a copy so we can save it to prev_output later
                        preserved_html = html[:]
                except IOError as e:
                    logging.debug(_(
                        "IOError attempting self.term.dump_html()"))
                    logging.debug("%s" % e)
            if html:
                if not full:
                    count = 0
                    for line1, line2 in izip(
                            self.prev_output[client_id], html):
                        if line1 != line2:
                            html[count] = line2 # I love updates-in-place
                        else:
                            html[count] = ''
                        count += 1
                    # Otherwise a full dump will take place
                self.prev_output.update({client_id: preserved_html})
            return (scrollback, html)
        except ValueError as e:
            # This would be special...
            logging.error(_("ValueError in dump_html(): %s" % e))
            return ([], [])
        except (IOError, TypeError) as e:
            logging.error(_("Unhandled exception in dump_html(): %s" % e))
            if self.ratelimiter_engaged:
                # Caused by the program being out of control
                return ([], [_(
                    "<b>Program output too noisy.  Sending Ctrl-c...</b>")])
            else:
                import traceback
                traceback.print_exc(file=sys.stdout)
            return ([], [])

    def dump(self):
        """
        Dumps whatever is currently on the screen of the terminal emulator
        as a list of plain strings (so they'll be escaped and look nice in
        an interactive Python interpreter).
        """
        return self.term.dump()

    def timeout_check(self, timeout_now=False):
        """
        Iterates over :attr:`BaseMultiplex._patterns` checking each to
        determine if it has timed out.  If a timeout has occurred for a
        `Pattern` and said Pattern has an *errorback* function that function
        will be called.

        Returns True if there are still non-sticky patterns remaining.
        False otherwise.

        If *timeout_now* is True, will force the first errorback to be
        called and will empty out self._patterns.
        """
        remaining_patterns = False
        for pattern_obj in self._patterns:
            if timeout_now:
                if pattern_obj.errorback:
                    errorback = partial(pattern_obj.errorback, self)
                    self._call_callback(errorback)
                self.unexpect()
                return False
            if not pattern_obj.timeout:
                # Timeouts of 0 or None mean "wait forever"
                remaining_patterns = True
                continue
            elapsed = datetime.now() - pattern_obj.created
            if elapsed > pattern_obj.timeout:
                if not pattern_obj.sticky:
                    self.unexpect(hash(pattern_obj))
                    if pattern_obj.errorback:
                        errorback = partial(pattern_obj.errorback, self)
                        self._call_callback(errorback)
            elif not pattern_obj.sticky:
                remaining_patterns = True
        return remaining_patterns

    def expect(self, patterns, callback,
            optional=False, sticky=False, errorback=None, timeout=15,
            position=None, preprocess=False):
        """
        Watches the stream of output coming from the underlying terminal
        program for *patterns* and if there's a match *callback* will be
        called like so::

            callback(multiplex_instance, matched_string)

        .. tip::

            You can provide a string instead of a *callback* function as a
            shortcut if you just want said string written to the child
            process.

        *patterns* can be a string, an :class:`re.RegexObject` (as created
        by :func:`re.compile`), or an iterable of either/or.

        Returns a reference object that can be used to remove the registered
        pattern/callback at any time using the :meth:`unexpect` method (see
        below).

        .. note:: This function is non-blocking!

        .. warning::

            The *timeout* value gets compared against the time
            :meth:`expect` was called to create it.  So don't wait too long
            if you're planning on using :meth:`await`!

        Here's a simple example that changes a user's password::

            >>> def write_password(m_instance, matched):
            ...     print("Sending Password... %s patterns remaining."
            ...         % len(m_instance._patterns))
            ...     m_instance.writeline('somepassword')
            >>> m = Multiplex('passwd someuser') # Assumes running as root :)
            >>> m.expect('(?i)password:', write_password) # Step 1
            >>> m.expect('(?i)password:', write_password) # Step 2
            >>> print(len(m._patterns)) # There's two in the queue
            2
            >>> m.spawn() # Execute the command
            >>> m.await(10) # Blocks for up to 10 seconds waiting for
            ...             # self._patterns to be empty (not counting
            ...             # optional patterns)
            Sending Password... 1 patterns remaining.
            Sending Password... 0 patterns remaining.
            >>> m.isalive()
            False
            >>> # All done!

        .. tip::

            The :meth:`await` method will automatically call :meth:`spawn`
            if not :meth:`isalive`.

        This would result in the password of 'someuser' being changed to
        'somepassword'.  How is the order determined?  Every time
        :meth:`expect` is called it creates a new :class:`Pattern` using the
        given parameters and appends it to `self._patterns` (which is a
        list).  As each :class:`Pattern` is matched its *callback* gets
        called and the :class:`Pattern` is removed from `self._patterns`
        (unless *sticky* is `True`).  So even though the patterns and
        callbacks listed above were identical they will get executed and
        removed in the order they were created as each respective
        :class:`Pattern` is matched.

        .. note::

            Only the first pattern, or patterns marked as *sticky*, are
            checked against the incoming stream.  If the first non-sticky
            pattern is marked *optional* then the following pattern will be
            checked (and so on).  All other patterns will sit in
            `self._patterns` until their predecessors are matched/removed.

        Patterns can be removed from `self._patterns` as needed by calling
        `unexpect(<reference>)`.  Here's an example::

            >>> def handle_accepting_ssh_key(m_instance, matched):
            ...     m_instance.writeline(u'yes')
            >>> m = Multiplex('ssh someuser@somehost')
            >>> ref1 = m.expect('(?i)Are you sure.*\(yes/no\)\?',
            ...     handle_accepting_ssh_key, optional=True)
            >>> def send_password(m_instance, matched):
            ...     m_instance.unexpect(ref1)
            ...     m_instance.writeline('somepassword')
            >>> ref2 = m.expect('(?i)password:', send_password)
            >>> # spawn() and/or await() and do stuff...

        The example above would send 'yes' if asked by the SSH program to
        accept the host's public key (which would result in it being
        automatically removed from `self._patterns`).  However, if this
        condition isn't met before send_password() is called,
        send_password() will use the reference object to remove it directly.
        This ensures that the pattern won't be accidentally matched later on
        in the program's execution.

        .. note::

            Even if we didn't match the "Are you sure..." pattern it would
            still get auto-removed after its timeout was reached.

        **About pattern ordering:**  The position at which the given pattern
        will be inserted in `self._patterns` can be specified via the
        *position* argument.  The default is to simply append, which should
        be appropriate in most cases.

        **About timeouts:**  The *timeout* value passed to expect() will be
        used to determine how long to wait before the pattern is removed
        from self._patterns.  When this occurs, *errorback* will be called
        with the current Multiplex instance as the only argument.  If
        *errorback* is None (the default) the pattern will simply be
        discarded with no action taken.

        .. note:: If *sticky* is True the *timeout* value will be ignored.

        **Notes about the length of what will be matched:**  The entire
        terminal 'screen' will be searched every time new output is read
        from the incoming stream.  This means that the number of rows and
        columns of the terminal determines the size of the search.  So if
        your pattern needs to look for something inside of 50 lines of text
        you need to make sure that when you call `spawn` you specify at
        least `rows=50`.  Example::

            >>> def handle_long_search(m_instance, matched):
            ...     do_stuff(matched)
            >>> m = Multiplex('someCommandWithLotsOfOutput.sh')
            >>> # 'begin', at least one non-newline char, 50 newlines, at
            >>> # least one char, then 'end':
            >>> my_regex = re.compile('begin.+[\\n]{50}.+end', re.MULTILINE)
            >>> ref = m.expect(my_regex, handle_long_search)
            >>> m.spawn(rows=51, cols=150)
            >>> # Call m.read() or just let an event loop (e.g. Tornado's
            >>> # IOLoop) take care of things...

        **About non-printable characters:**  By default (*preprocess* is
        False), patterns will be checked against the current screen as
        output by the terminal emulator.  This means that things like
        control codes and escape sequences will be handled and discarded by
        the terminal emulator and as such won't be available for patterns to
        be checked against.  To get around this limitation you can set
        *preprocess* to True and the pattern will be checked against the
        incoming stream before it is processed by the terminal emulator.
        Example::

            >>> def handle_xterm_title(m_instance, matched):
            ...     print("Caught title: %s" % matched)
            >>> m = Multiplex('echo -e "\\033]0;Some Title\\007"')
            >>> title_seq_regex = re.compile(
            ...     r'\\x1b\\][0-2]\;(.*?)(\\x07|\\x1b\\\\)')
            >>> m.expect(title_seq_regex, handle_xterm_title, preprocess=True)
            >>> m.await()
            Caught title: Some Title
            >>>

        **Notes about debugging:**  Instead of using `await` to wait for all
        of your patterns to be matched at once you can make individual calls
        to `read` to determine if your patterns are being matched in the way
        that you want.  For example::

            >>> def do_stuff(m_instance, matched):
            ...     print("Debug: do_stuff() got %s" % repr(matched))
            ...     # Do stuff here
            >>> m = Multiplex('someLongComplicatedOutput.sh')
            >>> m.expect('some pattern', do_stuff)
            >>> m.expect('some other pattern', do_stuff)
            >>> m.spawn()
            >>> # Instead of calling await() just call one read() at a time
            >>> print(repr(m.read()))
            ''
            >>> print(repr(m.read())) # Oops, called read() too soon.
            'some other pattern'
            >>> # Doh!  Looks like 'some other pattern' comes first.
            >>> # Let's start over...
            >>> m.unexpect() # With no arguments, it empties m._patterns
            >>> m.terminate() # Tip: This calls unexpect() too so the line
            ...               # above really isn't necessary
            >>> m.expect('some other pattern', do_stuff) # This one's first
            >>> m.expect('some pattern', do_stuff)
            >>> m.spawn()
            >>> print(repr(m.read())) # This time I waited a moment :)
            'Debug: do_stuff() got "some other pattern"'
            'some other pattern'
            >>> # Huzzah!  Now let's see if 'some pattern' matches...
            >>> print(repr(m.read()))
            'Debug: do_stuff() got "some pattern"'
            'some pattern'
            >>> # As you can see, calling read() at-will in an interactive
            >>> # interpreter can be very handy.

        **About asynchronous use:**  This mechanism is non-blocking (with
        the exception of `await`) and is meant to be used asynchronously.
        This means that if the running program has no output, `read` won't
        result in any patterns being matched.  So you must be careful about
        timing *or* you need to ensure that `read` gets called either
        automatically when there's data to be read (IOLoop, EPoll, select,
        etc) or at regular intervals via a loop.  Also, if you're not
        calling `read` at an interval (i.e. you're using a mechanism to
        detect when there's output to be read before calling it e.g. IOLoop)
        you need to ensure that `timeout_check` is called regularly anyway
        or timeouts won't get detected if there's no output from the
        underlying program.  See the `MultiplexPOSIXIOLoop.read` override
        for an example of what this means and how to do it.
        """
        # Create the Pattern object before we do anything else
        if isinstance(patterns, (str, unicode)):
            # Convert to a compiled regex (assume MULTILINE for the sanity
            # of the ignorant)
            patterns = re.compile(patterns, re.MULTILINE)
        if isinstance(patterns, (tuple, list)):
            # Ensure that all patterns are RegexObjects
            pattern_list = []
            for pattern in patterns:
                if isinstance(pattern, str):
                    pattern = re.compile(pattern)
                pattern_list.append(pattern)
            patterns = tuple(pattern_list) # No reason to keep it as a list
        # Convert timeout to a timedelta if necessary
        if timeout: # 0 or None mean "wait forever"
            if isinstance(timeout, (str, int, float)):
                timeout = timedelta(seconds=float(timeout))
            elif not isinstance(timeout, timedelta):
                raise TypeError(_(
                    "The timeout value must be a string, integer, float, or"
                    " a timedelta object"))
        pattern_obj = Pattern(patterns, callback,
            optional=optional,
            sticky=sticky,
            errorback=errorback,
            preprocess=preprocess,
            timeout=timeout)
        if not position:
            self._patterns.append(pattern_obj)
        else:
            self._patterns.insert(position, pattern_obj)
        return hash(pattern_obj)

    def unexpect(self, ref=None):
        """
        Removes *ref* from self._patterns so it will no longer be checked
        against the incoming stream.  If *ref* is None (the default),
        `self._patterns` will be emptied.
        """
        if not ref:
            self._patterns = [] # Reset
            return
        for i, item in enumerate(self._patterns):
            if hash(item) == ref:
                self._patterns.pop(i)

    def await(self, timeout=15, rows=24, cols=80, env=None,
            em_dimensions=None):
        """
        Blocks until all non-optional patterns inside self._patterns have
        been removed *or* until the given *timeout* is reached.  *timeout*
        may be an integer (in seconds) or a `datetime.timedelta` object.
        Returns True if all non-optional, non-sticky patterns were handled
        successfully.

        .. warning::

            The timeouts attached to Patterns are set when they are created,
            not when you call :meth:`await`!

        As a convenience, if :meth:`isalive` resolves to False,
        :meth:`spawn` will be called automatically with *rows*, *cols*, and
        *env* given as arguments.

        **await:** To wait with expectation.
        """
        if not self.isalive():
            self.spawn(
                rows=rows, cols=cols, env=env, em_dimensions=em_dimensions)
        start = datetime.now()
        # Convert timeout to a timedelta if necessary
        if isinstance(timeout, (str, int, float)):
            timeout = timedelta(seconds=float(timeout))
        elif not isinstance(timeout, timedelta):
            raise TypeError(_(
                "The timeout value must be a string, integer, float, or a "
                "timedelta object"))
        remaining_patterns = True
        # This starts up the scheduler that constantly checks patterns
        self.read() # Remember: read() is non-blocking
        while remaining_patterns:
            # First we need to discount optional patterns
            remaining_patterns = False
            for pattern in self._patterns:
                if not pattern.optional and not pattern.sticky:
                    remaining_patterns = True
                    break
            # Now check if we've timed out
            if (datetime.now() - start) > timeout:
                raise Timeout("Lingered longer than %s" % timeout.seconds)
            # Lastly we perform a read() to ensure the output is processed
            self.read() # Remember: read() is non-blocking
            time.sleep(0.01) # So we don't eat up all the CPU
        return True

    def terminate(self):
        """
        This method must be overridden by subclasses of `BaseMultiplex`.  It
        is expected to terminate/kill the child process.
        """
        raise NotImplementedError(_(
            "terminate() *must* be overridden by subclasses."))

    def _read(self, bytes=-1):
        """
        This method must be overridden by subclasses of `BaseMultiplex`.  It
        is expected that this method read the output from the running
        terminal program in a non-blocking way, pass the result into
        `term_write`, and then return the result.
        """
        raise NotImplementedError(_(
            "_read() *must* be overridden by subclasses."))

    def read(self, bytes=-1):
        """
        Calls `_read` and checks if any timeouts have been reached in
        `self._patterns`.  Returns the result of `_read`.
        """
        result = self._read(bytes)
        # Perform checks for timeouts in self._patterns (used by expect())
        self.timeout_check()
        return result

    def write(self, chars):
        """
        This method must be overridden by subclasses of `BaseMultiplex`.  It
        is expected to write *chars* to the child process.
        """
        raise NotImplementedError(_(
            "write() *must* be overridden by subclasses."))

class MultiplexPOSIXIOLoop(BaseMultiplex):
    """
    The MultiplexPOSIXIOLoop class takes care of executing a child process
    on POSIX (aka Unix) systems and keeping track of its state via a
    terminal emulator (`terminal.Terminal` by default).  If there's a
    started instance of :class:`tornado.ioloop.IOLoop`, handlers will be
    added to it that automatically keep the terminal emulator synchronized
    with the output of the child process.

    If there's no IOLoop (or it just isn't started), terminal applications
    can be interacted with by calling `MultiplexPOSIXIOLoop.read` (to write
    any pending output to the terminal emulator) and
    `MultiplexPOSIXIOLoop.write` (which writes directly to stdin of the
    child).

    .. note:: `MultiplexPOSIXIOLoop.read` is non-blocking.
    """
    def __init__(self, *args, **kwargs):
        super(MultiplexPOSIXIOLoop, self).__init__(*args, **kwargs)
        from tornado import ioloop
        self.terminating = False
        self.sent_sigint = False
        self.env = {}
        self.io_loop = ioloop.IOLoop.instance() # Monitors child activity
        #self.io_loop.set_blocking_signal_threshold(
        #    2, self._blocked_io_handler)
        #signal.signal(signal.SIGALRM, self._blocked_io_handler)
        self.reenable_timeout = None
        interval = 100 # A 0.1 second interval should be fast enough
        self.scheduler = ioloop.PeriodicCallback(
            self._timeout_checker, interval)
        self.exitstatus = None
        self._checking_patterns = False

    def __del__(self):
        """
        Makes sure that the underlying terminal program is terminated so we
        don't leave things hanging around.
        """
        logging.debug("MultiplexPOSIXIOLoop.__del__()")
        self.terminate()

    def _call_callback(self, callback):
        """
        If the IOLoop is started, adds the callback via
        :meth:`IOLoop.add_callback` to ensure it gets called at the next
        IOLoop iteration (which is thread safe).  If the IOLoop isn't
        started *callback* will get called immediately and directly.
        """
        try:
            if self.io_loop.running():
                self.io_loop.add_callback(callback)
            else:
                callback()
        finally:
            del callback

    def _reenable_output(self):
        """
        Restarts capturing output from the underlying terminal program by
        disengaging the rate limiter.
        """
        self.ratelimiter_engaged = False

    def __reset_sent_sigint(self):
        self.sent_sigint = False

    def _blocked_io_handler(self, signum=None, frame=None):
        """
        Handles the situation where a terminal is blocking IO (usually
        because of too much output).  This method would typically get called
        inside of `MultiplexPOSIXIOLoop._read` when the output of an fd is
        too noisy.
        """
        if not self.isalive():
            # This can happen if terminate() gets called too fast from
            # another thread...  Strange stuff, mixing threading, signals,
            # and multiprocessing!
            return # Nothing to do
        logging.warning(_(
            "Noisy process (%s) kicked off rate limiter." % self.pid))
        self.ratelimiter_engaged = True
        # CALLBACK_UPDATE is called here so the client can be made aware of
        # the fact that the rate limiter was engaged.
        for callback in self.callbacks[self.CALLBACK_UPDATE].values():
            self._call_callback(callback)
        self.reenable_timeout = self.io_loop.add_timeout(
            timedelta(seconds=10), self._reenable_output)

    def spawn(self,
            rows=24, cols=80, env=None, em_dimensions=None, exitfunc=None):
        """
        Creates a new virtual terminal (tty) and executes self.cmd within
        it.  Also attaches :meth:`self._ioloop_read_handler` to the IOLoop
        so that the terminal emulator will automatically stay in sync with
        the output of the child process.

        :cols: The number of columns to emulate on the virtual terminal
            (width).
        :rows: The number of rows to emulate (height).
        :env: Optional - A dictionary of environment variables to set when
            executing self.cmd.
        :em_dimensions: Optional - The dimensions of a single character
            within the terminal (only used when calculating the number of
            rows/cols images take up).
        :exitfunc: Optional - A function that will be called with the
            current Multiplex instance and its exit status when the child
            process terminates (*exitfunc(m_instance, statuscode)*).
        """
        self.started = datetime.now()
        #signal.signal(signal.SIGCHLD, signal.SIG_IGN) # No zombies allowed
        logging.debug(
            "spawn(rows=%s, cols=%s, env=%s, em_dimensions=%s)" % (
                rows, cols, repr(env), repr(em_dimensions)))
        rows = min(200, rows) # Max 200 to limit memory utilization
        cols = min(500, cols) # Max 500 for the same reason
        import pty
        pid, fd = pty.fork()
        if pid == 0: # We're inside the child process
            # Close all file descriptors other than stdin, stdout, and
            # stderr (0, 1, 2)
            try:
                # This ensures the child doesn't get the parent's FDs
                os.closerange(3, 256)
            except OSError:
                pass
            if not env:
                env = {}
            env["COLUMNS"] = str(cols)
            env["LINES"] = str(rows)
            env["TERM"] = "xterm-256color" # TODO: Needs to be configurable
            env["PATH"] = os.environ['PATH']
            env["LANG"] = os.environ.get('LANG', 'en_US.UTF-8')
            env["PYTHONIOENCODING"] = "utf_8"
            # Setup stdout to be more Gate One friendly
            import termios
            # Fix missing termios.IUTF8
            if 'IUTF8' not in termios.__dict__:
                termios.IUTF8 = 16384 # Hopefully not platform-dependent
            stdin = 0
            stdout = 1
            stderr = 2
            attrs = termios.tcgetattr(stdout)
            iflag, oflag, cflag, lflag, ispeed, ospeed, cc = attrs
            # Enable flow control and UTF-8 input (probably not needed)
            iflag |= (termios.IXON | termios.IXOFF | termios.IUTF8)
            # OPOST: Enable post-processing of chars (not sure if this
            # matters).  ONLCR/INLCR: newline translation flags.
            oflag |= (termios.OPOST | termios.ONLCR | termios.INLCR)
            attrs = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
            termios.tcsetattr(stdout, termios.TCSANOW, attrs)
            # Now do the same for stdin
            attrs = termios.tcgetattr(stdin)
            iflag, oflag, cflag, lflag, ispeed, ospeed, cc = attrs
            iflag |= (termios.IXON | termios.IXOFF | termios.IUTF8)
            oflag |= (termios.OPOST | termios.ONLCR | termios.INLCR)
            attrs = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
            termios.tcsetattr(stdin, termios.TCSANOW, attrs)
            # The sleep statement below ensures we capture all output from
            # the fd before it is closed...  It turns out that IOLoop's
            # response to changes in the fd is so fast that it can result in
            # the fd being closed the very moment the Python interpreter is
            # reading from it.
            cmd = ['/bin/sh', '-c', self.cmd + '; sleep .1']
            os.dup2(2, 1) # Make stdout a copy of stderr (aka 1>&2)
            os.execvpe(cmd[0], cmd, env)
            os._exit(0) # Not reached; exec replaces the child process
        else: # We're inside this Python script
            logging.debug("spawn() pid: %s" % pid)
            self._alive = True
            self.fd = fd
            self.env = env
            self.em_dimensions = em_dimensions
            self.exitfunc = exitfunc
            self.pid = pid
            self.time = time.time()
            try:
                self.term = self.terminal_emulator(
                    rows=rows, cols=cols, em_dimensions=em_dimensions)
            except TypeError:
                # Terminal emulator doesn't support em_dimensions; that's OK
                self.term = self.terminal_emulator(rows=rows, cols=cols)
            # Tell our IOLoop instance to start watching the child
            self.io_loop.add_handler(
                fd, self._ioloop_read_handler, self.io_loop.READ)
            self.prev_output = {}
            # Set non-blocking so we don't wait forever for a read()
            import fcntl
            fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
            fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            # Set the size of the terminal
            resize = partial(self.resize, rows, cols, ctrl_l=False)
            self.io_loop.add_timeout(timedelta(seconds=2), resize)
            return fd

    def isalive(self):
        """
        Checks the underlying process to see if it is alive and sets
        self._alive appropriately.
        """
        if self.exitstatus is None:
            # This doesn't hurt anything if the process is still running
            if self.pid != -1:
                try:
                    pid, status = os.waitpid(self.pid, os.WNOHANG)
                    if pid: # pid is 0 if the process is still running
                        self.exitstatus = os.WEXITSTATUS(status)
                except OSError:
                    # This can happen if the program closes itself very
                    # quickly immediately after being executed.
                    if self.debug:
                        # Useful to know when debugging...
                        print(_(
                            "Could not determine exit status for child with"
                            " PID: %s\n" % self.pid))
                        print(_("Setting self.exitstatus to 99"))
                    self.exitstatus = 99 # Seems like a good number
        if self._alive: # Re-check it
            try:
                os.kill(self.pid, 0)
                return self._alive
            except OSError:
                # Process is dead
                self._alive = False
                if self.debug:
                    # Useful to know when debugging...
                    print(_(
                        "Child exited with status: %s\n" % self.exitstatus))
                # Call the exitfunc (if set)
                if self.exitfunc:
                    self.exitfunc(self, self.exitstatus)
                    self.exitfunc = None # Just in case
                return False
        else:
            return False

    def resize(self, rows, cols, em_dimensions=None, ctrl_l=True):
        """
        Resizes the child process's terminal window to *rows* and *cols* by
        first sending it a TIOCSWINSZ event and then sending ctrl-l.

        If *em_dimensions* are provided they will be updated along with the
        rows and cols.

        The sending of ctrl-l can be disabled by setting *ctrl_l* to False.
        """
        logging.debug("Resizing term %s to rows: %s, cols: %s" % (
            self.term_id, rows, cols))
        if rows < 2:
            rows = 24
        if cols < 2:
            cols = 80
        self.rows = rows
        self.cols = cols
        self.term.resize(rows, cols, em_dimensions)
        # Sometimes the resize doesn't actually apply (for whatever reason)
        # so to get around this we have to send a different value than the
        # actual value we want then send our actual value.  It's a bug
        # outside of Gate One that I have no idea how to isolate but this
        # has proven to be an effective workaround.
        import fcntl, termios
        s = struct.pack("HHHH", rows, cols, 0, 0)
        try:
            fcntl.ioctl(self.fd, termios.TIOCSWINSZ, s)
        except IOError:
            # Process already ended--no big deal
            return
        if ctrl_l:
            self.write(u'\x0c') # ctrl-l
        # SIGWINCH has been disabled since it can screw things up
        #os.kill(self.pid, signal.SIGWINCH) # Send the resize signal

    def terminate(self):
        """
        Kill the child process associated with `self.fd`.

        .. note:: If dtach is being used this only kills the dtach process.
        """
        if not self.terminating:
            self.terminating = True
        else:
            return # Something else already called it
        logging.debug("terminate() self.pid: %s" % self.pid)
        if self.reenable_timeout:
            self.io_loop.remove_timeout(self.reenable_timeout)
        # Unset our blocked IO handler so there's no references to self
        # hanging around preventing us from freeing up memory
        try:
            self.io_loop.set_blocking_signal_threshold(None, None)
        except ValueError:
            pass # Can happen if this instance winds up in a thread
        for callback in self.callbacks[self.CALLBACK_EXIT].values():
            self._call_callback(callback)
        # This try/except block *must* come before the exitfunc logic.
        # Otherwise, if the registered exitfunc raises an exception the
        # IOLoop will never stop watching self.fd; resulting in an infinite
        # loop of exitfunc.
        try:
            self.io_loop.remove_handler(self.fd)
            os.close(self.fd)
        except (KeyError, IOError, OSError):
            # This can happen when the fd is removed by the underlying
            # process before the next cycle of the IOLoop.  Not really a
            # problem.
            pass
        self.scheduler.stop()
        # NOTE: Without this 'del' we end up with a memory leak every time
        # a new instance of Multiplex is created.  Apparently the references
        # inside of PeriodicCallback pointing to self prevent proper garbage
        # collection.
        del self.scheduler
        try:
            # TODO: Make this walk the series from SIGINT to SIGKILL
            import signal
            #os.kill(self.pid, signal.SIGINT)
            os.kill(self.pid, signal.SIGTERM)
            #os.kill(self.pid, signal.SIGKILL)
        except OSError:
            pass # The process is already dead--great.
        if not self.exitstatus:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid:
                self.exitstatus = os.WEXITSTATUS(status)
        if self._patterns:
            self.timeout_check(timeout_now=True)
            self.unexpect()
        # Call the exitfunc (if set)
        if self.exitfunc:
            self.exitfunc(self, self.exitstatus)
            self.exitfunc = None
        # Reset all callbacks so there's nothing to prevent GC
        self.callbacks = {
            self.CALLBACK_UPDATE: {},
            self.CALLBACK_EXIT: {},
        }
        # Commented this out so that you can see what was in the terminal
        # emulator after the process terminates.
        #del self.term
        # Kick off a process that finalizes the log (updates metadata and
        # recompresses everything to save disk space)
        if not self.log_path:
            return # No log to finalize so we're done.
        self.log.close() # Write it out
        logging.info(_(
            "Finalizing the log for pid %s (this can take some time)."
            % self.pid))
        PROC = Process(
            target=get_or_update_metadata,
            args=(self.log_path, self.user),
            kwargs={'force_update': True})
        PROC.start()

    def _ioloop_read_handler(self, fd, event):
        """
        Read in the output of the process associated with *fd* and write it
        to `self.term`.

        :fd: The file descriptor of the child process.
        :event: An IOLoop event (e.g. IOLoop.READ).

        .. note::

            This method is not meant to be called directly...  The IOLoop
            should be the one calling it when it detects any given event on
            the fd.
        """
        if event == self.io_loop.READ:
            self._call_callback(self.read)
        else: # Child died
            logging.debug(_(
                "Apparently fd %s just died (event: %s)" % (self.fd, event)))
            self.terminate()

    def _read(self, bytes=-1):
        """
        Reads at most *bytes* from the incoming stream, writes the result to
        the terminal emulator using `term_write`, and returns what was read.
        If *bytes* is -1 (default) it will read `self.fd` until there's no
        more output.  Returns the result of all that reading.

        .. note:: Non-blocking.
        """
        #logging.debug("MultiplexPOSIXIOLoop._read()")
        result = b""
        try:
            with io.open(
                    self.fd, 'rb', closefd=False, buffering=0) as reader:
                if bytes == -1:
                    # Even though -1 indicates "read everything" we still
                    # want to read in chunks so we can catch when an fd is
                    # feeding us too much data (so we can engage the rate
                    # limiter).
                    bytes = 8192 # Should be plenty
                    # If we need to block/read for longer than five seconds
                    # the fd is outputting too much data.
                    five_seconds = timedelta(seconds=5)
                    loop_start = datetime.now()
                    while True:
                        updated = reader.read(bytes)
                        if not updated:
                            break
                        if self.ratelimiter_engaged:
                            # Truncate the output to the last 1024 chars
                            self.term_write(updated[-1024:])
                            result += updated[-1024:]
                            # NOTE: If we didn't truncate the output we'd
                            # eventually have to process it all which would
                            # take forever.
                            break
                        elif datetime.now() - loop_start > five_seconds:
                            self._blocked_io_handler()
                        result += updated
                        self.term_write(updated)
                elif bytes:
                    result = reader.read(bytes)
                    self.term_write(result)
        except IOError as e:
            # IOErrors can happen when self.fd is closed before we finish
            # reading from it.  Not a big deal.
            pass
        except OSError as e:
            logging.error("Got exception in read: %s" % repr(e))
        except Exception as e:
            import traceback
            logging.error(
                "Got unhandled exception in read (???): %s" % repr(e))
            traceback.print_exc(file=sys.stdout)
            if self.isalive():
                self.terminate()
        finally:
            return result

    def _timeout_checker(self):
        """
        Runs `timeout_check` and if there are no more non-sticky patterns in
        :attr:`self._patterns`, stops :attr:`scheduler`.
        """
        if not self._checking_patterns:
            self._checking_patterns = True
            remaining_patterns = self.timeout_check()
            if not remaining_patterns:
                # No reason to keep the PeriodicCallback going
                logging.debug(
                    "Stopping self.scheduler (no remaining patterns)")
                try:
                    self.scheduler.stop()
                except AttributeError:
                    # Now this is a neat trick: The way IOLoop works with
                    # its stack_context thingamabob the scheduler doesn't
                    # actually end up inside the MultiplexPOSIXIOLoop
                    # instance inside of this instance of _timeout_checker()
                    # *except* inside the main thread.  It is absolutely
                    # wacky but it works and works well :)
                    pass
            self._checking_patterns = False

    def read(self, bytes=-1):
        """
        .. note::

            This is an override of `BaseMultiplex.read` in order to take
            advantage of the IOLoop for ensuring `BaseMultiplex.expect`
            patterns timeout properly.

        Calls `_read` and checks if any timeouts have been reached in
        :attr:`self._patterns`.  Returns the result of :meth:`_read`.  This
        override will also create a
        :class:`tornado.ioloop.PeriodicCallback` (as `self.scheduler`) that
        executes :attr:`timeout_check` at a regular interval.  The
        `PeriodicCallback` will automatically cancel itself if there are no
        more non-sticky patterns in :attr:`self._patterns`.
        """
        result = self._read(bytes)
        remaining_patterns = self.timeout_check()
        if remaining_patterns and not self.scheduler._running:
            # Start 'er up in case we don't get any more output
            logging.debug("Starting self.scheduler to check for timeouts")
            self.scheduler.start()
        self.isalive() # Ensures the exitfunc is called (if necessary)
        return result

    def _write(self, chars):
        """
        Writes *chars* to `self.fd` (pretty straightforward).  If IOError or
        OSError exceptions are encountered, will run `terminate`.  All other
        exceptions are logged but no action will be taken.
        """
        #logging.debug("MultiplexPOSIXIOLoop._write()")
        try:
            if self.ratelimiter_engaged:
                # This empties the fd buffer so the user gets immediate
                # responses to their keystrokes.  Just note that this can
                # truncate a lot of data.
                with io.open(
                        self.fd, 'rb', closefd=False, buffering=0) as reader:
                    reader.read() # Empty it out
                # In testing, emptying out the fd read buffer in this way
                # increased responsiveness to user keystrokes when the rate
                # limiter is engaged by about 3-4 seconds (which is a lot!)
                self.ratelimiter_engaged = False
                # For reference, the time between reader.read() above and
                # writer.write() below is enough for the 'yes' command to
                # output a few lines.
            with io.open(
                    self.fd, 'wt', encoding='UTF-8', closefd=False) as writer:
                writer.write(chars)
        except (IOError, OSError) as e:
            if self.isalive():
                self.terminate()
        except Exception as e:
            logging.error("write() exception: %s" % e)

    def write(self, chars):
        """
        Calls `_write(chars)` via `_call_callback` to ensure thread safety.
        """
        if not self.isalive():
            raise ProgramTerminated(_("Child process is not running."))
        write = partial(self._write, chars)
        self._call_callback(write)

# Here's an example of how termio compares to pexpect:
#
#import pexpect
#child = pexpect.spawn('ftp ftp.openbsd.org')
#child.expect('Name .*: ')
#child.sendline('anonymous')
#child.expect('Password:')
#child.sendline('noah@example.com')
#child.expect('ftp> ')
#child.sendline('cd pub')
#child.expect('ftp> ')
#child.sendline('get ls-lR.gz')
#child.expect('ftp> ')
#child.sendline('bye')
# NOTE: Every expect() in the above example is a blocking call.
#
# This is the same thing, rewritten using termio:
#
#import termio
#child = termio.Multiplex('ftp ftp.openbsd.org', debug=True)
## Expectations come first
#child.expect('Name .*:', "anonymous\n")
#child.expect('Password:', 'user@company.com\n')
#child.expect('ftp>$', 'cd pub\n')
#child.expect('ftp>$', 'get ls-lR.gz\n')
#child.expect('ftp>$', 'bye\n')
#child.await() # Blocks until all patterns are matched or a timeout
#
# NOTE: If this code were called inside of an already-started IOLoop there
# would be no need to call await().  Everything would be asynchronous and
# non-blocking.

def spawn(cmd, rows=24, cols=80, env=None, em_dimensions=None,
        *args, **kwargs):
    """
    A shortcut to::

        >>> m = Multiplex(cmd, *args, **kwargs)
        >>> m.spawn(rows, cols, env)
        >>> return m
    """
    m = Multiplex(cmd, *args, **kwargs)
    m.spawn(rows, cols, env, em_dimensions=em_dimensions)
    return m

def getstatusoutput(cmd, **kwargs):
    """
    Emulates Python's commands.getstatusoutput() function using a Multiplex
    instance.  Optionally, any additional keyword arguments (\*\*kwargs)
    provided will be passed to the spawn() command.
    """
    # NOTE: This function is primarily here to provide an example of how to
    # use termio.Multiplex instances in a traditional, blocking manner.
    output = ""
    m = Multiplex(cmd)
    m.spawn(**kwargs)
    while m.isalive():
        output += m.read()
        time.sleep(0.01)
    return (m.exitstatus, output)

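# A minimal (blocking) usage sketch of getstatusoutput(); the command shown
# is arbitrary:
#
#   retcode, output = getstatusoutput('uptime')
#   print(retcode, output)
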
if POSIX:
    Multiplex = MultiplexPOSIXIOLoop
else:
    raise NotImplementedError(_(
        "termio currently only works on Unix platforms."))