Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/hamster_pile.py: 25%

158 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 10:48 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Real-time data acquisition and processing helpers for :py:mod:`pyrocko.pile`. 

8''' 

9 

10import os 

11import logging 

12 

13from . import pile, io 

14from . import trace as tracemod 

15 

16logger = logging.getLogger('pyrocko.hamster_pile') 

17 

18 

class Processor(object):
    '''
    Base class for trace processors with per-channel buffer bookkeeping.

    At most one buffered trace is kept per ``nslc_id`` in ``self._buffers``.
    The default :py:meth:`process` hands traces through unchanged.
    '''

    def __init__(self):
        self._buffers = {}  # keys: nslc_id, values: buffered trace

    def process(self, trace):
        '''
        Process one incoming trace.

        Returns a list of resulting traces; here only the input itself.
        '''
        return [trace]

    def get_buffer(self, trace):
        '''
        Get the buffered trace which ``trace`` can be appended to.

        Returns the trace buffered under the ``nslc_id`` of ``trace`` if
        ``trace`` begins within a tenth of a sampling interval of the sample
        expected after the end of the buffer and if data type and sampling
        interval of both agree. Returns ``None`` otherwise.
        '''
        key = trace.nslc_id
        try:
            held = self._buffers[key]
        except KeyError:
            return None

        # Time at which a seamless continuation has to start.
        expected_tmin = held.tmax + held.deltat
        if not abs(expected_tmin - trace.tmin) < 1.0e-1*held.deltat:
            return None

        same_dtype = held.ydata.dtype == trace.ydata.dtype
        if not same_dtype:
            return None

        same_deltat = held.deltat == trace.deltat
        return held if same_deltat else None

    def set_buffer(self, trace):
        '''
        Store ``trace`` as the buffer for its ``nslc_id``.

        A buffer already stored under that id is replaced.
        '''
        self._buffers[trace.nslc_id] = trace

    def empty_buffer(self, trace):
        '''
        Forget the buffer stored for the ``nslc_id`` of ``trace``.

        Raises :py:exc:`KeyError` if there is no such buffer.
        '''
        del self._buffers[trace.nslc_id]

    def flush_buffers(self):
        '''
        Forget all buffers and return the traces they held as a list.
        '''
        flushed, self._buffers = list(self._buffers.values()), {}
        return flushed

52 

53 

class Renamer(object):
    '''
    Processor which re-labels traces.

    ``mapping`` is called with each incoming trace. It returns the codes to
    be applied (passed as positional arguments to the trace's ``set_codes``
    method) or ``None`` to discard the trace.
    '''

    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        '''
        Return a one-element list holding a re-labelled copy of ``trace``.

        The input trace is left untouched. An empty list is returned when
        the mapping yields ``None`` for it.
        '''
        codes = self._mapping(trace)
        if codes is not None:
            renamed = trace.copy()
            renamed.set_codes(*codes)
            return [renamed]

        return []

66 

67 

class Chain(Processor):
    '''
    Processor which runs several processors one after another.

    Every trace produced by one processor is fed to the next one. The traces
    coming out of the last processor are the result of the chain.
    '''

    def __init__(self, *processors):
        # Set up the inherited buffer bookkeeping, so that helpers like
        # flush_buffers() do not fail on a missing attribute.
        Processor.__init__(self)
        self._processors = processors

    def process(self, trace):
        '''
        Pass ``trace`` through all processors of the chain.

        Returns the list of traces produced by the last processor. The list
        is empty as soon as one stage yields nothing. A chain without
        processors returns ``[trace]``.
        '''
        traces = [trace]
        for p in self._processors:
            # Collect the output of this stage separately, then make it the
            # input of the next stage.
            xtraces = []
            for tr in traces:
                xtraces.extend(p.process(tr))

            traces = xtraces

        return traces

80 

81 

class Downsampler(Processor):
    '''
    Processor which downsamples incoming traces and re-labels the result.

    ``mapping`` is called with each incoming trace and returns the codes for
    the downsampled trace, or ``None`` to discard the input. ``deltat`` is
    handed to :py:func:`pyrocko.trace.co_downsample_to`.
    '''

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        # Traces are pushed into this object with send(); it is closed when
        # the Downsampler is deleted.
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        '''
        Feed ``trace`` to the downsampler.

        Returns a one-element list with the re-labelled trace obtained from
        the downsampler, or an empty list if the mapping returns ``None``
        for ``trace`` or if the obtained trace has no samples.
        '''
        codes = self._mapping(trace)
        if codes is None:
            return []

        downsampled = self._downsampler.send(trace)
        downsampled.set_codes(*codes)
        return [] if downsampled.data_len() == 0 else [downsampled]

    def __del__(self):
        self._downsampler.close()

103 

104 

class Grower(Processor):
    '''
    Processor which glues contiguous traces together before passing them on.

    Incoming traces are collected per ``nslc_id``. Once a collected trace
    spans at least ``tflush`` seconds it is released. Give ``tflush=None`` to
    disable time-based release; collected traces are then only handed out
    when they cannot be continued or through :py:meth:`flush_buffers`.
    '''

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        '''
        Add ``trace`` to the collected data of its channel.

        The first trace of a channel is itself used as the buffer and is
        extended in place by the traces following it.

        Returns a list with the traces released by this call (possibly
        empty).
        '''
        released = []
        grown = self.get_buffer(trace)
        if grown is None:
            # There may still be a buffer for this channel which cannot be
            # continued (gap, changed data type or sampling interval). It
            # would be overwritten below, so release it rather than silently
            # losing its data.
            stale = self._buffers.pop(trace.nslc_id, None)
            if stale is not None:
                released.append(stale)

            grown = trace
            self.set_buffer(grown)
        else:
            grown.append(trace.ydata)

        # Comparing against None would raise TypeError, so the default
        # tflush=None has to be handled explicitly.
        if self._tflush is not None \
                and grown.tmax - grown.tmin >= self._tflush:

            self.empty_buffer(grown)
            released.append(grown)

        return released

125 

126 

class HamsterPile(pile.Pile):
    '''
    Pile which is fed with traces piece by piece.

    Incoming traces are passed through the configured processors. The
    results are collected in one in-memory buffer file per ``nslc_id``. A
    buffer is "fixated" when it cannot be continued, when it exceeds the
    fixation length, or when :py:meth:`fixate_all` is called. If a save path
    is set, fixating writes the buffer to disk and removes it from the pile.
    '''

    def __init__(
            self,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False,
            processors=None):
        '''
        Create a new pile.

        :param fixation_length: buffer length in seconds after which a
            buffer is fixated, ``None`` to disable
        :param path: path (template) handed to :py:func:`pyrocko.io.save`
            when fixating; nothing is saved if it is not set
        :param format: file format used for saving and re-loading
        :param forget_fixed: if ``True``, saved files are not loaded back
            into the pile
        :param processors: iterable of processors; a pass-through
            :py:class:`Processor` is used if ``None``
        '''

        pile.Pile.__init__(self)
        self._buffers = {}  # keys: nslc, values: MemTracesFile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed
        if processors is None:
            self._processors = [Processor()]
        else:
            self._processors = []
            for p in processors:
                self.add_processor(p)

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the path (template) used to save fixated buffers.

        All active buffers are fixated with the old setting first.
        '''

        self.fixate_all()
        self._path = path

    def add_processor(self, processor):
        '''
        Add a processor which every inserted trace is passed through.

        All active buffers are fixated first.
        '''
        self.fixate_all()
        self._processors.append(processor)

    def insert_trace(self, trace):
        '''
        Feed a trace to the pile.

        The trace is given to each processor independently; everything the
        processors return is added to the buffers.
        '''
        # Let the logging module do the formatting, so that the trace is
        # only turned into a string when debug output is really enabled.
        logger.debug('Received a trace: %s', trace)

        for p in self._processors:
            for tr in p.process(trace):
                self._insert_trace(tr)

    def _insert_trace(self, trace):
        '''
        Add a processed trace to its buffer, creating a new one if needed.
        '''
        buf = self._append_to_buffer(trace)
        nslc = trace.nslc_id

        if buf is None:  # create new buffer trace
            if nslc in self._buffers:
                # The existing buffer cannot be continued: finish it.
                self._fixate(self._buffers[nslc])

            trbuf = trace.copy()
            buf = pile.MemTracesFile(None, [trbuf])
            self.add_file(buf)
            self._buffers[nslc] = buf

        buf.recursive_grow_update([trace])
        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                self._fixate(buf)
                del self._buffers[nslc]

    def _append_to_buffer(self, trace):
        '''
        Try to append the trace to the active buffer traces.

        Returns the current buffer trace or None if unsuccessful.
        '''

        nslc = trace.nslc_id
        if nslc not in self._buffers:
            return None

        buf = self._buffers[nslc]
        trbuf = buf.get_traces()[0]
        # Append only if the trace starts within a tenth of a sampling
        # interval of the expected next sample and data type and sampling
        # interval agree.
        if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat
                and trbuf.ydata.dtype == trace.ydata.dtype
                and trbuf.deltat == trace.deltat):

            trbuf.append(trace.ydata)
            return buf

        return None

    def fixate_all(self):
        '''
        Fixate all active buffers and forget about them.
        '''
        for buf in self._buffers.values():
            self._fixate(buf)

        self._buffers = {}

    def _fixate(self, buf):
        '''
        Save a buffer to disk and replace it in the pile by the saved files.

        Does nothing if no save path is set. The saved files are only loaded
        back into the pile if ``forget_fixed`` is not set.
        '''
        if self._path:
            trbuf = buf.get_traces()[0]
            fns = io.save([trbuf], self._path, format=self._format)

            self.remove_file(buf)
            if not self._forget_fixed:
                self.load_files(
                    fns, show_progress=False, fileformat=self._format)

    def drop_older(self, tmax, delete_disk_files=False):
        '''
        Drop all files whose ``tmax`` is less than the given ``tmax``.

        See :py:meth:`drop` for the meaning of ``delete_disk_files``.
        '''
        self.drop(
            condition=lambda file: file.tmax < tmax,
            delete_disk_files=delete_disk_files)

    def drop(self, condition, delete_disk_files=False):
        '''
        Remove files from the pile for which ``condition(file)`` is true.

        Active buffers are never dropped. If ``delete_disk_files`` is set,
        dropped files which exist on disk are deleted there as well.
        '''
        candidates = []
        buffers = list(self._buffers.values())
        for file in self.iter_files():
            if condition(file) and file not in buffers:
                candidates.append(file)

        self.remove_files(candidates)
        if delete_disk_files:
            for file in candidates:
                if file.abspath and os.path.exists(file.abspath):
                    os.unlink(file.abspath)

    def __del__(self):
        self.fixate_all()