Coverage for /usr/local/lib/python3.11/dist-packages/grond/monitor.py: 97%
134 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-26 16:25 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-26 16:25 +0000
1from __future__ import print_function, absolute_import
3import time
4import logging
5import threading
6import os.path as op
7import numpy as num
8from datetime import timedelta
10from pyrocko import util, guts
11from grond.environment import Environment
# Module-level logger; every message emitted by this module goes out under
# the 'grond.monit' logger name.
logger = logging.getLogger('grond.monit')
class RingBuffer(num.ndarray):
    '''
    Fixed-size circular buffer backed by a NumPy array.

    The buffer starts out filled with zeros. :py:meth:`put` writes at the
    current position ``pos`` and advances it, wrapping around to the start
    once the end of the array is reached, so the oldest entry is always the
    next one to be overwritten.

    Constructor arguments are forwarded unchanged to
    ``numpy.ndarray.__new__`` (e.g. ``RingBuffer(20)`` for 20 slots).
    '''

    def __new__(cls, *args, **kwargs):
        '''Allocate the underlying array and zero it.'''
        # ``ndarray.__new__`` returns uninitialised memory, hence the
        # explicit fill before the buffer is handed out.
        obj = num.ndarray.__new__(cls, *args, **kwargs)
        obj.fill(0.)
        return obj

    def __array_finalize__(self, obj):
        '''
        Initialise the write position on every newly created instance.

        NumPy does not call ``__init__`` for views, slices, copies or
        reduction results, but it always calls this hook. ``obj`` is
        ``None`` for explicit construction and the source array otherwise;
        the position is inherited from the source when it has one.
        '''
        self.pos = getattr(obj, 'pos', 0)

    def put(self, value):
        '''
        Store ``value`` in the current slot and advance the write position.

        :param value: value assigned to ``self[self.pos]``
        '''
        self[self.pos] = value
        # Wrap around so that the next write overwrites the oldest entry.
        self.pos = (self.pos + 1) % self.size
class color:
    '''ANSI escape sequences (SGR codes) used to style terminal output.'''

    # Bright foreground colours.
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'

    # Regular-intensity foreground colour.
    DARKCYAN = '\033[36m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Resets all colours and attributes.
    END = '\033[0m'
class TerminalMonitor(object):
    '''
    Context manager splitting the terminal into a scrolling region at the
    top and a status display below it.

    The first ``nlines_scroll`` terminal lines are set up as the scrolling
    region on entry; :py:meth:`show` redraws the area below that region.
    All control is done by printing ANSI escape sequences to stdout.

    :param nlines_scroll: number of lines in the scrolling region
    '''

    def __init__(self, nlines_scroll=10):
        self.nlines_scroll = nlines_scroll

    def __enter__(self):
        '''Clear the screen, set the scrolling region and park the cursor.'''
        region_end = self.nlines_scroll
        park_row = self.nlines_scroll - 1
        # ESC[2J: erase display, ESC[1;<n>r: scrolling region = lines 1..n,
        # ESC[<row>;1H: move the cursor to column 1 of <row>.
        print('\033[2J\033[1;%ir\033[%i;1H' % (region_end, park_row))
        return self

    def __exit__(self, type, value, traceback):
        '''Reset the scrolling region and erase the status display.'''
        status_row = self.nlines_scroll + 1
        park_row = self.nlines_scroll - 1
        # ESC[r: reset scrolling region, ESC[0J: erase from the cursor to
        # the end of the display.
        print('\033[r\033[%i;1H\033[0J\033[%i;1H' % (status_row, park_row))

    def _start_show(self):
        '''Jump below the scrolling region and erase everything from there.'''
        status_row = self.nlines_scroll + 1
        print('\033[%i;1H\033[0J' % status_row)

    def _end_show(self):
        '''Move the cursor back into the scrolling region.'''
        park_row = self.nlines_scroll - 1
        print('\033[%i;1H' % park_row)

    def show(self, s):
        '''
        Replace the status display with ``s``.

        :param s: text to print below the scrolling region
        '''
        self._start_show()
        print(s)
        self._end_show()
