1import math
2import logging
3import numpy as num
4from matplotlib.colors import LinearSegmentedColormap
5from matplotlib import cm
7from pyrocko import plot, trace
8from ..snuffling import Snuffling, Param, Choice
10logger = logging.getLogger('pyrocko.gui.snufflings.spectrogram')
13def to01(c):
14 return c[0]/255., c[1]/255., c[2]/255.
17def desat(c, a):
18 cmean = (c[0] + c[1] + c[2]) / 3.
19 return tuple(cc*a + cmean*(1.0-a) for cc in c)
22name_to_taper = {
23 'Hanning': num.hanning,
24 'Hamming': num.hamming,
25 'Blackman': num.blackman,
26 'Bartlett': num.bartlett}
28cmap_colors = [plot.tango_colors[x] for x in [
29 'skyblue1', 'chameleon1', 'butter1', 'orange1', 'scarletred1', 'plum3']]
31name_to_cmap = {
32 'spectro': LinearSegmentedColormap.from_list(
33 'spectro', [desat(to01(c), 0.8) for c in cmap_colors])}
36def get_cmap(name):
37 if name in name_to_cmap:
38 return name_to_cmap[name]
39 else:
40 return cm.get_cmap(name)
43def downsample_plan(n, n_max):
44 n_mean = (n-1) // n_max + 1
45 n_new = n // n_mean
46 return n_new, n_mean
49class Spectrogram(Snuffling):
51 '''
52 <html>
53 <body>
54 <h1>Plot spectrogram</h1>
55 <p>Plots a basic spectrogram.</p>
56 </body>
57 </html>
58 '''
60 def setup(self):
61 '''Customization of the snuffling.'''
63 self.set_name('Spectrogram')
64 self.add_parameter(
65 Param('Window length [s]:', 'twin', 100, 0.1, 10000.))
67 self.add_parameter(
68 Param('Overlap [%]:', 'overlap', 75., 0., 99.))
70 self.add_parameter(
71 Choice('Taper function', 'taper_name', 'Hanning',
72 ['Hanning', 'Hamming', 'Blackman', 'Bartlett']))
74 self.add_parameter(Param(
75 'Fmin [Hz]', 'fmin', None, 0.001, 50.,
76 low_is_none=True))
78 self.add_parameter(Param(
79 'Fmax [Hz]', 'fmax', None, 0.001, 50.,
80 high_is_none=True))
82 self.add_parameter(
83 Choice('Color scale', 'color_scale', 'log',
84 ['log', 'sqrt', 'lin']))
86 self.add_parameter(
87 Choice('Color table', 'ctb_name', 'spectro',
88 ['spectro', 'rainbow']))
90 self.set_live_update(False)
91 self._tapers = {}
92 self.nt_max = 2000
93 self.nf_max = 500
94 self.iframe = 0
96 def get_taper(self, name, n):
98 taper_key = (name, n)
100 if taper_key not in self._tapers:
101 self._tapers[taper_key] = name_to_taper[name](n)
103 return self._tapers[taper_key]
105 def make_spectrogram(self, tmin, tmax, tinc, tpad, nslc, deltat):
107 nt = int(round((tmax - tmin) / tinc))
109 nsamples_want = int(
110 math.floor((tinc + 2*tpad) / deltat))
112 nsamples_want_pad = trace.nextpow2(nsamples_want)
113 nf = nsamples_want_pad // 2 + 1
114 df = 1.0 / (deltat * nsamples_want_pad)
116 if self.fmin is not None:
117 ifmin = int(math.ceil(self.fmin / df))
118 else:
119 ifmin = 0
121 if self.fmax is not None:
122 ifmax = int(math.floor(self.fmax / df)) + 1
123 else:
124 ifmax = nf
126 nf_show = ifmax - ifmin
127 assert nf_show >= 2
129 nf_show_ds, nf_mean = downsample_plan(nf_show, self.nf_max)
131 amps = num.zeros((nf_show_ds, nt))
132 amps.fill(num.nan)
134 win = self.get_taper(self.taper_name, nsamples_want)
136 for batch in self.chopper_selected_traces(
137 tinc=tinc,
138 tpad=tpad+deltat,
139 want_incomplete=False,
140 fallback=True,
141 trace_selector=lambda tr: tr.nslc_id == nslc,
142 mode='inview',
143 style='batch',
144 progress='Calculating Spectrogram'):
146 for tr in batch.traces:
147 if tr.deltat != deltat:
148 self.fail(
149 'Unexpected sampling rate on channel %s.%s.%s.%s: %g'
150 % (tr.nslc_id + (tr.deltat,)))
152 trs = batch.traces
154 if len(trs) != 1:
155 continue
157 tr = trs[0]
159 if deltat is None:
160 deltat = tr.deltat
162 else:
163 if tr.deltat != deltat:
164 raise Exception('Unexpected sampling rate.')
166 tr.chop(tmin - tpad, tr.tmax, inplace=True)
167 tr.set_ydata(tr.ydata[:nsamples_want])
169 if tr.data_len() != nsamples_want:
170 logger.info('incomplete trace')
171 continue
173 tr.ydata = tr.ydata.astype(num.float64)
174 tr.ydata -= tr.ydata.mean()
176 tr.ydata *= win
178 f, a = tr.spectrum(pad_to_pow2=True)
179 assert nf == f.size
180 assert (nf-1)*df == f[-1]
182 f = f[ifmin:ifmax]
183 a = a[ifmin:ifmax]
185 a = num.abs(a)
186 # a /= self.cached_abs_response(sq.get_response(tr), f)
188 a **= 2
189 a *= tr.deltat * 2. / (df*num.sum(win**2))
191 if nf_mean != 1:
192 nf_trim = (nf_show // nf_mean) * nf_mean
193 f = num.mean(f[:nf_trim].reshape((-1, nf_mean)), axis=1)
194 a = num.mean(a[:nf_trim].reshape((-1, nf_mean)), axis=1)
196 tmid = 0.5*(tr.tmax + tr.tmin)
198 it = int(round((tmid-tmin)/tinc))
199 if it < 0 or nt <= it:
200 continue
202 amps[:, it] = a
203 have_data = True
205 if not have_data:
206 self.fail(
207 'No data could be extracted for channel: %s.%s.%s.%s' % nslc)
209 t = tmin + 0.5 * tinc + tinc * num.arange(nt)
210 return t, f, amps
212 def get_selected_time_range(self):
213 markers = self.get_viewer().selected_markers()
214 times = []
215 for marker in markers:
216 times.append(marker.tmin)
217 times.append(marker.tmax)
219 if times:
220 return (min(times), max(times))
221 else:
222 return self.get_viewer().get_time_range()
224 def prescan(self, tinc, tpad):
226 tmin, tmax = self.get_selected_time_range()
227 nt = int(round((tmax - tmin) / tinc))
228 if nt > self.nt_max:
229 _, nt_mean = downsample_plan(nt, self.nt_max)
230 self.fail(
231 'Number of samples in spectrogram time axis: %i. Resulting '
232 'image will be unreasonably large. Consider increasing window '
233 'length to about %g s.' % (
234 nt, (tinc*nt_mean) / (1.-self.overlap/100.)))
236 data = {}
237 times = []
238 for batch in self.chopper_selected_traces(
239 tinc=tinc, tpad=tpad, want_incomplete=False, fallback=True,
240 load_data=False, mode='inview', style='batch'):
242 times.append(batch.tmin)
243 times.append(batch.tmax)
245 for tr in batch.traces:
246 nslc = tr.nslc_id
247 if nslc not in data:
248 data[nslc] = set()
250 data[nslc].add(tr.deltat)
252 nslcs = sorted(data.keys())
253 deltats = [sorted(data[nslc]) for nslc in nslcs]
254 return min(times), max(times), nslcs, deltats
256 def call(self):
257 '''Main work routine of the snuffling.'''
259 tpad = self.twin * self.overlap/100. * 0.5
260 tinc = self.twin - 2 * tpad
262 tmin, tmax, nslcs, deltats = self.prescan(tinc, tpad)
264 for nslc, deltats in zip(nslcs, deltats):
265 if len(deltats) > 1:
266 self.fail(
267 'Multiple sample rates found for channel %s.%s.%s.%s.'
268 % nslc)
270 ncols = int(len(nslcs) // 5 + 1)
271 nrows = (len(nslcs)-1) // ncols + 1
273 frame = self.smartplot_frame(
274 'Spectrogram %i' % (self.iframe + 1),
275 ['time'] * ncols,
276 ['frequency'] * nrows,
277 ['psd'])
279 self.iframe += 1
281 zvalues = []
282 for i, nslc in enumerate(nslcs):
284 t, f, a = self.make_spectrogram(
285 tmin, tmax, tinc, tpad, nslc, deltats[0])
287 if self.color_scale == 'log':
288 a = num.log(a)
289 elif self.color_scale == 'sqrt':
290 a = num.sqrt(a)
292 icol, irow = i % ncols, i // ncols
294 axes = frame.plot.axes(icol, nrows-1-irow)
296 min_a = num.nanmin(a)
297 max_a = num.nanmax(a)
298 mean_a = num.nanmean(a)
299 std_a = num.nanstd(a)
301 zmin = max(min_a, mean_a - 3.0 * std_a)
302 zmax = min(max_a, mean_a + 3.0 * std_a)
304 zvalues.append(zmin)
305 zvalues.append(zmax)
307 c = axes.pcolormesh(
308 t, f, a,
309 cmap=get_cmap(self.ctb_name),
310 shading='gouraud')
311 frame.plot.set_color_dim(c, 'psd')
313 if self.fmin is not None:
314 fmin = self.fmin
315 else:
316 fmin = 2.0 / self.twin
318 if self.fmax is not None:
319 fmax = self.fmax
320 else:
321 fmax = f[-1]
323 axes.set_title(
324 '.'.join(x for x in nslc if x),
325 ha='right',
326 va='top',
327 x=0.99,
328 y=0.9)
330 axes.grid(color='black', alpha=0.3)
332 plot.mpl_time_axis(axes, 10. / ncols)
334 for i in range(len(nslcs), ncols*nrows):
335 icol, irow = i % ncols, i // ncols
336 axes = frame.plot.axes(icol, nrows-1-irow)
337 axes.set_axis_off()
339 if self.color_scale == 'log':
340 label = 'log PSD'
341 elif self.color_scale == 'sqrt':
342 label = 'sqrt PSD'
343 else:
344 label = 'PSD'
346 frame.plot.set_label('psd', label)
347 frame.plot.colorbar('psd')
348 frame.plot.set_lim('time', t[0], t[-1])
349 frame.plot.set_lim('frequency', fmin, fmax)
350 frame.plot.set_lim('psd', min(zvalues), max(zvalues))
351 frame.plot.set_label('time', '')
352 frame.plot.set_label('frequency', 'Frequency [Hz]')
354 frame.draw()
357def __snufflings__():
358 '''Returns a list of snufflings to be exported by this module.'''
359 return [Spectrogram()]