Coverage for /usr/local/lib/python3.11/dist-packages/grond/monitor.py: 97%

134 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-26 16:25 +0000

1from __future__ import print_function, absolute_import 

2 

3import time 

4import logging 

5import threading 

6import os.path as op 

7import numpy as num 

8from datetime import timedelta 

9 

10from pyrocko import util, guts 

11from grond.environment import Environment 

12 

13 

14logger = logging.getLogger('grond.monit') 

15 

16 

class RingBuffer(num.ndarray):
    '''Fixed-size circular buffer stored in a NumPy array.

    Constructor arguments are forwarded unchanged to ``numpy.ndarray``, so
    ``RingBuffer(20)`` yields 20 slots.  Every slot starts at zero, and
    :py:meth:`put` overwrites the slots one after another, wrapping around
    to the first slot once the end has been reached.

    The write position ``pos`` is carried over to copies and views of the
    same buffer.  ``put`` is meant for one-dimensional buffers; on a slice
    shorter than ``pos`` it raises ``IndexError``.
    '''

    def __new__(cls, *args, **kwargs):
        # Keep ``cls`` bound to the class; the new instance gets its own name.
        buf = num.ndarray.__new__(cls, *args, **kwargs)
        buf.fill(0.)
        return buf

    def __array_finalize__(self, obj):
        # NumPy invokes this hook for every new instance, including copies,
        # views and view-casts, for which ``__init__`` is never run.  Without
        # it such arrays would have no ``pos`` and ``put`` would fail with an
        # AttributeError.  ``obj`` is None on explicit construction.
        self.pos = getattr(obj, 'pos', 0)

    def __init__(self, *args, **kwargs):
        # The arguments were already consumed by ``__new__``; an explicitly
        # constructed buffer always starts writing at the first slot.
        self.pos = 0

    def put(self, value):
        '''Store *value* at the current position and advance, wrapping at
        the end of the buffer.'''
        self[self.pos] = value
        self.pos = (self.pos + 1) % self.size

30 

31 

class color:
    '''ANSI SGR escape sequences for styling terminal output.

    Prefix a string with one of these constants and append ``END`` to
    switch the styling off again.
    '''

    # Reset all attributes.
    END = '\033[0m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Normal-intensity foreground colour.
    DARKCYAN = '\033[36m'

    # Bright foreground colours.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'

43 

44 

class TerminalMonitor(object):
    '''Context manager that reserves the top of the terminal as a scrolling
    region and draws a status text below it.

    Everything is done by printing ANSI/VT100 control sequences to standard
    output.  Each sequence is written with ``print``'s default line ending,
    i.e. it is followed by a newline.

    :param nlines_scroll: last row of the scrolling region (rows are counted
        from 1); the status text is drawn starting at row
        ``nlines_scroll + 1``.
    '''

    def __init__(self, nlines_scroll=10):
        self.nlines_scroll = nlines_scroll

    def __enter__(self):
        nscroll = self.nlines_scroll
        # Clear the screen, restrict scrolling to rows 1..nscroll and put the
        # cursor into the scrolling region.
        setup = '\033[2J\033[1;%ir\033[%i;1H' % (nscroll, nscroll-1)
        print(setup)
        return self

    def __exit__(self, type, value, traceback):
        nscroll = self.nlines_scroll
        # Restore full-screen scrolling, wipe the status area and move the
        # cursor back to where regular output was going.
        teardown = '\033[r\033[%i;1H\033[0J\033[%i;1H' % (nscroll+1, nscroll-1)
        print(teardown)

    def _start_show(self):
        # Jump to the first status row and erase everything below it.
        goto_status = '\033[%i;1H\033[0J' % (self.nlines_scroll+1)
        print(goto_status)

    def _end_show(self):
        # Return the cursor to the scrolling region.
        goto_scroll = '\033[%i;1H' % (self.nlines_scroll-1)
        print(goto_scroll)

    def show(self, s):
        '''Replace the contents of the status area with the text *s*.'''
        self._start_show()
        print(s)
        self._end_show()

69 

70 