class GrondMonitor(threading.Thread):
    '''
    Daemon thread which follows a Grond run directory and renders the
    optimiser status in the terminal.

    The thread polls the run's history roughly every 0.1 s. It registers
    itself as a listener on the history; the history is expected to call
    :py:meth:`extend`, which redraws the status table.

    :param rundir: path of the run directory to follow
    '''

    # Width of each value column in the status table.
    col_width = 15
    # Format of the (bold) first column holding the row names.
    row_name = color.BOLD + '{:<{col_param_width}s}' + color.END
    # Format of a single value column.
    parameter_fmt = '{:{col_width}s}'

    def __init__(self, rundir):
        threading.Thread.__init__(self)
        self.rundir = rundir
        self.daemon = True

        self.sig_terminate = threading.Event()
        self.iter_per_second = 0
        self._iiter = 0
        # Holds the most recent iteration-rate samples; their mean is the
        # displayed iter/s value.
        self._iter_buffer = RingBuffer(20)
        # Set to the active TerminalMonitor once run() has entered it.
        self._tm = None

    def run(self):
        '''
        Thread body: attach to the run directory and poll its history until
        :py:meth:`terminate` is called.

        Returns early (after logging an error) when no environment can be
        discovered for ``rundir``.
        '''
        logger.info('Waiting to follow environment %s...', self.rundir)
        env = Environment.discover(self.rundir)
        if env is None:
            logger.error('Could not attach to Grond environment.')
            return

        self.environment = env
        self.history = self.environment.get_history()

        optimiser_fn = op.join(self.rundir, 'optimiser.yaml')
        self.optimiser = guts.load(filename=optimiser_fn)

        self.problem = self.history.problem
        self.niter = self.optimiser.niterations

        self.starttime = time.time()
        self.last_update = self.starttime

        self.history.add_listener(self)

        with TerminalMonitor(10) as tm:

            self._tm = tm

            while True:
                self.history.update()
                # Event.wait() serves as the 0.1 s poll delay but returns
                # right away once terminate() has set the flag, so shutdown
                # does not have to wait for a sleep to finish.
                if self.sig_terminate.wait(0.1):
                    break

        logger.debug('Monitor thread exiting.')

    @property
    def runtime(self):
        '''Time since monitoring started, as ``timedelta`` in whole seconds.'''
        return timedelta(seconds=round(time.time() - self.starttime))

    @property
    def iiter(self):
        '''Iteration count as of the last update.'''
        return self._iiter

    @iiter.setter
    def iiter(self, iiter):
        '''
        Store the new iteration count and refresh the iter/s estimate.

        The rate since the previous update is pushed into the ring buffer
        and ``iter_per_second`` becomes the buffer's mean.
        '''
        now = time.time()
        dt = now - self.last_update
        # Two updates can fall into the same clock tick; skip the rate
        # sample in that case instead of dividing by zero.
        if dt > 0.:
            self._iter_buffer.put(float((iiter - self.iiter) / dt))
            self.iter_per_second = float(self._iter_buffer.mean())
        self._iiter = iiter
        self.last_update = now

    @property
    def runtime_remaining(self):
        '''
        Estimated remaining run time as ``timedelta``.

        Returns a zero ``timedelta`` while no iteration rate is available.
        '''
        if self.iter_per_second == 0.:
            return timedelta()
        return timedelta(seconds=round((self.niter - self.iiter)
                                       / self.iter_per_second))

    def extend(self, *args):
        ''' Connected and called through the self.history.add_listener

        Updates the iteration counter from ``self.history.nmodels``, builds
        the status text (header lines, then one table row per entry of the
        optimiser status' ``row_names``) and shows it on the terminal
        monitor. ``args`` are ignored.
        '''
        self.iiter = self.history.nmodels
        problem = self.history.problem
        optimiser_status = self.optimiser.get_status(self.history)
        row_names = optimiser_status.row_names

        lines = []
        lnadd = lines.append

        def fmt(s):
            return util.gform(s, significant_digits=(self.col_width-1-6)//2)

        lnadd('Problem: {p.name}'
              '\t({s.runtime} - remaining {s.runtime_remaining}'
              ' @ {s.iter_per_second:.1f} iter/s)'
              .format(s=self, p=problem))
        lnadd('Iteration: {s.iiter} / {s.niter}'
              .format(s=self))
        if optimiser_status.extra_header is not None:
            lnadd(optimiser_status.extra_header)

        # Fall back to width 0 when there are no rows, max() of an empty
        # sequence would raise.
        col_param_width = max([len(p) for p in row_names] or [0]) + 2

        out_ln = self.row_name +\
            ''.join([self.parameter_fmt] * optimiser_status.ncolumns)

        # Table header.
        lnadd(out_ln.format(
            *['Parameter'] + list(optimiser_status.column_names),
            col_param_width=col_param_width,
            col_width=self.col_width))

        # One row per parameter, one formatted value per column.
        for ip, parameter_name in enumerate(row_names):
            lnadd(out_ln.format(
                parameter_name,
                *[fmt(v[ip]) for v in optimiser_status.values],
                col_param_width=col_param_width,
                col_width=self.col_width))

        if optimiser_status.extra_footer is not None:
            lnadd(optimiser_status.extra_footer)

        # The listener is registered before the terminal monitor is
        # entered; skip drawing until it is available.
        if self._tm is not None:
            self._tm.show('\n'.join(lines))

    def terminate(self):
        '''Ask the monitor thread to stop and wait until it has finished.'''
        logger.debug('Setting thread termination flag.')
        self.sig_terminate.set()
        self.join()

    @classmethod
    def watch(cls, rundir):
        '''
        Create a monitor for ``rundir``, start it and return it.

        :param rundir: path of the run directory to follow
        :returns: the started monitor thread
        '''
        monitor = cls(rundir)
        monitor.start()
        return monitor