Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gui/snuffler/snufflings/spectrogram.py: 20%
201 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
1import math
2import logging
3import numpy as num
4from matplotlib.colors import LinearSegmentedColormap
6from pyrocko import plot, trace
7from ..snuffling import Snuffling, Param, Choice
9logger = logging.getLogger('pyrocko.gui.snufflings.spectrogram')
12def to01(c):
13 return c[0]/255., c[1]/255., c[2]/255.
16def desat(c, a):
17 cmean = (c[0] + c[1] + c[2]) / 3.
18 return tuple(cc*a + cmean*(1.0-a) for cc in c)
21name_to_taper = {
22 'Hanning': num.hanning,
23 'Hamming': num.hamming,
24 'Blackman': num.blackman,
25 'Bartlett': num.bartlett}
27cmap_colors = [plot.tango_colors[x] for x in [
28 'skyblue1', 'chameleon1', 'butter1', 'orange1', 'scarletred1', 'plum3']]
30name_to_cmap = {
31 'spectro': LinearSegmentedColormap.from_list(
32 'spectro', [desat(to01(c), 0.8) for c in cmap_colors])}
35def get_cmap(name):
36 if name in name_to_cmap:
37 return name_to_cmap[name]
38 else:
39 return plot.mpl_get_cmap(name)
42def downsample_plan(n, n_max):
43 n_mean = (n-1) // n_max + 1
44 n_new = n // n_mean
45 return n_new, n_mean
48class Spectrogram(Snuffling):
50 '''
51 <html>
52 <body>
53 <h1>Plot spectrogram</h1>
54 <p>Plots a basic spectrogram.</p>
55 </body>
56 </html>
57 '''
59 def setup(self):
60 '''Customization of the snuffling.'''
62 self.set_name('Spectrogram')
63 self.add_parameter(
64 Param('Window length [s]:', 'twin', 100, 0.1, 10000.))
66 self.add_parameter(
67 Param('Overlap [%]:', 'overlap', 75., 0., 99.))
69 self.add_parameter(
70 Choice('Taper function', 'taper_name', 'Hanning',
71 ['Hanning', 'Hamming', 'Blackman', 'Bartlett']))
73 self.add_parameter(Param(
74 'Fmin [Hz]', 'fmin', None, 0.001, 50.,
75 low_is_none=True))
77 self.add_parameter(Param(
78 'Fmax [Hz]', 'fmax', None, 0.001, 50.,
79 high_is_none=True))
81 self.add_parameter(
82 Choice('Color scale', 'color_scale', 'log',
83 ['log', 'sqrt', 'lin']))
85 self.add_parameter(
86 Choice('Color table', 'ctb_name', 'spectro',
87 ['spectro', 'rainbow']))
89 self.set_live_update(False)
90 self._tapers = {}
91 self.nt_max = 2000
92 self.nf_max = 500
93 self.iframe = 0
95 def panel_visibility_changed(self, visible):
96 viewer = self.get_viewer()
97 if visible:
98 viewer.pile_has_changed_signal.connect(self.adjust_controls)
99 self.adjust_controls()
101 else:
102 viewer.pile_has_changed_signal.disconnect(self.adjust_controls)
104 def adjust_controls(self):
105 viewer = self.get_viewer()
106 dtmin, dtmax = viewer.content_deltat_range()
107 maxfreq = 0.5/dtmin
108 minfreq = (0.5/dtmax)*0.001
109 self.set_parameter_range('fmin', minfreq, maxfreq)
110 self.set_parameter_range('fmax', minfreq, maxfreq)
112 def get_taper(self, name, n):
114 taper_key = (name, n)
116 if taper_key not in self._tapers:
117 self._tapers[taper_key] = name_to_taper[name](n)
119 return self._tapers[taper_key]
121 def make_spectrogram(self, tmin, tmax, tinc, tpad, nslc, deltat):
123 nt = int(round((tmax - tmin) / tinc))
125 nsamples_want = int(
126 math.floor((tinc + 2*tpad) / deltat))
128 nsamples_want_pad = trace.nextpow2(nsamples_want)
129 nf = nsamples_want_pad // 2 + 1
130 df = 1.0 / (deltat * nsamples_want_pad)
132 if self.fmin is not None:
133 ifmin = int(math.ceil(self.fmin / df))
134 else:
135 ifmin = 0
137 if self.fmax is not None:
138 ifmax = min(int(math.floor(self.fmax / df)) + 1, nf)
139 else:
140 ifmax = nf
142 nf_show = ifmax - ifmin
143 assert nf_show >= 2
145 nf_show_ds, nf_mean = downsample_plan(nf_show, self.nf_max)
147 amps = num.zeros((nf_show_ds, nt))
148 amps.fill(num.nan)
150 win = self.get_taper(self.taper_name, nsamples_want)
152 for batch in self.chopper_selected_traces(
153 tinc=tinc,
154 tpad=tpad+deltat,
155 want_incomplete=False,
156 fallback=True,
157 trace_selector=lambda tr: tr.nslc_id == nslc,
158 mode='inview',
159 style='batch',
160 progress='Calculating Spectrogram'):
162 for tr in batch.traces:
163 if tr.deltat != deltat:
164 self.fail(
165 'Unexpected sampling rate on channel %s.%s.%s.%s: %g'
166 % (tr.nslc_id + (tr.deltat,)))
168 trs = batch.traces
170 if len(trs) != 1:
171 continue
173 tr = trs[0]
175 if deltat is None:
176 deltat = tr.deltat
178 else:
179 if tr.deltat != deltat:
180 raise Exception('Unexpected sampling rate.')
182 tr.chop(tmin - tpad, tr.tmax, inplace=True)
183 tr.set_ydata(tr.ydata[:nsamples_want])
185 if tr.data_len() != nsamples_want:
186 logger.info('incomplete trace')
187 continue
189 tr.ydata = tr.ydata.astype(num.float64)
190 tr.ydata -= tr.ydata.mean()
192 tr.ydata *= win
194 f, a = tr.spectrum(pad_to_pow2=True)
195 assert nf == f.size
196 assert (nf-1)*df == f[-1]
198 f = f[ifmin:ifmax]
199 a = a[ifmin:ifmax]
201 a = num.abs(a)
202 # a /= self.cached_abs_response(sq.get_response(tr), f)
204 a **= 2
205 a *= tr.deltat * 2. / (df*num.sum(win**2))
207 if nf_mean != 1:
208 nf_trim = (nf_show // nf_mean) * nf_mean
209 f = num.mean(f[:nf_trim].reshape((-1, nf_mean)), axis=1)
210 a = num.mean(a[:nf_trim].reshape((-1, nf_mean)), axis=1)
212 tmid = 0.5*(tr.tmax + tr.tmin)
214 it = int(round((tmid-tmin)/tinc))
215 if it < 0 or nt <= it:
216 continue
218 amps[:, it] = a
219 have_data = True
221 if not have_data:
222 self.fail(
223 'No data could be extracted for channel: %s.%s.%s.%s' % nslc)
225 t = tmin + 0.5 * tinc + tinc * num.arange(nt)
226 return t, f, amps
228 def get_selected_time_range(self):
229 markers = self.get_viewer().selected_markers()
230 times = []
231 for marker in markers:
232 times.append(marker.tmin)
233 times.append(marker.tmax)
235 if times:
236 return (min(times), max(times))
237 else:
238 return self.get_viewer().get_time_range()
240 def prescan(self, tinc, tpad):
242 tmin, tmax = self.get_selected_time_range()
243 nt = int(round((tmax - tmin) / tinc))
244 if nt > self.nt_max:
245 _, nt_mean = downsample_plan(nt, self.nt_max)
246 self.fail(
247 'Number of samples in spectrogram time axis: %i. Resulting '
248 'image will be unreasonably large. Consider increasing window '
249 'length to about %g s.' % (
250 nt, (tinc*nt_mean) / (1.-self.overlap/100.)))
252 data = {}
253 times = []
254 for batch in self.chopper_selected_traces(
255 tinc=tinc, tpad=tpad, want_incomplete=False, fallback=True,
256 load_data=False, mode='inview', style='batch'):
258 times.append(batch.tmin)
259 times.append(batch.tmax)
261 for tr in batch.traces:
262 nslc = tr.nslc_id
263 if nslc not in data:
264 data[nslc] = set()
266 data[nslc].add(tr.deltat)
268 nslcs = sorted(data.keys())
269 deltats = [sorted(data[nslc]) for nslc in nslcs]
270 return min(times), max(times), nslcs, deltats
272 def call(self):
273 '''Main work routine of the snuffling.'''
275 tpad = self.twin * self.overlap/100. * 0.5
276 tinc = self.twin - 2 * tpad
278 tmin, tmax, nslcs, deltats = self.prescan(tinc, tpad)
280 for nslc, deltats in zip(nslcs, deltats):
281 if len(deltats) > 1:
282 self.fail(
283 'Multiple sample rates found for channel %s.%s.%s.%s.'
284 % nslc)
286 ncols = int(len(nslcs) // 5 + 1)
287 nrows = (len(nslcs)-1) // ncols + 1
289 frame = self.smartplot_frame(
290 'Spectrogram %i' % (self.iframe + 1),
291 ['time'] * ncols,
292 ['frequency'] * nrows,
293 ['psd'])
295 self.iframe += 1
297 zvalues = []
298 for i, nslc in enumerate(nslcs):
300 t, f, a = self.make_spectrogram(
301 tmin, tmax, tinc, tpad, nslc, deltats[0])
303 if self.color_scale == 'log':
304 a = num.log(a)
305 elif self.color_scale == 'sqrt':
306 a = num.sqrt(a)
308 icol, irow = i % ncols, i // ncols
310 axes = frame.plot.axes(icol, nrows-1-irow)
312 min_a = num.nanmin(a)
313 max_a = num.nanmax(a)
314 mean_a = num.nanmean(a)
315 std_a = num.nanstd(a)
317 zmin = max(min_a, mean_a - 3.0 * std_a)
318 zmax = min(max_a, mean_a + 3.0 * std_a)
320 zvalues.append(zmin)
321 zvalues.append(zmax)
323 c = axes.pcolormesh(
324 t, f, a,
325 cmap=get_cmap(self.ctb_name),
326 shading='gouraud')
327 frame.plot.set_color_dim(c, 'psd')
329 if self.fmin is not None:
330 fmin = self.fmin
331 else:
332 fmin = 2.0 / self.twin
334 if self.fmax is not None:
335 fmax = self.fmax
336 else:
337 fmax = f[-1]
339 axes.set_title(
340 '.'.join(x for x in nslc if x),
341 ha='right',
342 va='top',
343 x=0.99,
344 y=0.9)
346 axes.grid(color='black', alpha=0.3)
348 plot.mpl_time_axis(axes, 10. / ncols)
350 for i in range(len(nslcs), ncols*nrows):
351 icol, irow = i % ncols, i // ncols
352 axes = frame.plot.axes(icol, nrows-1-irow)
353 axes.set_axis_off()
355 if self.color_scale == 'log':
356 label = 'log PSD'
357 elif self.color_scale == 'sqrt':
358 label = 'sqrt PSD'
359 else:
360 label = 'PSD'
362 frame.plot.set_label('psd', label)
363 frame.plot.colorbar('psd')
364 frame.plot.set_lim('time', t[0], t[-1])
365 frame.plot.set_lim('frequency', fmin, fmax)
366 frame.plot.set_lim('psd', min(zvalues), max(zvalues))
367 frame.plot.set_label('time', '')
368 frame.plot.set_label('frequency', 'Frequency [Hz]')
370 frame.draw()
373def __snufflings__():
374 '''Returns a list of snufflings to be exported by this module.'''
375 return [Spectrogram()]