1import math 

2import logging 

3import numpy as num 

4from matplotlib.colors import LinearSegmentedColormap 

5from matplotlib import cm 

6 

7from pyrocko import plot, trace 

8from ..snuffling import Snuffling, Param, Choice 

9 

10logger = logging.getLogger('pyrocko.gui.snufflings.spectrogram') 

11 

12 

13def to01(c): 

14 return c[0]/255., c[1]/255., c[2]/255. 

15 

16 

17def desat(c, a): 

18 cmean = (c[0] + c[1] + c[2]) / 3. 

19 return tuple(cc*a + cmean*(1.0-a) for cc in c) 

20 

21 

22name_to_taper = { 

23 'Hanning': num.hanning, 

24 'Hamming': num.hamming, 

25 'Blackman': num.blackman, 

26 'Bartlett': num.bartlett} 

27 

28cmap_colors = [plot.tango_colors[x] for x in [ 

29 'skyblue1', 'chameleon1', 'butter1', 'orange1', 'scarletred1', 'plum3']] 

30 

31name_to_cmap = { 

32 'spectro': LinearSegmentedColormap.from_list( 

33 'spectro', [desat(to01(c), 0.8) for c in cmap_colors])} 

34 

35 

36def get_cmap(name): 

37 if name in name_to_cmap: 

38 return name_to_cmap[name] 

39 else: 

40 return cm.get_cmap(name) 

41 

42 

43def downsample_plan(n, n_max): 

44 n_mean = (n-1) // n_max + 1 

45 n_new = n // n_mean 

46 return n_new, n_mean 

47 

48 

49class Spectrogram(Snuffling): 

50 

51 ''' 

52 <html> 

53 <body> 

54 <h1>Plot spectrogram</h1> 

55 <p>Plots a basic spectrogram.</p> 

56 </body> 

57 </html> 

58 ''' 

59 

60 def setup(self): 

61 '''Customization of the snuffling.''' 

62 

63 self.set_name('Spectrogram') 

64 self.add_parameter( 

65 Param('Window length [s]:', 'twin', 100, 0.1, 10000.)) 

66 

67 self.add_parameter( 

68 Param('Overlap [%]:', 'overlap', 75., 0., 99.)) 

69 

70 self.add_parameter( 

71 Choice('Taper function', 'taper_name', 'Hanning', 

72 ['Hanning', 'Hamming', 'Blackman', 'Bartlett'])) 

73 

74 self.add_parameter(Param( 

75 'Fmin [Hz]', 'fmin', None, 0.001, 50., 

76 low_is_none=True)) 

77 

78 self.add_parameter(Param( 

79 'Fmax [Hz]', 'fmax', None, 0.001, 50., 

80 high_is_none=True)) 

81 

82 self.add_parameter( 

83 Choice('Color scale', 'color_scale', 'log', 

84 ['log', 'sqrt', 'lin'])) 

85 

86 self.add_parameter( 

87 Choice('Color table', 'ctb_name', 'spectro', 

88 ['spectro', 'rainbow'])) 

89 

90 self.set_live_update(False) 

91 self._tapers = {} 

92 self.nt_max = 2000 

93 self.nf_max = 500 

94 self.iframe = 0 

95 

96 def panel_visibility_changed(self, visible): 

97 viewer = self.get_viewer() 

98 if visible: 

99 viewer.pile_has_changed_signal.connect(self.adjust_controls) 

100 self.adjust_controls() 

101 

102 else: 

103 viewer.pile_has_changed_signal.disconnect(self.adjust_controls) 

104 

105 def adjust_controls(self): 

106 viewer = self.get_viewer() 

107 dtmin, dtmax = viewer.content_deltat_range() 

108 maxfreq = 0.5/dtmin 

109 minfreq = (0.5/dtmax)*0.001 

110 self.set_parameter_range('fmin', minfreq, maxfreq) 

111 self.set_parameter_range('fmax', minfreq, maxfreq) 

112 

113 def get_taper(self, name, n): 

114 

115 taper_key = (name, n) 

116 

117 if taper_key not in self._tapers: 

118 self._tapers[taper_key] = name_to_taper[name](n) 

119 

120 return self._tapers[taper_key] 

121 

122 def make_spectrogram(self, tmin, tmax, tinc, tpad, nslc, deltat): 

123 

124 nt = int(round((tmax - tmin) / tinc)) 

125 

126 nsamples_want = int( 

127 math.floor((tinc + 2*tpad) / deltat)) 

