Coverage for /usr/local/lib/python3.11/dist-packages/grond/monitor.py: 97%
133 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-11-27 15:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-11-27 15:15 +0000
1# https://pyrocko.org/grond - GPLv3
2#
3# The Grond Developers, 21st Century
4import time
5import logging
6import threading
7import os.path as op
8import numpy as num
9from datetime import timedelta
11from pyrocko import util, guts
12from grond.environment import Environment
15logger = logging.getLogger('grond.monit')
class RingBuffer(num.ndarray):
    '''
    Fixed-size circular buffer stored in a NumPy array.

    Constructor arguments are passed unchanged to ``numpy.ndarray``. All
    elements start out as zero. :py:meth:`put` overwrites the element at the
    write cursor ``pos`` and advances the cursor, wrapping around to zero.

    The cursor indexes the first axis but wraps at the total element count
    (``size``), so the class is only consistent for one-dimensional buffers.
    '''

    def __new__(cls, *args, **kwargs):
        buf = num.ndarray.__new__(cls, *args, **kwargs)
        # ndarray.__new__ returns uninitialised memory; zero it so that
        # statistics over a partially filled buffer are well defined.
        buf.fill(0.)
        return buf

    def __init__(self, *args, **kwargs):
        # The arguments have already been consumed by __new__.
        self.pos = 0

    def __array_finalize__(self, obj):
        # __init__ does not run for copies, views and other arrays derived
        # from an existing buffer, which would leave them without a cursor.
        # Inherit the parent's cursor and fold it into the new array's
        # bounds (a slice may be shorter than its parent).
        pos = getattr(obj, 'pos', 0)
        self.pos = pos % self.size if self.size else 0

    def put(self, value):
        '''
        Store ``value`` at the write cursor and advance the cursor.

        The oldest entry is overwritten once the buffer has wrapped.
        '''
        self[self.pos] = value
        self.pos = (self.pos + 1) % self.size
class color:
    ''' ANSI SGR escape sequences for styling terminal output. '''

    # Foreground colours (SGR codes 9x are the bright variants, 36 is the
    # normal-intensity cyan).
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    DARKCYAN = '\033[36m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # SGR 0: reset all attributes; append after a styled span.
    END = '\033[0m'
class TerminalMonitor(object):
    '''
    Context manager which reserves the top of the terminal for scrolling
    output and uses the area below it as a status display.

    On entry the screen is cleared and rows ``1..nlines_scroll`` become the
    scrolling region. :py:meth:`show` redraws the status area, which starts
    at row ``nlines_scroll + 1``. On exit the scrolling region is reset and
    the status area is erased.

    :param nlines_scroll: number of rows in the scrolling region.
    '''

    # NOTE(review): ``end=None`` selects print's default line ending, so a
    # newline follows every escape sequence below - confirm whether
    # ``end=''`` was intended before changing it; cursor placement currently
    # relies on the newline.

    def __init__(self, nlines_scroll=10):
        self.nlines_scroll = nlines_scroll

    def __enter__(self):
        nscroll = self.nlines_scroll
        # Erase display, set scroll region to rows 1..n, move to row n-1.
        setup = '\033[2J\033[1;%ir\033[%i;1H' % (nscroll, nscroll-1)
        print(setup, end=None)
        return self

    def __exit__(self, type, value, traceback):
        nscroll = self.nlines_scroll
        # Reset scroll region, erase from the status area downwards, then
        # park the cursor at row n-1. Exceptions are not suppressed.
        teardown = '\033[r\033[%i;1H\033[0J\033[%i;1H' % (
            nscroll+1, nscroll-1)
        print(teardown)

    def _start_show(self):
        # Jump to the first status row and erase everything below it.
        status_row = self.nlines_scroll + 1
        print('\033[%i;1H\033[0J' % status_row, end=None)

    def _end_show(self):
        # Return the cursor to the scrolling region.
        scroll_row = self.nlines_scroll - 1
        print('\033[%i;1H' % scroll_row, end=None)

    def show(self, s):
        ''' Replace the contents of the status area with the string ``s``. '''
        self._start_show()
        print(s, end=None)
        self._end_show()
