1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import os 

7import logging 

8 

9from . import pile, io 

10from . import trace as tracemod 

11 

12logger = logging.getLogger('pyrocko.hamster_pile') 

13 

14 

15class Processor(object): 

16 def __init__(self): 

17 self._buffers = {} 

18 

19 def process(self, trace): 

20 return [trace] 

21 

22 def get_buffer(self, trace): 

23 

24 nslc = trace.nslc_id 

25 if nslc not in self._buffers: 

26 return None 

27 

28 trbuf = self._buffers[nslc] 

29 if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat 

30 and trbuf.ydata.dtype == trace.ydata.dtype 

31 and trbuf.deltat == trace.deltat): 

32 return trbuf 

33 else: 

34 return None 

35 

36 def set_buffer(self, trace): 

37 nslc = trace.nslc_id 

38 self._buffers[nslc] = trace 

39 

40 def empty_buffer(self, trace): 

41 nslc = trace.nslc_id 

42 del self._buffers[nslc] 

43 

44 def flush_buffers(self): 

45 traces = list(self._buffers.values()) 

46 self._buffers = {} 

47 return traces 

48 

49 

50class Renamer(object): 

51 def __init__(self, mapping): 

52 self._mapping = mapping 

53 

54 def process(self, trace): 

55 target_id = self._mapping(trace) 

56 if target_id is None: 

57 return [] 

58 

59 out = trace.copy() 

60 out.set_codes(*target_id) 

61 return [out] 

62 

63 

64class Chain(Processor): 

65 def __init__(self, *processors): 

66 self._processors = processors 

67 

68 def process(self, trace): 

69 traces = [trace] 

70 xtraces = [] 

71 for p in self._processors: 

72 for tr in traces: 

73 xtraces.extend(p.process(traces)) 

74 

75 return xtraces 

76 

77 

78class Downsampler(Processor): 

79 

80 def __init__(self, mapping, deltat): 

81 Processor.__init__(self) 

82 self._mapping = mapping 

83 self._downsampler = tracemod.co_downsample_to(deltat) 

84 

85 def process(self, trace): 

86 target_id = self._mapping(trace) 

87 if target_id is None: 

88 return [] 

89 

90 ds_trace = self._downsampler.send(trace) 

91 ds_trace.set_codes(*target_id) 

92 if ds_trace.data_len() == 0: 

93 return [] 

94 

95 return [ds_trace] 

96 

97 def __del__(self): 

98 self._downsampler.close() 

99 

100 

101class Grower(Processor): 

102 

103 def __init__(self, tflush=None): 

104 Processor.__init__(self) 

105 self._tflush = tflush 

106 

107 def process(self, trace): 

108 buffer = self.get_buffer(trace) 

109 if buffer is None: 

110 buffer = trace 

111 self.set_buffer(buffer) 

112 else: 

113 buffer.append(trace.ydata) 

114 

115 if buffer.tmax - buffer.tmin >= self._tflush: 

116 self.empty_buffer(buffer) 

117 return [buffer] 

118 

119 else: 

120 return [] 

121 

122 

123class HamsterPile(pile.Pile): 

124 

125 def __init__( 

126 self, 

127 fixation_length=None, 

128 path=None, 

129 format='from_extension', 

130 forget_fixed=False, 

131 processors=None): 

132 

133 pile.Pile.__init__(self) 

134 self._buffers = {} # keys: nslc, values: MemTracesFile 

135 self._fixation_length = fixation_length 

136 self._format = format 

137 self._path = path 

138 self._forget_fixed = forget_fixed 

139 if processors is None: 

140 self._processors = [Processor()] 

141 else: 

142 self._processors = [] 

143 for p in processors: 

144 self.add_processor(p) 

145 

146 def set_fixation_length(self, length): 

147 ''' 

148 Set length after which the fixation method is called on buffer traces. 

149 

150 The length should be given in seconds. Give None to disable. 

151 ''' 

152 self.fixate_all() 

153 self._fixation_length = length # in seconds 

154 

155 def set_save_path( 

156 self, 

157 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_' 

158 '%(tmin)s_%(tmax)s.mseed'): 

159 

160 self.fixate_all() 

161 self._path = path 

162 

163 def add_processor(self, processor): 

164 self.fixate_all() 

165 self._processors.append(processor) 

166 

167 def insert_trace(self, trace): 

168 logger.debug('Received a trace: %s' % trace) 

169 

170 for p in self._processors: 

171 for tr in p.process(trace): 

172 self._insert_trace(tr) 

173 

174 def _insert_trace(self, trace): 

175 buf = self._append_to_buffer(trace) 

176 nslc = trace.nslc_id 

177 

178 if buf is None: # create new buffer trace 

179 if nslc in self._buffers: 

180 self._fixate(self._buffers[nslc]) 

181 

182 trbuf = trace.copy() 

183 buf = pile.MemTracesFile(None, [trbuf]) 

184 self.add_file(buf) 

185 self._buffers[nslc] = buf 

186 

187 buf.recursive_grow_update([trace]) 

188 trbuf = buf.get_traces()[0] 

189 if self._fixation_length is not None: 

190 if trbuf.tmax - trbuf.tmin > self._fixation_length: 

191 self._fixate(buf) 

192 del self._buffers[nslc] 

193 

194 def _append_to_buffer(self, trace): 

195 ''' 

196 Try to append the trace to the active buffer traces. 

197 

198 Returns the current buffer trace or None if unsuccessful. 

199 ''' 

200 

201 nslc = trace.nslc_id 

202 if nslc not in self._buffers: 

203 return None 

204 

205 buf = self._buffers[nslc] 

206 trbuf = buf.get_traces()[0] 

207 if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat 

208 and trbuf.ydata.dtype == trace.ydata.dtype 

209 and trbuf.deltat == trace.deltat): 

210 

211 trbuf.append(trace.ydata) 

212 return buf 

213 

214 return None 

215 

216 def fixate_all(self): 

217 for buf in self._buffers.values(): 

218 self._fixate(buf) 

219 

220 self._buffers = {} 

221 

222 def _fixate(self, buf): 

223 if self._path: 

224 trbuf = buf.get_traces()[0] 

225 fns = io.save([trbuf], self._path, format=self._format) 

226 

227 self.remove_file(buf) 

228 if not self._forget_fixed: 

229 self.load_files( 

230 fns, show_progress=False, fileformat=self._format) 

231 

232 def drop_older(self, tmax, delete_disk_files=False): 

233 self.drop( 

234 condition=lambda file: file.tmax < tmax, 

235 delete_disk_files=delete_disk_files) 

236 

237 def drop(self, condition, delete_disk_files=False): 

238 candidates = [] 

239 buffers = list(self._buffers.values()) 

240 for file in self.iter_files(): 

241 if condition(file) and file not in buffers: 

242 candidates.append(file) 

243 

244 self.remove_files(candidates) 

245 if delete_disk_files: 

246 for file in candidates: 

247 if file.abspath and os.path.exists(file.abspath): 

248 os.unlink(file.abspath) 

249 

250 def __del__(self): 

251 self.fixate_all()