128 

129 nsamples_want_pad = trace.nextpow2(nsamples_want) 

130 nf = nsamples_want_pad // 2 + 1 

131 df = 1.0 / (deltat * nsamples_want_pad) 

132 

133 if self.fmin is not None: 

134 ifmin = int(math.ceil(self.fmin / df)) 

135 else: 

136 ifmin = 0 

137 

138 if self.fmax is not None: 

139 ifmax = min(int(math.floor(self.fmax / df)) + 1, nf) 

140 else: 

141 ifmax = nf 

142 

143 nf_show = ifmax - ifmin 

144 assert nf_show >= 2 

145 

146 nf_show_ds, nf_mean = downsample_plan(nf_show, self.nf_max) 

147 

148 amps = num.zeros((nf_show_ds, nt)) 

149 amps.fill(num.nan) 

150 

151 win = self.get_taper(self.taper_name, nsamples_want) 

152 

153 for batch in self.chopper_selected_traces( 

154 tinc=tinc, 

155 tpad=tpad+deltat, 

156 want_incomplete=False, 

157 fallback=True, 

158 trace_selector=lambda tr: tr.nslc_id == nslc, 

159 mode='inview', 

160 style='batch', 

161 progress='Calculating Spectrogram'): 

162 

163 for tr in batch.traces: 

164 if tr.deltat != deltat: 

165 self.fail( 

166 'Unexpected sampling rate on channel %s.%s.%s.%s: %g' 

167 % (tr.nslc_id + (tr.deltat,))) 

168 

169 trs = batch.traces 

170 

171 if len(trs) != 1: 

172 continue 

173 

174 tr = trs[0] 

175 

176 if deltat is None: 

177 deltat = tr.deltat 

178 

179 else: 

180 if tr.deltat != deltat: 

181 raise Exception('Unexpected sampling rate.') 

182 

183 tr.chop(tmin - tpad, tr.tmax, inplace=True) 

184 tr.set_ydata(tr.ydata[:nsamples_want]) 

185 

186 if tr.data_len() != nsamples_want: 

187 logger.info('incomplete trace') 

188 continue 

189 

190 tr.ydata = tr.ydata.astype(num.float64) 

191 tr.ydata -= tr.ydata.mean() 

192 

193 tr.ydata *= win 

194 

195 f, a = tr.spectrum(pad_to_pow2=True) 

196 assert nf == f.size 

197 assert (nf-1)*df == f[-1] 

198 

199 f = f[ifmin:ifmax] 

200 a = a[ifmin:ifmax] 

201 

202 a = num.abs(a) 

203 # a /= self.cached_abs_response(sq.get_response(tr), f) 

204 

205 a **= 2 

206 a *= tr.deltat * 2. / (df*num.sum(win**2)) 

207 

208 if nf_mean != 1: 

