Coverage for /usr/local/lib/python3.11/dist-packages/grond/monitor.py: 97%

133 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-11-27 15:15 +0000

1# https://pyrocko.org/grond - GPLv3 

2# 

3# The Grond Developers, 21st Century 

4import time 

5import logging 

6import threading 

7import os.path as op 

8import numpy as num 

9from datetime import timedelta 

10 

11from pyrocko import util, guts 

12from grond.environment import Environment 

13 

14 

15logger = logging.getLogger('grond.monit') 

16 

17 

18class RingBuffer(num.ndarray): 

19 def __new__(cls, *args, **kwargs): 

20 cls = num.ndarray.__new__(cls, *args, **kwargs) 

21 cls.fill(0.) 

22 return cls 

23 

24 def __init__(self, *args, **kwargs): 

25 self.pos = 0 

26 

27 def put(self, value): 

28 self[self.pos] = value 

29 self.pos += 1 

30 self.pos %= self.size 

31 

32 

33class color: 

34 PURPLE = '\033[95m' 

35 CYAN = '\033[96m' 

36 DARKCYAN = '\033[36m' 

37 BLUE = '\033[94m' 

38 GREEN = '\033[92m' 

39 YELLOW = '\033[93m' 

40 RED = '\033[91m' 

41 BOLD = '\033[1m' 

42 UNDERLINE = '\033[4m' 

43 END = '\033[0m' 

44 

45 

46class TerminalMonitor(object): 

47 

48 def __init__(self, nlines_scroll=10): 

49 self.nlines_scroll = nlines_scroll 

50 

51 def __enter__(self): 

52 print('\033[2J\033[1;%ir\033[%i;1H' 

53 % (self.nlines_scroll, self.nlines_scroll-1), end=None) 

54 return self 

55 

56 def __exit__(self, type, value, traceback): 

57 print('\033[r\033[%i;1H\033[0J\033[%i;1H' 

58 % (self.nlines_scroll+1, self.nlines_scroll-1)) 

59 

60 def _start_show(self): 

61 print('\033[%i;1H\033[0J' % (self.nlines_scroll+1), end=None) 

62 

63 def _end_show(self): 

64 print('\033[%i;1H' % (self.nlines_scroll-1), end=None) 

65 

66 def show(self, s): 

67 self._start_show() 

68 print(s, end=None) 

69 self._end_show() 

70 

71 

72class GrondMonitor(threading.Thread): 

73 

74 col_width = 15 

75 row_name = color.BOLD + '{:<{col_param_width}s}' + color.END 

76 parameter_fmt = '{:{col_width}s}' 

77 

78 def __init__(self, rundir): 

79 threading.Thread.__init__(self) 

80 self.rundir = rundir 

81 self.daemon = True 

82 

83 self.sig_terminate = threading.Event() 

84 self.iter_per_second = 0 

85 self._iiter = 0 

86 self._iter_buffer = RingBuffer(20) 

87 self._tm = None 

88 

89 def run(self): 

90 logger.info('Waiting to follow environment %s...' % self.rundir) 

91 env = Environment.discover(self.rundir) 

92 if env is None: 

93 logger.error('Could not attach to Grond environment.') 

94 return 

95 

96 self.environment = env 

97 self.history = self.environment.get_history() 

98 

99 optimiser_fn = op.join(self.rundir, 'optimiser.yaml') 

100 self.optimiser = guts.load(filename=optimiser_fn) 

101 

102 self.problem = self.history.problem 

103 self.niter = self.optimiser.niterations 

104 

105 self.starttime = time.time() 

106 self.last_update = self.starttime 

107 

108 self.history.add_listener(self) 

109 

110 with TerminalMonitor(10) as tm: 

111 

112 self._tm = tm 

113 

114 ii = 0 

115 while True: 

116 ii += 1 

117 self.history.update() 

118 time.sleep(0.1) 

119 if self.sig_terminate.is_set(): 

120 break 

121 

122 logger.debug('Monitor thread exiting.') 

123 

124 @property 

125 def runtime(self): 

126 return timedelta(seconds=round(time.time() - self.starttime)) 

127 

128 @property 

129 def iiter(self): 

130 return self._iiter 

131 

132 @iiter.setter 

133 def iiter(self, iiter): 

134 dt = time.time() - self.last_update 

135 self._iter_buffer.put(float((iiter - self.iiter) / dt)) 

136 self.iter_per_second = float(self._iter_buffer.mean()) 

137 self._iiter = iiter 

138 self.last_update = time.time() 

139 

140 @property 

141 def runtime_remaining(self): 

142 if self.iter_per_second == 0.: 

143 return timedelta() 

144 return timedelta(seconds=round((self.niter - self.iiter) 

145 / self.iter_per_second)) 

146 

147 def extend(self, *args): 

148 ''' Connected and called through the self.history.add_listener ''' 

149 self.iiter = self.history.nmodels 

150 problem = self.history.problem 

151 optimiser_status = self.optimiser.get_status(self.history) 

152 row_names = optimiser_status.row_names 

153 

154 lines = [] 

155 lnadd = lines.append 

156 

157 def fmt(s): 

158 return util.gform(s, significant_digits=(self.col_width-1-6)//2) 

159 

160 lnadd('Problem: {p.name}' 

161 '\t({s.runtime} - remaining {s.runtime_remaining}' 

162 ' @ {s.iter_per_second:.1f} iter/s)' 

163 .format(s=self, p=problem)) 

164 lnadd('Iteration: {s.iiter} / {s.niter}' 

165 .format(s=self)) 

166 if optimiser_status.extra_header is not None: 

167 lnadd(optimiser_status.extra_header) 

168 

169 col_param_width = max([len(p) for p in row_names]) + 2 

170 

171 out_ln = self.row_name +\ 

172 ''.join([self.parameter_fmt] * optimiser_status.ncolumns) 

173 

174 lnadd(out_ln.format( 

175 *['Parameter'] + list(optimiser_status.column_names), 

176 col_param_width=col_param_width, 

177 col_width=self.col_width, 

178 type='s')) 

179 

180 for ip, parameter_name in enumerate(row_names): 

181 lnadd(out_ln.format( 

182 parameter_name, 

183 *[fmt(v[ip]) for v in optimiser_status.values], 

184 col_param_width=col_param_width, 

185 col_width=self.col_width)) 

186 

187 if optimiser_status.extra_footer is not None: 

188 lnadd(optimiser_status.extra_footer) 

189 

190 self._tm.show('\n'.join(lines)) 

191 

192 def terminate(self): 

193 logger.debug('Setting thread termination flag.') 

194 self.sig_terminate.set() 

195 self.join() 

196 

197 @classmethod 

198 def watch(cls, rundir): 

199 monitor = cls(rundir) 

200 monitor.start() 

201 return monitor