class GrondMonitor(threading.Thread):
    '''
    Daemon thread which follows a Grond run directory and prints the state
    of the optimisation to the terminal.

    The thread polls the run's model history roughly ten times per second.
    It registers itself as a listener on that history; :py:meth:`extend` is
    the listener callback which redraws the status table.

    :param rundir: path of the run directory to follow.
    '''

    # Width of one value column in the status table.
    col_width = 15
    # Format of the first (bold, left-aligned) column of each table row.
    row_name = color.BOLD + '{:<{col_param_width}s}' + color.END
    # Format of every following column.
    parameter_fmt = '{:{col_width}s}'

    def __init__(self, rundir):
        threading.Thread.__init__(self)
        self.rundir = rundir
        self.daemon = True

        self.sig_terminate = threading.Event()
        self.iter_per_second = 0
        self._iiter = 0
        # Most recent iteration-rate samples; their mean is the shown rate.
        self._iter_buffer = RingBuffer(20)
        # Set while the terminal is managed by run().
        self._tm = None

    def run(self):
        '''
        Thread body: attach to the run directory and poll until terminated.

        Returns early, after logging an error, when no environment can be
        discovered for ``rundir``.
        '''
        logger.info('Waiting to follow environment %s...', self.rundir)
        env = Environment.discover(self.rundir)
        if env is None:
            logger.error('Could not attach to Grond environment.')
            return

        self.environment = env
        self.history = self.environment.get_history()

        optimiser_fn = op.join(self.rundir, 'optimiser.yaml')
        self.optimiser = guts.load(filename=optimiser_fn)

        self.problem = self.history.problem
        self.niter = self.optimiser.niterations

        self.starttime = time.time()
        self.last_update = self.starttime

        self.history.add_listener(self)

        with TerminalMonitor(10) as tm:

            self._tm = tm

            while True:
                self.history.update()
                # Wait on the event instead of sleeping so that terminate()
                # takes effect immediately rather than after the sleep.
                if self.sig_terminate.wait(0.1):
                    break

        logger.debug('Monitor thread exiting.')

    @property
    def runtime(self):
        ''' Time since monitoring started, rounded to whole seconds. '''
        return timedelta(seconds=round(time.time() - self.starttime))

    @property
    def iiter(self):
        ''' Iteration count as of the latest update. '''
        return self._iiter

    @iiter.setter
    def iiter(self, iiter):
        now = time.time()
        dt = now - self.last_update
        # Two updates can fall on the same clock tick and the wall clock may
        # step backwards; neither yields a usable rate sample, and dt == 0
        # would raise ZeroDivisionError.
        if dt > 0.:
            self._iter_buffer.put(float((iiter - self.iiter) / dt))
            self.iter_per_second = float(self._iter_buffer.mean())
        self._iiter = iiter
        self.last_update = now

    @property
    def runtime_remaining(self):
        '''
        Estimated time until ``niter`` iterations are reached.

        Returns a zero ``timedelta`` while no positive iteration rate is
        known or once the iteration count has reached ``niter``.
        '''
        if self.iter_per_second <= 0.:
            return timedelta()
        iterations_left = max(self.niter - self.iiter, 0)
        return timedelta(seconds=round(iterations_left
                                       / self.iter_per_second))

    def extend(self, *args):
        ''' Connected and called through the self.history.add_listener '''
        self.iiter = self.history.nmodels
        if self._tm is None:
            # The listener is registered before the terminal is set up;
            # without a terminal there is nothing to draw on.
            return

        problem = self.history.problem
        optimiser_status = self.optimiser.get_status(self.history)
        row_names = optimiser_status.row_names

        lines = []
        lnadd = lines.append

        def fmt(s):
            # Leave room within col_width for sign, exponent and padding.
            return util.gform(s, significant_digits=(self.col_width-1-6)//2)

        lnadd('Problem: {p.name}'
              '\t({s.runtime} - remaining {s.runtime_remaining}'
              ' @ {s.iter_per_second:.1f} iter/s)'
              .format(s=self, p=problem))
        lnadd('Iteration: {s.iiter} / {s.niter}'
              .format(s=self))
        if optimiser_status.extra_header is not None:
            lnadd(optimiser_status.extra_header)

        # default=0 keeps the header printable when there are no rows.
        col_param_width = max((len(p) for p in row_names), default=0) + 2

        out_ln = self.row_name +\
            ''.join([self.parameter_fmt] * optimiser_status.ncolumns)

        lnadd(out_ln.format(
            'Parameter', *optimiser_status.column_names,
            col_param_width=col_param_width,
            col_width=self.col_width))

        for ip, parameter_name in enumerate(row_names):
            lnadd(out_ln.format(
                parameter_name,
                *[fmt(v[ip]) for v in optimiser_status.values],
                col_param_width=col_param_width,
                col_width=self.col_width))

        if optimiser_status.extra_footer is not None:
            lnadd(optimiser_status.extra_footer)

        self._tm.show('\n'.join(lines))

    def terminate(self):
        ''' Ask the monitor thread to stop and wait until it has exited. '''
        logger.debug('Setting thread termination flag.')
        self.sig_terminate.set()
        self.join()

    @classmethod
    def watch(cls, rundir):
        ''' Create a monitor for ``rundir``, start it and return it. '''
        monitor = cls(rundir)
        monitor.start()
        return monitor