class GrondMonitor(threading.Thread):
    '''Daemon thread that follows a Grond run directory and keeps a status
    table up to date in the terminal.

    The thread attaches to the environment found in *rundir*, registers
    itself as a listener on the run's history and polls that history about
    ten times per second until :py:meth:`terminate` is called.  Use
    :py:meth:`watch` to create and start a monitor in one step.

    :param rundir: path of the run directory to follow; it must contain an
        ``optimiser.yaml`` file.
    '''

    # Width of each value column of the status table.
    col_width = 15
    # Format of the (bold) first column holding the parameter name.
    row_name = color.BOLD + '{:<{col_param_width}s}' + color.END
    # Format of a single value column.
    parameter_fmt = '{:{col_width}s}'

    def __init__(self, rundir):
        threading.Thread.__init__(self)
        self.rundir = rundir
        self.daemon = True

        self.sig_terminate = threading.Event()
        self.iter_per_second = 0
        self._iiter = 0
        # Holds the most recent iteration-rate samples; the displayed rate is
        # the mean over all 20 slots (unused slots count as zero).
        self._iter_buffer = RingBuffer(20)
        self._tm = None

    def run(self):
        '''Thread body: attach to the run and poll its history until the
        termination flag is set.

        Returns early, after logging an error, if no environment can be
        discovered for ``rundir``.
        '''
        logger.info('Waiting to follow environment %s...', self.rundir)
        env = Environment.discover(self.rundir)
        if env is None:
            logger.error('Could not attach to Grond environment.')
            return

        self.environment = env
        self.history = self.environment.get_history()

        optimiser_fn = op.join(self.rundir, 'optimiser.yaml')
        self.optimiser = guts.load(filename=optimiser_fn)

        self.problem = self.history.problem
        self.niter = self.optimiser.niterations

        self.starttime = time.time()
        self.last_update = self.starttime

        # From here on the history reports back through self.extend().
        self.history.add_listener(self)

        with TerminalMonitor(10) as tm:
            self._tm = tm

            while True:
                self.history.update()
                # Pause 0.1 s between polls, but wake up immediately when
                # terminate() sets the flag.
                if self.sig_terminate.wait(0.1):
                    break

        logger.debug('Monitor thread exiting.')

    @property
    def runtime(self):
        '''Time elapsed since the monitor attached, rounded to seconds.'''
        return timedelta(seconds=round(time.time() - self.starttime))

    @property
    def iiter(self):
        '''Number of iterations seen so far.'''
        return self._iiter

    @iiter.setter
    def iiter(self, iiter):
        # Setting the counter also records an iteration-rate sample covering
        # the time since the previous update.
        now = time.time()
        dt = now - self.last_update
        # Two updates can carry the same timestamp on a coarse clock; skip
        # the rate sample then rather than dividing by zero.
        if dt > 0.:
            self._iter_buffer.put(float((iiter - self._iiter) / dt))
            self.iter_per_second = float(self._iter_buffer.mean())
        self._iiter = iiter
        self.last_update = now

    @property
    def runtime_remaining(self):
        '''Estimated time until ``niter`` iterations are reached, based on
        the current iteration rate; zero while no rate is known.'''
        if self.iter_per_second == 0.:
            return timedelta()
        return timedelta(seconds=round((self.niter - self.iiter)
                                       / self.iter_per_second))

    def extend(self, *args):
        ''' Connected and called through the self.history.add_listener

        Updates the iteration counter from the history and redraws the
        status table.  The arguments passed by the caller are ignored.
        '''
        self.iiter = self.history.nmodels
        problem = self.history.problem
        optimiser_status = self.optimiser.get_status(self.history)
        row_names = optimiser_status.row_names

        lines = []
        lnadd = lines.append

        def fmt(s):
            # Number of significant digits is derived from the column width.
            return util.gform(s, significant_digits=(self.col_width-1-6)//2)

        lnadd('Problem: {p.name}'
              '\t({s.runtime} - remaining {s.runtime_remaining}'
              ' @ {s.iter_per_second:.1f} iter/s)'
              .format(s=self, p=problem))
        lnadd('Iteration: {s.iiter} / {s.niter}'
              .format(s=self))
        if optimiser_status.extra_header is not None:
            lnadd(optimiser_status.extra_header)

        # Widest parameter name plus padding; a status without any rows must
        # not break the display, hence the fallback for the empty case.
        col_param_width = max([len(p) for p in row_names] or [0]) + 2

        out_ln = self.row_name +\
            ''.join([self.parameter_fmt] * optimiser_status.ncolumns)

        # Table header.
        lnadd(out_ln.format(
            *['Parameter'] + list(optimiser_status.column_names),
            col_param_width=col_param_width,
            col_width=self.col_width))

        # One row per parameter; each entry of ``values`` supplies a column.
        for ip, parameter_name in enumerate(row_names):
            lnadd(out_ln.format(
                parameter_name,
                *[fmt(v[ip]) for v in optimiser_status.values],
                col_param_width=col_param_width,
                col_width=self.col_width))

        if optimiser_status.extra_footer is not None:
            lnadd(optimiser_status.extra_footer)

        self._tm.show('\n'.join(lines))

    def terminate(self):
        '''Ask the monitor thread to stop and wait until it has finished.'''
        logger.debug('Setting thread termination flag.')
        self.sig_terminate.set()
        self.join()

    @classmethod
    def watch(cls, rundir):
        '''Create a monitor for *rundir*, start its thread and return it.'''
        monitor = cls(rundir)
        monitor.start()
        return monitor