# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''
Inform users about the progress and success/fail state of long-running tasks.
'''
import sys
import time
import logging
import threading
from shutil import get_terminal_size
logger = logging.getLogger('pyrocko.progress')
g_spinners = [
'⣾⣽⣻⢿⡿⣟⣯⣷',
'◴◷◶◵',
'\u25dc\u25dd\u25de\u25df',
'0123456789']
skull = u'\u2620'
check = u'\u2714'
cross = u'\u2716'
bar = u'[- ]'
blocks = u'\u2588\u2589\u258a\u258b\u258c\u258d\u258e\u258f '
symbol_done = check
symbol_failed = cross # skull
ansi_up = u'\033[%iA'
ansi_down = u'\033[%iB'
ansi_left = u'\033[%iC'
ansi_right = u'\033[%iD'
ansi_next_line = u'\033E'
ansi_previous_line = u'\033[1F'
ansi_erase_display = u'\033[2J'
ansi_window = u'\033[%i;%ir'
ansi_move_to = u'\033[%i;%iH'
ansi_clear_down = u'\033[0J'
ansi_clear_up = u'\033[1J'
ansi_clear = u'\033[2J'
ansi_clear_right = u'\033[0K'
ansi_scroll_up = u'\033D'
ansi_scroll_down = u'\033M'
ansi_reset = u'\033c'
ansi_save = u'\033 7'
ansi_restore = u'\033 8'
g_force_viewer_off = False
g_viewer = 'terminal'
[docs]def set_default_viewer(viewer):
'''
Set default viewer for progress indicators.
:param viewer:
Name of viewer, choices: ``'terminal'``, ``'gui'``, ``'log'``,
``'off'``, default: ``'terminal'``.
:type viewer:
str
'''
global g_viewer
assert viewer in g_viewer_classes
g_viewer = viewer
class StatusViewer(object):
def __init__(self, parent, interval=0.1, delay=1.0):
self._parent = parent
self._interval = interval
self._delay = delay
self._last_update = 0.0
self._created = time.time()
def __enter__(self):
return self
def __exit__(self, *_):
self._cleanup()
if self._parent:
self._parent._remove_viewer(self)
def _cleanup(self):
pass
def update(self, force):
now = time.time()
if now < self._created + self._delay:
return
if self._last_update + self._interval < now or force:
self._draw()
self._last_update = now
def _idle(self):
pass
def _draw(self):
pass
class DummyStatusViewer(StatusViewer):
pass
class LogStatusViewer(StatusViewer):
def __init__(self, parent):
StatusViewer.__init__(self, parent, delay=5., interval=5.)
def _draw(self):
lines = self._parent._render('log')
if lines:
logger.info(
'Progress:\n%s' % '\n'.join(' '+line for line in lines))
class TerminalStatusViewer(StatusViewer):
def __init__(self, parent):
StatusViewer.__init__(self, parent)
self._terminal_size = get_terminal_size()
self._height = 0
self._state = 0
self._nlines_max = 0
self._isatty = sys.stdout.isatty()
def _cleanup(self):
if self._state == 1:
sx, sy = self._terminal_size
self._reset()
self._flush()
self._state = 2
def _draw(self):
lines = self._parent._render('terminal')
if self._state == 0:
self._state = 1
if self._state != 1:
return
self._terminal_size = get_terminal_size()
sx, sy = self._terminal_size
nlines = len(lines)
if self._nlines_max < nlines:
self._nlines_max = nlines
self._resize(self._nlines_max)
self._start_show()
for i in range(self._nlines_max - nlines):
lines.append('')
for iline, line in enumerate(reversed(lines)):
if len(line) > sx - 1:
line = line[:sx-1]
self._print(ansi_clear_right + line)
if iline != self._nlines_max - 1:
self._print(ansi_next_line)
self._end_show()
self._flush()
def _print(self, s):
if self._isatty:
print(s, end='', file=sys.stderr)
def _flush(self):
if self._isatty:
print('', end='', flush=True, file=sys.stderr)
def _reset(self):
sx, sy = self._terminal_size
self._print(ansi_window % (1, sy))
self._print(ansi_move_to % (sy-self._height, 1))
self._print(ansi_clear_down)
self._height = 0
def _resize(self, height):
sx, sy = self._terminal_size
k = height - self._height
if k > 0:
self._print(ansi_scroll_up * k)
self._print(ansi_window % (1, sy-height))
if k < 0:
self._print(ansi_window % (1, sy-height))
self._print(ansi_scroll_down * abs(k))
self._height = height
def _start_show(self):
sx, sy = self._terminal_size
self._print(ansi_move_to % (sy-self._height+1, 1))
def _end_show(self):
sx, sy = self._terminal_size
self._print(ansi_move_to % (sy-self._height, 1))
self._print(ansi_clear_right)
def make_TaskView():
from pyrocko.gui.qt_compat import qw, qg
class TaskView(qw.QFrame):
def __init__(self, task):
qw.QFrame.__init__(self)
layout = qw.QGridLayout()
self._task = task
label = qw.QLabel(task._render('terminal'))
label.setFont(qg.QFont('monospace'))
layout.addWidget(label, 0, 0)
self._label = label
self.setLayout(layout)
def update_progress(self):
self._label.setText(self._task._render('terminal'))
return TaskView
TaskView = None
def get_TaskView():
global TaskView
if TaskView is None:
TaskView = make_TaskView()
return TaskView
def make_GUIStatusViewer():
from pyrocko.gui.qt_compat import qc, qw
from pyrocko.gui import util as gui_util
class DontShrinkFrame(qw.QFrame):
def __init__(self):
qw.QFrame.__init__(self)
self._size = None
def sizeHint(self):
size = qw.QFrame.sizeHint(self)
if self._size is not None:
size = qc.QSize(
size.width(),
max(self._size.height(), size.height()))
self._size = size
return size
class GUIStatusViewer(StatusViewer):
def __init__(self, parent):
StatusViewer.__init__(self, parent)
frame = DontShrinkFrame()
frame.setWindowTitle('Progress')
layout_outer = qw.QVBoxLayout()
frame.setLayout(layout_outer)
dummy = qw.QFrame()
dummy.setSizePolicy(qw.QSizePolicy(
qw.QSizePolicy.Expanding,
qw.QSizePolicy.Expanding))
layout_outer.addWidget(dummy)
layout = qw.QVBoxLayout()
layout_outer.insertLayout(-1, layout)
layout.setDirection(qw.QBoxLayout.BottomToTop)
self._frame = frame
self._layout = layout
self._task_views = {}
self._timers = []
self._last_size = None
def _cleanup(self):
pass
def _draw(self):
if not threading.current_thread().name == 'MainThread':
return
tasks = self._parent._tasks
task_views = self._task_views
add = [
task_id for task_id in tasks if task_id not in task_views]
for task_id in add:
self._add_task_view(task_id)
remove = [
task_id for task_id in task_views if task_id not in tasks]
for task_id in remove:
self._remove_task_view(task_id)
if task_views and not self._frame.isVisible():
self._frame.show()
if not task_views and self._layout.count() == 0 \
and self._frame.isVisible():
self._frame.hide()
for task_view in task_views.values():
task_view.update_progress()
app = gui_util.get_app()
app.processEvents()
def _add_task_view(self, task_id):
view = get_TaskView()(self._parent._tasks[task_id])
self._task_views[task_id] = view
self._layout.addWidget(view)
def _remove_task_view(self, task_id):
view = self._task_views.pop(task_id)
timer = qc.QTimer()
self._timers.append(timer)
def do_remove():
view.hide()
self._layout.removeWidget(view)
self._timers.remove(timer)
if False:
timer.setSingleShot(True)
timer.setInterval(500)
timer.timeout.connect(do_remove)
timer.start()
else:
do_remove()
def _idle(self):
self._draw()
return GUIStatusViewer
GUIStatusViewer = None
def get_GUIStatusViewer():
global GUIStatusViewer
if GUIStatusViewer is None:
GUIStatusViewer = make_GUIStatusViewer()
return GUIStatusViewer
def get_gui_status_viewer(parent):
from pyrocko import gui_util
app = gui_util.get_app()
win = app.get_main_window()
if win and hasattr(win, 'get_status_viewer'):
viewer = win.get_status_viewer(parent)
if viewer:
return viewer
return get_GUIStatusViewer()(parent)
class Task(object):
def __init__(
self, progress, id, name, n, state='working', logger=None,
group=None, spinner=g_spinners[0]):
self._id = id
self._name = name
self._condition = ''
self._ispin0 = 0
self._ispin0_last = 0
self._ispin = 0
self._spinner = spinner
self._i = None
self._n = n
self._done = False
assert state in ('waiting', 'working')
self._state = state
self._progress = progress
self._logger = logger
self._tcreate = time.time()
self._group = group
self._lock = threading.RLock()
self._views = {}
def __enter__(self):
return self
def __exit__(self, type, value, tb):
if type is None:
self.done()
else:
self.fail()
def __call__(self, it):
try:
self._n = len(it)
except TypeError:
self._n = None
clean = False
try:
n = 0
for obj in it:
self.update(n)
yield obj
n += 1
self.update(n)
clean = True
finally:
if clean:
self.done()
else:
self.fail()
def task(self, *args, **kwargs):
kwargs['group'] = self
return self._progress.task(*args, **kwargs)
def update(self, i=None, condition=''):
with self._lock:
self._ispin0 += 1
self._state = 'working'
self._condition = condition
if i is not None:
if self._n is not None:
i = min(i, self._n)
self._i = i
self._progress._update(False)
def done(self, condition=''):
with self._lock:
self.duration = time.time() - self._tcreate
if self._state in ('done', 'failed'):
return
self._condition = condition
self._state = 'done'
self._log(str(self))
self._progress._task_end(self)
def fail(self, condition=''):
with self._lock:
self.duration = time.time() - self._tcreate
if self._state in ('done', 'failed'):
return
self._condition = condition
self._state = 'failed'
self._log(str(self))
self._progress._task_end(self)
def _log(self, s):
if self._logger is not None:
self._logger.debug(s)
def _get_group_time_start(self):
if self._group:
return self._group._get_group_time_start()
else:
return self._tcreate
def _str_state(self):
s = self._state
if s == 'waiting':
return ' '
elif s == 'working':
if self._ispin0_last != self._ispin0:
self._ispin += 1
self._ispin0_last = self._ispin0
return self._spinner[self._ispin % len(self._spinner)] + ' '
elif s == 'done':
return symbol_done + ' '
elif s == 'failed':
return symbol_failed + ' '
else:
return '? '
def _idisplay(self):
i = self._i
if self._n is not None and i > self._n:
i = self._n
return i
def _str_progress(self):
if self._i is None:
return self._state
elif self._n is None:
if self._state != 'working':
return '... %s (%i)' % (self._state, self._idisplay())
else:
return '%i' % self._idisplay()
else:
if self._state == 'working':
nw = len(str(self._n))
return (('%' + str(nw) + 'i / %i') % (
self._idisplay(), self._n)).center(11)
elif self._state == 'failed':
return '... %s (%i / %i)' % (
self._state, self._idisplay(), self._n)
else:
return '... %s (%i)' % (self._state, self._n)
def _str_percent(self):
if self._state == 'working' and self._n is not None and self._n >= 4 \
and self._i is not None:
return '%3.0f%%' % ((100. * self._i) / self._n)
else:
return ''
def _str_condition(self):
if self._condition:
return '%s' % self._condition
else:
return ''
def _str_bar(self):
if self._state == 'working' and self._n is not None and self._n >= 4 \
and self._i is not None:
nb = 20
fb = nb * float(self._i) / self._n
ib = int(fb)
ip = int((fb - ib) * (len(blocks)-1))
if ib == 0 and ip == 0:
ip = 1 # indication of start
s = blocks[0] * ib
if ib < nb:
s += blocks[-1-ip] + (nb - ib - 1) * blocks[-1] + blocks[-2]
# s = ' ' + bar[0] + bar[1] * ib + bar[2] * (nb - ib) + bar[3]
return s
else:
return ''
def _render(self, style):
if style == 'terminal':
return '%s%-23s %-11s %s%-4s %s' % (
self._str_state(),
self._name,
self._str_progress(),
self._str_bar(),
self._str_percent(),
self._str_condition())
elif style == 'log':
return '%s: %-23s %s%-4s %s' % (
self._state,
self._name,
self._str_progress(),
self._str_percent(),
self._str_condition())
else:
return ''
def __str__(self):
return '%s%-23s %s%-4s %s' % (
self._str_state(),
self._name,
self._str_progress(),
self._str_percent(),
self._str_condition())
class Progress(object):
def __init__(self):
self._current_id = 0
self._tasks = {}
self._viewers = []
self._lock = threading.RLock()
self._isatty = sys.stdout.isatty()
def view(self, viewer=None):
if g_force_viewer_off or self._viewers:
viewer = 'off'
elif viewer is None:
viewer = g_viewer
if not self._isatty and viewer == 'terminal':
logger.debug('No tty attached, switching to log progress viewer.')
viewer = 'log'
try:
viewer = g_viewer_classes[viewer](self)
except KeyError:
raise ValueError('Invalid viewer choice: %s' % viewer)
self._viewers.append(viewer)
return viewer
def task(self, name, n=None, logger=None, group=None):
with self._lock:
self._current_id += 1
task = Task(
self, self._current_id, name, n, logger=logger, group=group)
self._tasks[task._id] = task
self._update(True)
return task
def idle(self):
for viewer in self._viewers:
viewer._idle()
def _remove_viewer(self, viewer):
self._update(True)
self._viewers.remove(viewer)
def _task_end(self, task):
with self._lock:
self._update(True)
del self._tasks[task._id]
self._update(True)
def _update(self, force):
with self._lock:
for viewer in self._viewers:
viewer.update(force)
def _render(self, style):
task_ids = sorted(self._tasks)
lines = []
for task_id in task_ids:
task = self._tasks[task_id]
lines.extend(task._render(style).splitlines())
return lines
def _debug_log(self):
logger.debug(
'Viewers: %s\n Tasks active: %i\n Tasks total: %i',
', '.join(viewer.__class__.__name__ for viewer in self._viewers)
if self._viewers else 'none',
len(self._tasks),
self._current_id)
g_viewer_classes = {
'terminal': TerminalStatusViewer,
'gui': get_gui_status_viewer,
'log': LogStatusViewer,
'off': DummyStatusViewer}
g_progress = Progress()
progress = g_progress # compatibility
view = g_progress.view
task = g_progress.task
idle = g_progress.idle