Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gui/snuffler/snufflings/spectrogram.py: 20%

201 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 06:59 +0000

1import math 

2import logging 

3import numpy as num 

4from matplotlib.colors import LinearSegmentedColormap 

5 

6from pyrocko import plot, trace 

7from ..snuffling import Snuffling, Param, Choice 

8 

9logger = logging.getLogger('pyrocko.gui.snufflings.spectrogram') 

10 

11 

12def to01(c): 

13 return c[0]/255., c[1]/255., c[2]/255. 

14 

15 

16def desat(c, a): 

17 cmean = (c[0] + c[1] + c[2]) / 3. 

18 return tuple(cc*a + cmean*(1.0-a) for cc in c) 

19 

20 

21name_to_taper = { 

22 'Hanning': num.hanning, 

23 'Hamming': num.hamming, 

24 'Blackman': num.blackman, 

25 'Bartlett': num.bartlett} 

26 

27cmap_colors = [plot.tango_colors[x] for x in [ 

28 'skyblue1', 'chameleon1', 'butter1', 'orange1', 'scarletred1', 'plum3']] 

29 

30name_to_cmap = { 

31 'spectro': LinearSegmentedColormap.from_list( 

32 'spectro', [desat(to01(c), 0.8) for c in cmap_colors])} 

33 

34 

35def get_cmap(name): 

36 if name in name_to_cmap: 

37 return name_to_cmap[name] 

38 else: 

39 return plot.mpl_get_cmap(name) 

40 

41 

42def downsample_plan(n, n_max): 

43 n_mean = (n-1) // n_max + 1 

44 n_new = n // n_mean 

45 return n_new, n_mean 

46 

47 

48class Spectrogram(Snuffling): 

49 

50 ''' 

51 <html> 

52 <body> 

53 <h1>Plot spectrogram</h1> 

54 <p>Plots a basic spectrogram.</p> 

55 </body> 

56 </html> 

57 ''' 

58 

59 def setup(self): 

60 '''Customization of the snuffling.''' 

61 

62 self.set_name('Spectrogram') 

63 self.add_parameter( 

64 Param('Window length [s]:', 'twin', 100, 0.1, 10000.)) 

65 

66 self.add_parameter( 

67 Param('Overlap [%]:', 'overlap', 75., 0., 99.)) 

68 

69 self.add_parameter( 

70 Choice('Taper function', 'taper_name', 'Hanning', 

71 ['Hanning', 'Hamming', 'Blackman', 'Bartlett'])) 

72 

73 self.add_parameter(Param( 

74 'Fmin [Hz]', 'fmin', None, 0.001, 50., 

75 low_is_none=True)) 

76 

77 self.add_parameter(Param( 

78 'Fmax [Hz]', 'fmax', None, 0.001, 50., 

79 high_is_none=True)) 

80 

81 self.add_parameter( 

82 Choice('Color scale', 'color_scale', 'log', 

83 ['log', 'sqrt', 'lin'])) 

84 

85 self.add_parameter( 

86 Choice('Color table', 'ctb_name', 'spectro', 

87 ['spectro', 'rainbow'])) 

88 

89 self.set_live_update(False) 

90 self._tapers = {} 

91 self.nt_max = 2000 

92 self.nf_max = 500 

93 self.iframe = 0 

94 

95 def panel_visibility_changed(self, visible): 

96 viewer = self.get_viewer() 

97 if visible: 

98 viewer.pile_has_changed_signal.connect(self.adjust_controls) 

99 self.adjust_controls() 

100 

101 else: 

102 viewer.pile_has_changed_signal.disconnect(self.adjust_controls) 

103 

104 def adjust_controls(self): 

105 viewer = self.get_viewer() 

106 dtmin, dtmax = viewer.content_deltat_range() 

107 maxfreq = 0.5/dtmin 

108 minfreq = (0.5/dtmax)*0.001 

109 self.set_parameter_range('fmin', minfreq, maxfreq) 

110 self.set_parameter_range('fmax', minfreq, maxfreq) 

111 

112 def get_taper(self, name, n): 

113 

114 taper_key = (name, n) 

115 

116 if taper_key not in self._tapers: 

117 self._tapers[taper_key] = name_to_taper[name](n) 

118 

119 return self._tapers[taper_key] 

120 

121 def make_spectrogram(self, tmin, tmax, tinc, tpad, nslc, deltat): 

122 

123 nt = int(round((tmax - tmin) / tinc)) 

124 