209 nf_trim = (nf_show // nf_mean) * nf_mean 

210 f = num.mean(f[:nf_trim].reshape((-1, nf_mean)), axis=1) 

211 a = num.mean(a[:nf_trim].reshape((-1, nf_mean)), axis=1) 

212 

213 tmid = 0.5*(tr.tmax + tr.tmin) 

214 

215 it = int(round((tmid-tmin)/tinc)) 

216 if it < 0 or nt <= it: 

217 continue 

218 

219 amps[:, it] = a 

220 have_data = True 

221 

222 if not have_data: 

223 self.fail( 

224 'No data could be extracted for channel: %s.%s.%s.%s' % nslc) 

225 

226 t = tmin + 0.5 * tinc + tinc * num.arange(nt) 

227 return t, f, amps 

228 

229 def get_selected_time_range(self): 

230 markers = self.get_viewer().selected_markers() 

231 times = [] 

232 for marker in markers: 

233 times.append(marker.tmin) 

234 times.append(marker.tmax) 

235 

236 if times: 

237 return (min(times), max(times)) 

238 else: 

239 return self.get_viewer().get_time_range() 

240 

241 def prescan(self, tinc, tpad): 

242 

243 tmin, tmax = self.get_selected_time_range() 

244 nt = int(round((tmax - tmin) / tinc)) 

245 if nt > self.nt_max: 

246 _, nt_mean = downsample_plan(nt, self.nt_max) 

247 self.fail( 

248 'Number of samples in spectrogram time axis: %i. Resulting ' 

249 'image will be unreasonably large. Consider increasing window ' 

250 'length to about %g s.' % ( 

251 nt, (tinc*nt_mean) / (1.-self.overlap/100.))) 

252 

253 data = {} 

254 times = [] 

255 for batch in self.chopper_selected_traces( 

256 tinc=tinc, tpad=tpad, want_incomplete=False, fallback=True, 

257 load_data=False, mode='inview', style='batch'): 

258 

259 times.append(batch.tmin) 

260 times.append(batch.tmax) 

261 

262 for tr in batch.traces: 

263 nslc = tr.nslc_id 

264 if nslc not in data: 

265 data[nslc] = set() 

266 

267 data[nslc].add(tr.deltat) 

268 

269 nslcs = sorted(data.keys()) 

270 deltats = [sorted(data[nslc]) for nslc in nslcs] 

271 return min(times), max(times), nslcs, deltats 

272 

273 def call(self): 

274 '''Main work routine of the snuffling.''' 

275 

276 tpad = self.twin * self.overlap/100. * 0.5 

277 tinc = self.twin - 2 * tpad 

278 

279 tmin, tmax, nslcs, deltats = self.prescan(tinc, tpad) 

280 

281 for nslc, deltats in zip(nslcs, deltats): 

282 if len(deltats) > 1: 

283 self.fail( 

284 'Multiple sample rates found for channel %s.%s.%s.%s.' 

285 % nslc) 

286 

287 ncols = int(len(nslcs) // 5 + 1) 

288 nrows = (len(nslcs)-1) // ncols + 1 

289 

290 frame = self.smartplot_frame( 

291 'Spectrogram %i' % (self.iframe + 1), 

292 ['time'] * ncols, 

293 ['frequency'] * nrows, 

294 ['psd']) 

295 

296 self.iframe += 1 

297 

298 zvalues = [] 

299 for i, nslc in enumerate(nslcs): 

300 

301 t, f, a = self.make_spectrogram( 

302 tmin, tmax, tinc, tpad, nslc, deltats[0]) 

303 

304 if self.color_scale == 'log': 

305 a = num.log(a) 

306 elif self.color_scale == 'sqrt': 

307 a = num.sqrt(a) 

308 

309 icol, irow = i % ncols, i // ncols 

310 

311 axes = frame.plot.axes(icol, nrows-1-irow) 

312 

313 min_a = num.nanmin(a) 

314 max_a = num.nanmax(a) 

315 mean_a = num.nanmean(a) 

316 std_a = num.nanstd(a) 

317 

318 zmin = max(min_a, mean_a - 3.0 * std_a) 

319 zmax = min(max_a, mean_a + 3.0 * std_a) 

320 

321 zvalues.append(zmin) 

322 zvalues.append(zmax) 

323 

324 c = axes.pcolormesh( 

325 t, f, a, 

326 cmap=get_cmap(self.ctb_name), 

327 shading='gouraud') 

328 frame.plot.set_color_dim(c, 'psd') 

329 

330 if self.fmin is not None: 

331 fmin = self.fmin 

332 else: 

333 fmin = 2.0 / self.twin 

334 

335 if self.fmax is not None: 

336 fmax = self.fmax 

337 else: 

338 fmax = f[-1] 

339 

340 axes.set_title( 

341 '.'.join(x for x in nslc if x), 

342 ha='right', 

343 va='top', 

344 x=0.99, 

345 y=0.9) 

346 

347 axes.grid(color='black', alpha=0.3) 

348 

349 plot.mpl_time_axis(axes, 10. / ncols) 

350 

351 for i in range(len(nslcs), ncols*nrows): 

352 icol, irow = i % ncols, i // ncols 

353 axes = frame.plot.axes(icol, nrows-1-irow) 

354 axes.set_axis_off() 

355 

356 if self.color_scale == 'log': 

357 label = 'log PSD' 

358 elif self.color_scale == 'sqrt': 

359 label = 'sqrt PSD' 

360 else: 

361 label = 'PSD' 

362 

363 frame.plot.set_label('psd', label) 

364 frame.plot.colorbar('psd') 

365 frame.plot.set_lim('time', t[0], t[-1]) 

366 frame.plot.set_lim('frequency', fmin, fmax) 

367 frame.plot.set_lim('psd', min(zvalues), max(zvalues)) 

368 frame.plot.set_label('time', '') 

369 frame.plot.set_label('frequency', 'Frequency [Hz]') 

370 

371 frame.draw() 

372 

373 

374def __snufflings__(): 

375 '''Returns a list of snufflings to be exported by this module.''' 

376 return [Spectrogram()]