1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import 

6 

7import os 

8import logging 

9 

10from . import pile, io 

11from . import trace as tracemod 

12 

13logger = logging.getLogger('pyrocko.hamster_pile') 

14 

15 

class Processor(object):
    '''
    Base class for trace processors with a per-channel trace buffer.

    The default :py:meth:`process` hands every trace through unchanged.
    The buffer helpers let subclasses hold back one trace per NSLC id
    between calls.
    '''

    def __init__(self):
        # keys: trace.nslc_id, values: the trace currently held back
        self._buffers = dict()

    def process(self, trace):
        '''
        Handle one incoming trace and return a list of resulting traces.

        The base implementation returns the trace itself, unmodified.
        '''
        return [trace]

    def get_buffer(self, trace):
        '''
        Get the buffered trace which ``trace`` would seamlessly continue.

        Returns the trace stored under the NSLC id of ``trace`` if
        ``trace`` begins one sampling interval after the end of the stored
        trace (within a tenth of a sample) and both share dtype and
        sampling interval. Returns ``None`` otherwise.
        '''
        try:
            held = self._buffers[trace.nslc_id]
        except KeyError:
            return None

        expected_tmin = held.tmax + held.deltat
        continues = (
            abs(expected_tmin - trace.tmin) < 1.0e-1*held.deltat
            and held.ydata.dtype == trace.ydata.dtype
            and held.deltat == trace.deltat)

        return held if continues else None

    def set_buffer(self, trace):
        '''
        Store ``trace`` as the buffer for its NSLC id, replacing any other.
        '''
        self._buffers[trace.nslc_id] = trace

    def empty_buffer(self, trace):
        '''
        Discard the buffer stored under the NSLC id of ``trace``.

        Raises :py:exc:`KeyError` if no buffer is stored for that id.
        '''
        self._buffers.pop(trace.nslc_id)

    def flush_buffers(self):
        '''
        Remove all buffered traces and return them as a list.
        '''
        pending, self._buffers = list(self._buffers.values()), {}
        return pending

49 

50 

class Renamer(object):
    '''
    Processor which emits renamed copies of the traces it receives.

    :param mapping: callable taking a trace and returning the new codes as
        a sequence which is unpacked into ``trace.set_codes(*codes)``, or
        ``None`` if the trace should be discarded
    '''

    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        '''
        Return ``[renamed_copy]`` or ``[]`` if the mapping gives ``None``.

        The incoming trace is left untouched; the codes are set on a copy.
        '''
        codes = self._mapping(trace)
        if codes is not None:
            renamed = trace.copy()
            renamed.set_codes(*codes)
            return [renamed]

        return []

63 

64 

class Chain(Processor):
    '''
    Processor which pipes traces through a sequence of other processors.

    Every trace produced by one processor is handed to the next one; the
    traces coming out of the last processor are the result. A chain without
    any processors passes traces through unchanged.

    :param processors: processor objects, applied in the given order
    '''

    def __init__(self, *processors):
        # Initialise the inherited buffer storage so that the buffer helpers
        # of Processor are usable on a Chain as well.
        Processor.__init__(self)
        self._processors = processors

    def process(self, trace):
        '''
        Run ``trace`` through all processors and return the final traces.

        A processor may return zero, one or several traces per input; each
        of them is processed individually by the following stage.
        '''
        traces = [trace]
        for p in self._processors:
            xtraces = []
            for tr in traces:
                # Pass the single trace, not the whole list, to the stage.
                xtraces.extend(p.process(tr))

            # The output of this stage is the input of the next one.
            traces = xtraces

        return traces

77 

78 

class Downsampler(Processor):
    '''
    Processor which downsamples incoming traces and renames the result.

    :param mapping: callable taking a trace and returning the codes to set
        on the downsampled trace (unpacked into ``set_codes``), or ``None``
        if the trace should be discarded
    :param deltat: target sampling interval, handed to
        ``trace.co_downsample_to``
    '''

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        '''
        Downsample ``trace`` and return it in a list.

        Returns ``[]`` if the mapping rejects the trace or if the
        downsampler did not produce any samples for this input.
        '''
        target_id = self._mapping(trace)
        if target_id is None:
            return []

        # NOTE(review): assumes send() hands back the downsampled trace -
        # confirm against trace.co_downsample_to.
        ds_trace = self._downsampler.send(trace)
        ds_trace.set_codes(*target_id)
        if ds_trace.data_len() == 0:
            return []

        return [ds_trace]

    def __del__(self):
        # __init__ may have raised before the coroutine was created; do not
        # raise a secondary AttributeError from the finalizer in that case.
        downsampler = getattr(self, '_downsampler', None)
        if downsampler is not None:
            downsampler.close()

100 

101 

class Grower(Processor):
    '''
    Processor which glues contiguous traces together into longer ones.

    Incoming traces are appended to a per-channel buffer as long as they
    continue it seamlessly (see :py:meth:`Processor.get_buffer`).

    :param tflush: time span in seconds; a buffer is released as soon as
        ``tmax - tmin`` reaches this value. With ``None``, buffers are not
        released based on their length and can be collected with
        :py:meth:`Processor.flush_buffers`.
    '''

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        '''
        Add ``trace`` to its channel's buffer and return released traces.

        Returns a list containing a previously buffered trace which
        ``trace`` could not be appended to (gap, overlap, changed dtype or
        sampling interval) and/or the current buffer if it has reached the
        flush length. The list is empty if nothing is released.

        Note that a trace starting a new buffer is stored as is and grows
        in place when later traces are appended.
        '''
        released = []
        buffer = self.get_buffer(trace)
        if buffer is None:
            # A buffer which this trace cannot extend would be overwritten
            # below; hand it out instead of silently dropping its data.
            stale = self._buffers.pop(trace.nslc_id, None)
            if stale is not None:
                released.append(stale)

            buffer = trace
            self.set_buffer(buffer)
        else:
            buffer.append(trace.ydata)

        # Comparing against None raises TypeError on Python 3, so only
        # apply the length criterion when a flush length is configured.
        if self._tflush is not None \
                and buffer.tmax - buffer.tmin >= self._tflush:

            self.empty_buffer(buffer)
            released.append(buffer)

        return released