125 nsamples_want = int( 

126 math.floor((tinc + 2*tpad) / deltat)) 

127 

128 nsamples_want_pad = trace.nextpow2(nsamples_want) 

129 nf = nsamples_want_pad // 2 + 1 

130 df = 1.0 / (deltat * nsamples_want_pad) 

131 

132 if self.fmin is not None: 

133 ifmin = int(math.ceil(self.fmin / df)) 

134 else: 

135 ifmin = 0 

136 

137 if self.fmax is not None: 

138 ifmax = min(int(math.floor(self.fmax / df)) + 1, nf) 

139 else: 

140 ifmax = nf 

141 

142 nf_show = ifmax - ifmin 

143 assert nf_show >= 2 

144 

145 nf_show_ds, nf_mean = downsample_plan(nf_show, self.nf_max) 

146 

147 amps = num.zeros((nf_show_ds, nt)) 

148 amps.fill(num.nan) 

149 

150 win = self.get_taper(self.taper_name, nsamples_want) 

151 

152 for batch in self.chopper_selected_traces( 

153 tinc=tinc, 

154 tpad=tpad+deltat, 

155 want_incomplete=False, 

156 fallback=True, 

157 trace_selector=lambda tr: tr.nslc_id == nslc, 

158 mode='inview', 

159 style='batch', 

160 progress='Calculating Spectrogram'): 

161 

162 for tr in batch.traces: 

163 if tr.deltat != deltat: 

164 self.fail( 

165 'Unexpected sampling rate on channel %s.%s.%s.%s: %g' 

166 % (tr.nslc_id + (tr.deltat,))) 

167 

168 trs = batch.traces 

169 

170 if len(trs) != 1: 

171 continue 

172 

173 tr = trs[0] 

174 

175 if deltat is None: 

176 deltat = tr.deltat 

177 

178 else: 

179 if tr.deltat != deltat: 

180 raise Exception('Unexpected sampling rate.') 

181 

182 tr.chop(tmin - tpad, tr.tmax, inplace=True) 

183 tr.set_ydata(tr.ydata[:nsamples_want]) 

184 

185 if tr.data_len() != nsamples_want: 

186 logger.info('incomplete trace') 

187 continue 

188 

189 tr.ydata = tr.ydata.astype(num.float64) 

190 tr.ydata -= tr.ydata.mean() 

191 

192 tr.ydata *= win 

193 

194 f, a = tr.spectrum(pad_to_pow2=True) 

195 assert nf == f.size 

196 assert (nf-1)*df == f[-1] 

197 

198 f = f[ifmin:ifmax] 

199 a = a[ifmin:ifmax] 

200 

201 a = num.abs(a) 

202 # a /= self.cached_abs_response(sq.get_response(tr), f) 

203 

204 a **= 2 

205 a *= tr.deltat * 2. / (df*num.sum(win**2)) 

206 

207 if nf_mean != 1: 

