# Source code for pyrocko.progress

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
Inform users about the progress and success/fail state of long-running tasks.
'''

import sys
import time
import logging
import threading

from shutil import get_terminal_size

logger = logging.getLogger('pyrocko.progress')

# Available spinner animations; each string is one animation cycle, one
# character per frame.
g_spinners = [
    '⣾⣽⣻⢿⡿⣟⣯⣷',
    '◴◷◶◵',
    '\u25dc\u25dd\u25de\u25df',
    '0123456789']


# Symbols used when rendering task states and progress bars.
skull = u'\u2620'
check = u'\u2714'
cross = u'\u2716'
bar = u'[- ]'
# Full block down to one-eighth block, followed by a blank: used to draw
# progress bars with sub-character resolution.
blocks = u'\u2588\u2589\u258a\u258b\u258c\u258d\u258e\u258f '

symbol_done = check
symbol_failed = cross  # skull

# Cursor movement (CSI sequences, ``%i`` is the number of cells to move).
# Final byte ``C`` is "cursor forward" (right), ``D`` is "cursor backward"
# (left).
ansi_up = u'\033[%iA'
ansi_down = u'\033[%iB'
ansi_left = u'\033[%iD'
ansi_right = u'\033[%iC'
ansi_next_line = u'\033E'
ansi_previous_line = u'\033[1F'

# Display / scrolling region control.
ansi_erase_display = u'\033[2J'
ansi_window = u'\033[%i;%ir'   # set scrolling region (top row, bottom row)
ansi_move_to = u'\033[%i;%iH'  # move cursor to (row, column), 1-based

ansi_clear_down = u'\033[0J'
ansi_clear_up = u'\033[1J'
ansi_clear = u'\033[2J'

ansi_clear_right = u'\033[0K'

ansi_scroll_up = u'\033D'
ansi_scroll_down = u'\033M'

ansi_reset = u'\033c'

# Save / restore cursor are ``ESC 7`` / ``ESC 8`` with no intermediate
# space (``\033`` is a complete three-digit octal escape, the digit after it
# is a literal character).
ansi_save = u'\0337'
ansi_restore = u'\0338'


# When True, Progress.view() always hands out the no-op viewer.
g_force_viewer_off = False

# Name of the viewer used when Progress.view() is called without argument.
g_viewer = 'terminal'


def set_default_viewer(viewer):
    '''
    Set default viewer for progress indicators.

    :param viewer:
        Name of viewer, choices: ``'terminal'``, ``'log'``, ``'off'``,
        default: ``'terminal'``.
    :type viewer:
        str

    :raises ValueError:
        If ``viewer`` is not a registered viewer name.
    '''
    global g_viewer

    # Explicit check instead of ``assert``: assertions are removed when
    # Python runs with ``-O``, which would silently accept invalid names.
    if viewer not in g_viewer_classes:
        raise ValueError('Invalid viewer choice: %s' % (viewer,))

    g_viewer = viewer
class StatusViewer(object):
    '''
    Base class for progress viewers with rate-limited redrawing.

    Usable as a context manager: on exit, :py:meth:`cleanup` is called and
    the viewer removes itself from its parent.

    :param parent:
        Object providing ``_render(style)`` and ``_remove_viewer(viewer)``
        (a :py:class:`Progress` instance within this module).
    :param interval:
        Minimum time in [s] between two non-forced redraws.
    :type interval:
        float
    :param delay:
        Time in [s] after creation during which nothing is drawn, not even
        on forced updates.
    :type delay:
        float
    '''

    def __init__(self, parent, interval=0.1, delay=1.0):
        self._parent = parent
        self._interval = interval
        self._delay = delay
        self._last_update = 0.0
        self._created = time.time()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.cleanup()
        if self._parent:
            self._parent._remove_viewer(self)

    def cleanup(self):
        '''
        Hook called when the viewer is closed. Does nothing by default.
        '''
        pass

    def update(self, force):
        '''
        Redraw if the startup delay has passed and a redraw is due.

        :param force:
            If true, redraw even if less than ``interval`` seconds have
            passed since the last redraw.
        :type force:
            bool
        '''
        now = time.time()
        if now < self._created + self._delay:
            return

        if self._last_update + self._interval < now or force:
            self._draw()
            self._last_update = now

    def _draw(self):
        '''
        Produce the output. To be overloaded in subclasses.
        '''
        pass


class DummyStatusViewer(StatusViewer):
    '''
    Viewer which shows nothing (used for viewer choice ``'off'``).
    '''
    pass


class LogStatusViewer(StatusViewer):
    '''
    Viewer reporting progress through the module logger at level INFO.

    Uses a startup delay and a redraw interval of 5 s each.
    '''

    def __init__(self, parent):
        StatusViewer.__init__(self, parent, delay=5., interval=5.)

    def _draw(self):
        lines = self._parent._render('log')
        if lines:
            # Lazy logging arguments: the message is only formatted if the
            # record is actually emitted.
            logger.info(
                'Progress:\n%s', '\n'.join(' ' + line for line in lines))


class TerminalStatusViewer(StatusViewer):
    '''
    Viewer drawing task lines into a reserved area at the bottom of the
    terminal using ANSI escape sequences.

    Internal states: ``0`` - nothing drawn yet, ``1`` - active, ``2`` -
    cleaned up (no further drawing). Output is written to ``sys.stderr`` and
    only if ``sys.stdout`` is attached to a tty.
    '''

    def __init__(self, parent):
        StatusViewer.__init__(self, parent)
        self._terminal_size = get_terminal_size()
        self._height = 0
        self._state = 0
        self._nlines_max = 0
        self._isatty = sys.stdout.isatty()

    def cleanup(self):
        '''
        Remove the status area from the terminal and disable the viewer.
        '''
        if self._state == 1:
            self._reset()
            self._flush()

        self._state = 2

    def _draw(self):
        lines = self._parent._render('terminal')
        if self._state == 0:
            self._state = 1

        if self._state != 1:
            return

        self._terminal_size = get_terminal_size()
        sx, _ = self._terminal_size
        nlines = len(lines)

        # The status area only ever grows; it is released in cleanup().
        if self._nlines_max < nlines:
            self._nlines_max = nlines
            self._resize(self._nlines_max)

        self._start_show()

        # Pad with empty lines so that stale content of finished tasks gets
        # overwritten.
        lines.extend([''] * (self._nlines_max - nlines))

        ilast = self._nlines_max - 1
        for iline, line in enumerate(reversed(lines)):
            # Truncate to terminal width minus one to prevent line wrapping.
            self._print(ansi_clear_right + line[:sx-1])
            if iline != ilast:
                self._print(ansi_next_line)

        self._end_show()
        self._flush()

    def _print(self, s):
        if self._isatty:
            print(s, end='', file=sys.stderr)

    def _flush(self):
        if self._isatty:
            print('', end='', flush=True, file=sys.stderr)

    def _reset(self):
        '''
        Restore full-height scrolling region and clear the status area.
        '''
        _, sy = self._terminal_size
        self._print(ansi_window % (1, sy))
        self._print(ansi_move_to % (sy-self._height, 1))
        self._print(ansi_clear_down)
        self._height = 0

    def _resize(self, height):
        '''
        Change the number of terminal rows reserved for the status area.

        :param height:
            New height of the status area in rows.
        :type height:
            int
        '''
        _, sy = self._terminal_size
        k = height - self._height
        if k > 0:
            # Growing: scroll existing content out of the way first, then
            # shrink the scrolling region.
            self._print(ansi_scroll_up * k)
            self._print(ansi_window % (1, sy-height))
        if k < 0:
            # Shrinking: enlarge the scrolling region, then scroll back.
            self._print(ansi_window % (1, sy-height))
            self._print(ansi_scroll_down * abs(k))

        self._height = height

    def _start_show(self):
        # Move to the first row of the status area.
        _, sy = self._terminal_size
        self._print(ansi_move_to % (sy-self._height+1, 1))

    def _end_show(self):
        # Put the cursor back onto the last row of the scrolling region.
        _, sy = self._terminal_size
        self._print(ansi_move_to % (sy-self._height, 1))
        self._print(ansi_clear_right)


class Task(object):
    '''
    State of a single task tracked by a :py:class:`Progress` object.

    Can be used as a context manager (marks the task done on clean exit,
    failed when an exception propagates) and can be called with an iterable
    to track progress while iterating over it.

    :param progress:
        Owning :py:class:`Progress` instance.
    :param id:
        Identifier of the task within ``progress``.
    :param name:
        Name shown in the rendered output.
    :type name:
        str
    :param n:
        Total number of steps or ``None`` if unknown.
    :param state:
        Initial state, ``'waiting'`` or ``'working'``.
    :type state:
        str
    :param logger:
        If given, the final state of the task is logged to it at level
        DEBUG.
    :param group:
        Parent task or ``None``.
    :param spinner:
        Characters of the spinner animation.
    :type spinner:
        str
    '''

    def __init__(
            self, progress, id, name, n, state='working', logger=None,
            group=None, spinner=g_spinners[0]):

        self._id = id
        self._name = name
        self._condition = ''
        self._ispin0 = 0
        self._ispin0_last = 0
        self._ispin = 0
        self._spinner = spinner
        self._i = None
        self._n = n
        self._done = False
        assert state in ('waiting', 'working')
        self._state = state
        self._progress = progress
        self._logger = logger
        self._tcreate = time.time()
        self._group = group
        self._lock = threading.RLock()

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        if type is None:
            self.done()
        else:
            self.fail()

    def __call__(self, it):
        '''
        Iterate over ``it``, updating the task along the way.

        The total number of steps is taken from ``len(it)`` or set to
        unknown if ``it`` has no length. The task is marked done when the
        iteration completes and failed if it is aborted.
        '''
        try:
            self._n = len(it)
        except TypeError:
            self._n = None

        clean = False
        try:
            n = 0
            for obj in it:
                self.update(n)
                yield obj
                n += 1

            self.update(n)
            clean = True

        finally:
            if clean:
                self.done()
            else:
                self.fail()

    def task(self, *args, **kwargs):
        '''
        Create a sub-task of this task. Arguments as in
        :py:meth:`Progress.task`, ``group`` is set to this task.
        '''
        kwargs['group'] = self
        return self._progress.task(*args, **kwargs)

    def update(self, i=None, condition=''):
        '''
        Report progress and trigger a (rate-limited) redraw.

        :param i:
            Number of completed steps. Clipped to the total number of steps
            if that is known. ``None`` leaves the counter unchanged.
        :param condition:
            Free text shown next to the progress information.
        :type condition:
            str
        '''
        with self._lock:
            self._ispin0 += 1
            self._state = 'working'
            self._condition = condition
            if i is not None:
                if self._n is not None:
                    i = min(i, self._n)

                self._i = i

            self._progress._update(False)

    def done(self, condition=''):
        '''
        Mark the task as successfully finished. No-op if already ended.
        '''
        self._finish('done', condition)

    def fail(self, condition=''):
        '''
        Mark the task as failed. No-op if already ended.
        '''
        self._finish('failed', condition)

    def _finish(self, state, condition):
        # Shared implementation of done() and fail().
        with self._lock:
            if self._state in ('done', 'failed'):
                return

            # Set only on the first transition, so that repeated calls (e.g.
            # from __call__ and then __exit__) do not overwrite the duration
            # measured when the task actually ended.
            self.duration = time.time() - self._tcreate
            self._condition = condition
            self._state = state
            self._log(str(self))
            self._progress._task_end(self)

    def _log(self, s):
        if self._logger is not None:
            self._logger.debug(s)

    def _get_group_time_start(self):
        # Creation time of the outermost task in the group hierarchy.
        if self._group:
            return self._group._get_group_time_start()
        else:
            return self._tcreate

    def _str_state(self):
        # Two-column state indicator: symbol (or spinner frame) plus blank.
        state = self._state
        if state == 'waiting':
            # Two columns wide, like the indicators of all other states.
            return '  '
        elif state == 'working':
            # Advance the spinner only if update() has been called since
            # the last rendering.
            if self._ispin0_last != self._ispin0:
                self._ispin += 1
                self._ispin0_last = self._ispin0

            return self._spinner[self._ispin % len(self._spinner)] + ' '
        elif state == 'done':
            return symbol_done + ' '
        elif state == 'failed':
            return symbol_failed + ' '
        else:
            return '? '

    def _idisplay(self):
        # Step counter for display, clipped to the total number of steps.
        i = self._i
        if self._n is not None and i > self._n:
            i = self._n

        return i

    def _str_progress(self):
        if self._i is None:
            return self._state

        if self._n is None:
            if self._state != 'working':
                return '... %s (%i)' % (self._state, self._idisplay())
            else:
                return '%i' % self._idisplay()

        if self._state == 'working':
            nw = len(str(self._n))
            return (('%' + str(nw) + 'i / %i') % (
                self._idisplay(), self._n)).center(11)
        elif self._state == 'failed':
            return '... %s (%i / %i)' % (
                self._state, self._idisplay(), self._n)
        else:
            return '... %s (%i)' % (self._state, self._n)

    def _has_fraction(self):
        # Percentage and bar are only shown for running tasks with a known
        # total of at least 4 steps and a known step counter.
        return self._state == 'working' \
            and self._n is not None and self._n >= 4 \
            and self._i is not None

    def _str_percent(self):
        if self._has_fraction():
            return '%3.0f%%' % ((100. * self._i) / self._n)
        else:
            return ''

    def _str_condition(self):
        if self._condition:
            return '%s' % self._condition
        else:
            return ''

    def _str_bar(self):
        if not self._has_fraction():
            return ''

        nb = 20
        fb = nb * float(self._i) / self._n
        ib = int(fb)
        ip = int((fb - ib) * (len(blocks)-1))
        if ib == 0 and ip == 0:
            ip = 1  # indication of start

        s = blocks[0] * ib
        if ib < nb:
            # Partially filled cell followed by blank padding.
            s += blocks[-1-ip] + (nb - ib - 1) * blocks[-1]

        # Right border, also drawn for a completely filled bar so that the
        # bar always occupies nb + 1 columns.
        s += blocks[-2]
        return s

    def _render(self, style):
        '''
        Get string representation of the task for viewer style ``style``
        (``'terminal'`` or ``'log'``; empty string for other values).
        '''
        if style == 'terminal':
            return '%s%-23s %-11s %s%-4s %s' % (
                self._str_state(),
                self._name,
                self._str_progress(),
                self._str_bar(),
                self._str_percent(),
                self._str_condition())

        elif style == 'log':
            return '%s: %-23s %s%-4s %s' % (
                self._state,
                self._name,
                self._str_progress(),
                self._str_percent(),
                self._str_condition())
        else:
            return ''

    def __str__(self):
        return '%s%-23s %s%-4s %s' % (
            self._str_state(),
            self._name,
            self._str_progress(),
            self._str_percent(),
            self._str_condition())


class Progress(object):
    '''
    Registry of running tasks and of the viewers displaying them.
    '''

    def __init__(self):
        self._current_id = 0
        self._tasks = {}
        self._viewers = []
        self._lock = threading.RLock()
        self._isatty = sys.stdout.isatty()

    def view(self, viewer=None):
        '''
        Create and register a viewer.

        :param viewer:
            Name of viewer, ``'terminal'``, ``'log'`` or ``'off'``. Defaults
            to the module-wide default viewer. A no-op viewer is handed out
            if viewers are globally forced off or if another viewer is
            already registered. ``'terminal'`` is replaced by ``'log'`` if
            no tty is attached.
        :type viewer:
            str

        :returns:
            The viewer object, usable as a context manager.

        :raises ValueError:
            If ``viewer`` is not a known viewer name.
        '''
        if g_force_viewer_off or self._viewers:
            viewer = 'off'
        elif viewer is None:
            viewer = g_viewer

        if not self._isatty and viewer == 'terminal':
            logger.debug('No tty attached, switching to log progress viewer.')
            viewer = 'log'

        # Only the lookup is guarded, so that a KeyError raised inside a
        # viewer constructor is not misreported as an invalid choice.
        try:
            viewer_class = g_viewer_classes[viewer]
        except KeyError:
            raise ValueError('Invalid viewer choice: %s' % viewer) from None

        viewer = viewer_class(self)
        self._viewers.append(viewer)
        return viewer

    def _remove_viewer(self, viewer):
        self._update(True)
        self._viewers.remove(viewer)

    def task(self, name, n=None, logger=None, group=None):
        '''
        Create and register a new task.

        :param name:
            Name of the task.
        :type name:
            str
        :param n:
            Total number of steps or ``None`` if unknown.
        :param logger:
            If given, the final state of the task is logged to it.
        :param group:
            Parent task or ``None``.

        :returns:
            The new :py:class:`Task` object.
        '''
        with self._lock:
            self._current_id += 1
            new_task = Task(
                self, self._current_id, name, n, logger=logger, group=group)

            self._tasks[new_task._id] = new_task
            self._update(True)
            return new_task

    def _task_end(self, task):
        with self._lock:
            # Show the final state of the task once before it is removed.
            self._update(True)
            del self._tasks[task._id]
            self._update(True)

    def _update(self, force):
        with self._lock:
            for viewer in self._viewers:
                viewer.update(force)

    def _render(self, style):
        # One or more lines per task, ordered by task id.
        lines = []
        for task_id in sorted(self._tasks):
            lines.extend(self._tasks[task_id]._render(style).splitlines())

        return lines

    def _debug_log(self):
        logger.debug(
            'Viewers: %s\n Tasks active: %i\n Tasks total: %i',
            ', '.join(viewer.__class__.__name__ for viewer in self._viewers)
            if self._viewers else 'none',
            len(self._tasks),
            self._current_id)


# Viewer name -> viewer class.
g_viewer_classes = {
    'terminal': TerminalStatusViewer,
    'log': LogStatusViewer,
    'off': DummyStatusViewer}

# Module-wide progress registry and shortcuts to its main methods.
g_progress = Progress()
progress = g_progress  # compatibility
view = g_progress.view
task = g_progress.task