1import math
2import logging
3import numpy as num
4from matplotlib.colors import LinearSegmentedColormap
5from matplotlib import cm
7from pyrocko import plot, trace
8from ..snuffling import Snuffling, Param, Choice
10logger = logging.getLogger('pyrocko.gui.snufflings.spectrogram')
13def to01(c):
14 return c[0]/255., c[1]/255., c[2]/255.
17def desat(c, a):
18 cmean = (c[0] + c[1] + c[2]) / 3.
19 return tuple(cc*a + cmean*(1.0-a) for cc in c)
22name_to_taper = {
23 'Hanning': num.hanning,
24 'Hamming': num.hamming,
25 'Blackman': num.blackman,
26 'Bartlett': num.bartlett}
28cmap_colors = [plot.tango_colors[x] for x in [
29 'skyblue1', 'chameleon1', 'butter1', 'orange1', 'scarletred1', 'plum3']]
31name_to_cmap = {
32 'spectro': LinearSegmentedColormap.from_list(
33 'spectro', [desat(to01(c), 0.8) for c in cmap_colors])}
36def get_cmap(name):
37 if name in name_to_cmap:
38 return name_to_cmap[name]
39 else:
40 return cm.get_cmap(name)
43def downsample_plan(n, n_max):
44 n_mean = (n-1) // n_max + 1
45 n_new = n // n_mean
46 return n_new, n_mean
49class Spectrogram(Snuffling):
51 '''
52 <html>
53 <body>
54 <h1>Plot spectrogram</h1>
55 <p>Plots a basic spectrogram.</p>
56 </body>
57 </html>
58 '''
60 def setup(self):
61 '''Customization of the snuffling.'''
63 self.set_name('Spectrogram')
64 self.add_parameter(
65 Param('Window length [s]:', 'twin', 100, 0.1, 10000.))
67 self.add_parameter(
68 Param('Overlap [%]:', 'overlap', 75., 0., 99.))
70 self.add_parameter(
71 Choice('Taper function', 'taper_name', 'Hanning',
72 ['Hanning', 'Hamming', 'Blackman', 'Bartlett']))
74 self.add_parameter(Param(
75 'Fmin [Hz]', 'fmin', None, 0.001, 50.,
76 low_is_none=True))
78 self.add_parameter(Param(
79 'Fmax [Hz]', 'fmax', None, 0.001, 50.,
80 high_is_none=True))
82 self.add_parameter(
83 Choice('Color scale', 'color_scale', 'log',
84 ['log', 'sqrt', 'lin']))
86 self.add_parameter(
87 Choice('Color table', 'ctb_name', 'spectro',
88 ['spectro', 'rainbow']))
90 self.set_live_update(False)
91 self._tapers = {}
92 self.nt_max = 2000
93 self.nf_max = 500
94 self.iframe = 0
96 def panel_visibility_changed(self, visible):
97 viewer = self.get_viewer()
98 if visible:
99 viewer.pile_has_changed_signal.connect(self.adjust_controls)
100 self.adjust_controls()
102 else:
103 viewer.pile_has_changed_signal.disconnect(self.adjust_controls)
105 def adjust_controls(self):
106 viewer = self.get_viewer()
107 dtmin, dtmax = viewer.content_deltat_range()
108 maxfreq = 0.5/dtmin
109 minfreq = (0.5/dtmax)*0.001
110 self.set_parameter_range('fmin', minfreq, maxfreq)
111 self.set_parameter_range('fmax', minfreq, maxfreq)
113 def get_taper(self, name, n):
115 taper_key = (name, n)
117 if taper_key not in self._tapers:
118 self._tapers[taper_key] = name_to_taper[name](n)
120 return self._tapers[taper_key]
122 def make_spectrogram(self, tmin, tmax, tinc, tpad, nslc, deltat):
124 nt = int(round((tmax - tmin) / tinc))
126 nsamples_want = int(
127 math.floor((tinc + 2*tpad) / deltat))
129 nsamples_want_pad = trace.nextpow2(nsamples_want)
130 nf = nsamples_want_pad // 2 + 1
131 df = 1.0 / (deltat * nsamples_want_pad)
133 if self.fmin is not None:
134 ifmin = int(math.ceil(self.fmin / df))
135 else:
136 ifmin = 0
138 if self.fmax is not None:
139 ifmax = min(int(math.floor(self.fmax / df)) + 1, nf)
140 else:
141 ifmax = nf
143 nf_show = ifmax - ifmin
144 assert nf_show >= 2
146 nf_show_ds, nf_mean = downsample_plan(nf_show, self.nf_max)
148 amps = num.zeros((nf_show_ds, nt))
149 amps.fill(num.nan)
151 win = self.get_taper(self.taper_name, nsamples_want)
153 for batch in self.chopper_selected_traces(
154 tinc=tinc,
155 tpad=tpad+deltat,
156 want_incomplete=False,
157 fallback=True,
158 trace_selector=lambda tr: tr.nslc_id == nslc,
159 mode='inview',
160 style='batch',
161 progress='Calculating Spectrogram'):
163 for tr in batch.traces:
164 if tr.deltat != deltat:
165 self.fail(
166 'Unexpected sampling rate on channel %s.%s.%s.%s: %g'
167 % (tr.nslc_id + (tr.deltat,)))
169 trs = batch.traces
171 if len(trs) != 1:
172 continue
174 tr = trs[0]
176 if deltat is None:
177 deltat = tr.deltat
179 else:
180 if tr.deltat != deltat:
181 raise Exception('Unexpected sampling rate.')
183 tr.chop(tmin - tpad, tr.tmax, inplace=True)
184 tr.set_ydata(tr.ydata[:nsamples_want])
186 if tr.data_len() != nsamples_want:
187 logger.info('incomplete trace')
188 continue
190 tr.ydata = tr.ydata.astype(num.float64)
191 tr.ydata -= tr.ydata.mean()
193 tr.ydata *= win
195 f, a = tr.spectrum(pad_to_pow2=True)
196 assert nf == f.size
197 assert (nf-1)*df == f[-1]
199 f = f[ifmin:ifmax]
200 a = a[ifmin:ifmax]
202 a = num.abs(a)
203 # a /= self.cached_abs_response(sq.get_response(tr), f)
205 a **= 2
206 a *= tr.deltat * 2. / (df*num.sum(win**2))
208 if nf_mean != 1:
209 nf_trim = (nf_show // nf_mean) * nf_mean
210 f = num.mean(f[:nf_trim].reshape((-1, nf_mean)), axis=1)
211 a = num.mean(a[:nf_trim].reshape((-1, nf_mean)), axis=1)
213 tmid = 0.5*(tr.tmax + tr.tmin)
215 it = int(round((tmid-tmin)/tinc))
216 if it < 0 or nt <= it:
217 continue
219 amps[:, it] = a
220 have_data = True
222 if not have_data:
223 self.fail(
224 'No data could be extracted for channel: %s.%s.%s.%s' % nslc)
226 t = tmin + 0.5 * tinc + tinc * num.arange(nt)
227 return t, f, amps
229 def get_selected_time_range(self):
230 markers = self.get_viewer().selected_markers()
231 times = []
232 for marker in markers:
233 times.append(marker.tmin)
234 times.append(marker.tmax)
236 if times:
237 return (min(times), max(times))
238 else:
239 return self.get_viewer().get_time_range()
241 def prescan(self, tinc, tpad):
243 tmin, tmax = self.get_selected_time_range()
244 nt = int(round((tmax - tmin) / tinc))
245 if nt > self.nt_max:
246 _, nt_mean = downsample_plan(nt, self.nt_max)
247 self.fail(
248 'Number of samples in spectrogram time axis: %i. Resulting '
249 'image will be unreasonably large. Consider increasing window '
250 'length to about %g s.' % (
251 nt, (tinc*nt_mean) / (1.-self.overlap/100.)))
253 data = {}
254 times = []
255 for batch in self.chopper_selected_traces(
256 tinc=tinc, tpad=tpad, want_incomplete=False, fallback=True,
257 load_data=False, mode='inview', style='batch'):
259 times.append(batch.tmin)
260 times.append(batch.tmax)
262 for tr in batch.traces:
263 nslc = tr.nslc_id
264 if nslc not in data:
265 data[nslc] = set()
267 data[nslc].add(tr.deltat)
269 nslcs = sorted(data.keys())
270 deltats = [sorted(data[nslc]) for nslc in nslcs]
271 return min(times), max(times), nslcs, deltats
273 def call(self):
274 '''Main work routine of the snuffling.'''
276 tpad = self.twin * self.overlap/100. * 0.5
277 tinc = self.twin - 2 * tpad
279 tmin, tmax, nslcs, deltats = self.prescan(tinc, tpad)
281 for nslc, deltats in zip(nslcs, deltats):
282 if len(deltats) > 1:
283 self.fail(
284 'Multiple sample rates found for channel %s.%s.%s.%s.'
285 % nslc)
287 ncols = int(len(nslcs) // 5 + 1)
288 nrows = (len(nslcs)-1) // ncols + 1
290 frame = self.smartplot_frame(
291 'Spectrogram %i' % (self.iframe + 1),
292 ['time'] * ncols,
293 ['frequency'] * nrows,
294 ['psd'])
296 self.iframe += 1
298 zvalues = []
299 for i, nslc in enumerate(nslcs):
301 t, f, a = self.make_spectrogram(
302 tmin, tmax, tinc, tpad, nslc, deltats[0])
304 if self.color_scale == 'log':
305 a = num.log(a)
306 elif self.color_scale == 'sqrt':
307 a = num.sqrt(a)
309 icol, irow = i % ncols, i // ncols
311 axes = frame.plot.axes(icol, nrows-1-irow)
313 min_a = num.nanmin(a)
314 max_a = num.nanmax(a)
315 mean_a = num.nanmean(a)
316 std_a = num.nanstd(a)
318 zmin = max(min_a, mean_a - 3.0 * std_a)
319 zmax = min(max_a, mean_a + 3.0 * std_a)
321 zvalues.append(zmin)
322 zvalues.append(zmax)
324 c = axes.pcolormesh(
325 t, f, a,
326 cmap=get_cmap(self.ctb_name),
327 shading='gouraud')
328 frame.plot.set_color_dim(c, 'psd')
330 if self.fmin is not None:
331 fmin = self.fmin
332 else:
333 fmin = 2.0 / self.twin
335 if self.fmax is not None:
336 fmax = self.fmax
337 else:
338 fmax = f[-1]
340 axes.set_title(
341 '.'.join(x for x in nslc if x),
342 ha='right',
343 va='top',
344 x=0.99,
345 y=0.9)
347 axes.grid(color='black', alpha=0.3)
349 plot.mpl_time_axis(axes, 10. / ncols)
351 for i in range(len(nslcs), ncols*nrows):
352 icol, irow = i % ncols, i // ncols
353 axes = frame.plot.axes(icol, nrows-1-irow)
354 axes.set_axis_off()
356 if self.color_scale == 'log':
357 label = 'log PSD'
358 elif self.color_scale == 'sqrt':
359 label = 'sqrt PSD'
360 else:
361 label = 'PSD'
363 frame.plot.set_label('psd', label)
364 frame.plot.colorbar('psd')
365 frame.plot.set_lim('time', t[0], t[-1])
366 frame.plot.set_lim('frequency', fmin, fmax)
367 frame.plot.set_lim('psd', min(zvalues), max(zvalues))
368 frame.plot.set_label('time', '')
369 frame.plot.set_label('frequency', 'Frequency [Hz]')
371 frame.draw()
374def __snufflings__():
375 '''Returns a list of snufflings to be exported by this module.'''
376 return [Spectrogram()]