122 

123 

class HamsterPile(pile.Pile):
    '''
    Pile which accumulates incoming traces in growing in-memory buffers.

    Traces handed to :py:meth:`insert_trace` are run through the configured
    processors and appended to one buffer per NSLC id. A buffer is
    "fixated" when it exceeds the fixation length or when a trace arrives
    which does not continue it: if a save path is set, the buffer is
    written to disk and removed from the pile, and the written files are
    loaded back unless ``forget_fixed`` is set.

    :param fixation_length: buffer length in seconds after which a buffer
        is fixated, or ``None`` to disable length based fixation
    :param path: filename template handed to ``io.save``; fixation does not
        save anything if this is ``None`` or empty
    :param format: file format handed to ``io.save`` and ``load_files``
    :param forget_fixed: if true, saved files are not loaded back
    :param processors: iterable of processor objects; each one is applied
        independently to every inserted trace. Defaults to a single
        pass-through :py:class:`Processor`.
    '''

    def __init__(
            self,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False,
            processors=None):

        pile.Pile.__init__(self)
        self._buffers = {}  # keys: nslc, values: MemTracesFile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed
        if processors is None:
            self._processors = [Processor()]
        else:
            self._processors = []
            for p in processors:
                self.add_processor(p)

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        All current buffers are fixated before the new length takes effect.
        '''
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the filename template used when fixating buffers.

        All current buffers are fixated (using the previous path) before
        the new path takes effect.
        '''
        self.fixate_all()
        self._path = path

    def add_processor(self, processor):
        '''
        Append a processor; all current buffers are fixated first.
        '''
        self.fixate_all()
        self._processors.append(processor)

    def insert_trace(self, trace):
        '''
        Feed a trace to every processor and buffer the resulting traces.
        '''
        # Lazy %-args: the trace is only formatted if debug logging is on.
        logger.debug('Received a trace: %s', trace)

        for p in self._processors:
            for tr in p.process(trace):
                self._insert_trace(tr)

    def _insert_trace(self, trace):
        '''
        Append a processed trace to its buffer, fixating where required.
        '''
        buf = self._append_to_buffer(trace)
        nslc = trace.nslc_id

        if buf is None:  # create new buffer trace
            if nslc in self._buffers:
                # The existing buffer cannot be continued by this trace.
                self._fixate(self._buffers[nslc])

            trbuf = trace.copy()
            buf = pile.MemTracesFile(None, [trbuf])
            self.add_file(buf)
            self._buffers[nslc] = buf

        buf.recursive_grow_update([trace])
        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                self._fixate(buf)
                del self._buffers[nslc]

    def _append_to_buffer(self, trace):
        '''
        Try to append the trace to the active buffer traces.

        Returns the current buffer trace or None if unsuccessful.
        '''

        nslc = trace.nslc_id
        if nslc not in self._buffers:
            return None

        buf = self._buffers[nslc]
        trbuf = buf.get_traces()[0]
        # Append only if the trace starts one sample after the buffer's end
        # (within a tenth of a sample) with equal dtype and sampling rate.
        if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat
                and trbuf.ydata.dtype == trace.ydata.dtype
                and trbuf.deltat == trace.deltat):

            trbuf.append(trace.ydata)
            return buf

        return None

    def fixate_all(self):
        '''
        Fixate every active buffer and forget all of them.
        '''
        for buf in self._buffers.values():
            self._fixate(buf)

        self._buffers = {}

    def _fixate(self, buf):
        '''
        Save a buffer to disk and replace it in the pile by the saved files.

        Does nothing if no save path is set; the in-memory buffer then
        stays in the pile.
        '''
        if self._path:
            trbuf = buf.get_traces()[0]
            fns = io.save([trbuf], self._path, format=self._format)

            self.remove_file(buf)
            if not self._forget_fixed:
                self.load_files(
                    fns, show_progress=False, fileformat=self._format)

    def drop_older(self, tmax, delete_disk_files=False):
        '''
        Drop all non-buffer files ending before ``tmax``.

        See :py:meth:`drop` for the meaning of ``delete_disk_files``.
        '''
        self.drop(
            condition=lambda file: file.tmax < tmax,
            delete_disk_files=delete_disk_files)

    def drop(self, condition, delete_disk_files=False):
        '''
        Remove files matching ``condition`` from the pile.

        Active buffers are never dropped.

        :param condition: callable taking a file object and returning true
            if the file should be removed
        :param delete_disk_files: if true, also delete the removed files
            from disk where they have an existing ``abspath``
        '''
        candidates = []
        buffers = list(self._buffers.values())
        for file in self.iter_files():
            if condition(file) and file not in buffers:
                candidates.append(file)

        self.remove_files(candidates)
        if delete_disk_files:
            for file in candidates:
                if file.abspath and os.path.exists(file.abspath):
                    os.unlink(file.abspath)

    def __del__(self):
        # __init__ may have raised before _buffers was set up; avoid a
        # secondary AttributeError from the finalizer in that case.
        if hasattr(self, '_buffers'):
            self.fixate_all()