208 nf_trim = (nf_show // nf_mean) * nf_mean 

209 f = num.mean(f[:nf_trim].reshape((-1, nf_mean)), axis=1) 

210 a = num.mean(a[:nf_trim].reshape((-1, nf_mean)), axis=1) 

211 

212 tmid = 0.5*(tr.tmax + tr.tmin) 

213 

214 it = int(round((tmid-tmin)/tinc)) 

215 if it < 0 or nt <= it: 

216 continue 

217 

218 amps[:, it] = a 

219 have_data = True 

220 

221 if not have_data: 

222 self.fail( 

223 'No data could be extracted for channel: %s.%s.%s.%s' % nslc) 

224 

225 t = tmin + 0.5 * tinc + tinc * num.arange(nt) 

226 return t, f, amps 

227 

228 def get_selected_time_range(self): 

229 markers = self.get_viewer().selected_markers() 

230 times = [] 

231 for marker in markers: 

232 times.append(marker.tmin) 

233 times.append(marker.tmax) 

234 

235 if times: 

236 return (min(times), max(times)) 

237 else: 

238 return self.get_viewer().get_time_range() 

239 

240 def prescan(self, tinc, tpad): 

241 

242 tmin, tmax = self.get_selected_time_range() 

243 nt = int(round((tmax - tmin) / tinc)) 

244 if nt > self.nt_max: 

245 _, nt_mean = downsample_plan(nt, self.nt_max) 

246 self.fail( 

247 'Number of samples in spectrogram time axis: %i. Resulting ' 

248 'image will be unreasonably large. Consider increasing window ' 

249 'length to about %g s.' % ( 

250 nt, (tinc*nt_mean) / (1.-self.overlap/100.))) 

251 

252 data = {} 

253 times = [] 

254 for batch in self.chopper_selected_traces( 

255 tinc=tinc, tpad=tpad, want_incomplete=False, fallback=True, 

256 load_data=False, mode='inview', style='batch'): 

257 

258 times.append(batch.tmin) 

259 times.append(batch.tmax) 

260 

261 for tr in batch.traces: 

262 nslc = tr.nslc_id 

263 if nslc not in data: 

264 data[nslc] = set() 

265 

266 data[nslc].add(tr.deltat) 

267 

268 nslcs = sorted(data.keys()) 

269 deltats = [sorted(data[nslc]) for nslc in nslcs] 

270 return min(times), max(times), nslcs, deltats 

271 

272 def call(self): 

273 '''Main work routine of the snuffling.''' 

274 

275 tpad = self.twin * self.overlap/100. * 0.5 

276 tinc = self.twin - 2 * tpad 

277 

278 tmin, tmax, nslcs, deltats = self.prescan(tinc, tpad) 

279 

280 for nslc, deltats in zip(nslcs, deltats): 

281 if len(deltats) > 1: 

282 self.fail( 

283 'Multiple sample rates found for channel %s.%s.%s.%s.' 

284 % nslc) 

285 

286 ncols = int(len(nslcs) // 5 + 1) 

287 nrows = (len(nslcs)-1) // ncols + 1 

288 

289 frame = self.smartplot_frame( 

290 'Spectrogram %i' % (self.iframe + 1), 

291 ['time'] * ncols, 

292 ['frequency'] * nrows, 

293 ['psd']) 

294 

295 self.iframe += 1 

296 

297 zvalues = [] 

298 for i, nslc in enumerate(nslcs): 

299 

300 t, f, a = self.make_spectrogram( 

301 tmin, tmax, tinc, tpad, nslc, deltats[0]) 

302 

303 if self.color_scale == 'log': 

304 a = num.log(a) 

305 elif self.color_scale == 'sqrt': 

306 a = num.sqrt(a) 

307 

308 icol, irow = i % ncols, i // ncols 

309 

310 axes = frame.plot.axes(icol, nrows-1-irow) 

311 

312 min_a = num.nanmin(a) 

313 max_a = num.nanmax(a) 

314 mean_a = num.nanmean(a) 

315 std_a = num.nanstd(a) 

316 

317 zmin = max(min_a, mean_a - 3.0 * std_a) 

318 zmax = min(max_a, mean_a + 3.0 * std_a) 

319 

320 zvalues.append(zmin) 

321 zvalues.append(zmax) 

322 

323 c = axes.pcolormesh( 

324 t, f, a, 

325 cmap=get_cmap(self.ctb_name), 

326 shading='gouraud') 

327 frame.plot.set_color_dim(c, 'psd') 

328 

329 if self.fmin is not None: 

330 fmin = self.fmin 

331 else: 

332 fmin = 2.0 / self.twin 

333 

334 if self.fmax is not None: 

335 fmax = self.fmax 

336 else: 

337 fmax = f[-1] 

338 

339 axes.set_title( 

340 '.'.join(x for x in nslc if x), 

341 ha='right', 

342 va='top', 

343 x=0.99, 

344 y=0.9) 

345 

346 axes.grid(color='black', alpha=0.3) 

347 

348 plot.mpl_time_axis(axes, 10. / ncols) 

349 

350 for i in range(len(nslcs), ncols*nrows): 

351 icol, irow = i % ncols, i // ncols 

352 axes = frame.plot.axes(icol, nrows-1-irow) 

353 axes.set_axis_off() 

354 

355 if self.color_scale == 'log': 

356 label = 'log PSD' 

357 elif self.color_scale == 'sqrt': 

358 label = 'sqrt PSD' 

359 else: 

360 label = 'PSD' 

361 

362 frame.plot.set_label('psd', label) 

363 frame.plot.colorbar('psd') 

364 frame.plot.set_lim('time', t[0], t[-1]) 

365 frame.plot.set_lim('frequency', fmin, fmax) 

366 frame.plot.set_lim('psd', min(zvalues), max(zvalues)) 

367 frame.plot.set_label('time', '') 

368 frame.plot.set_label('frequency', 'Frequency [Hz]') 

369 

370 frame.draw() 

371 

372 

373def __snufflings__(): 

374 '''Returns a list of snufflings to be exported by this module.''' 

375 return [Spectrogram()]