# http://pyrocko.org - GPLv3 # # The Pyrocko Developers, 21st Century # ---|P------/S----------~Lg---------- This module provides basic signal processing for seismic traces. '''
StringChoice, Timestamp
''' Create new trace object.
A ``Trace`` object represents a single continuous strip of evenly sampled time series data. It is built from a 1D NumPy array containing the data samples and some attributes describing its beginning and ending time, its sampling rate and four string identifiers (its network, station, location and channel code).
:param network: network code :param station: station code :param location: location code :param channel: channel code :param tmin: system time of first sample in [s] :param tmax: system time of last sample in [s] (if set to ``None`` it is computed from length of ``ydata``) :param deltat: sampling interval in [s] :param ydata: 1D numpy array with data samples (can be ``None`` when ``tmax`` is not ``None``) :param mtime: optional modification time
:param meta: additional meta information (not used, but maintained by the library)
The length of the network, station, location and channel codes is not resricted by this software, but data formats like SAC, Mini-SEED or GSE have different limits on the lengths of these codes. The codes set here are silently truncated when the trace is stored '''
tmin=0., tmax=None, deltat=1., ydata=None, mtime=None, meta=None):
reuse(x) for x in (network, station, location, channel)]
else: raise Exception( 'fixme: trace must be created with tmax or ydata') else:
def __str__(self): fmt = min(9, max(0, -int(math.floor(math.log10(self.deltat))))) s = 'Trace (%s, %s, %s, %s)\n' % self.nslc_id s += ' timerange: %s - %s\n' % ( util.time_to_str(self.tmin, format=fmt), util.time_to_str(self.tmax, format=fmt))
s += ' delta t: %g\n' % self.deltat if self.meta: for k in sorted(self.meta.keys()): s += ' %s: %s\n' % (k, self.meta[k]) return s
self.tmin, self.tmax, self.deltat, self.mtime, self.ydata, self.meta)
self.tmin, self.tmax, self.deltat, self.mtime, \ self.ydata, self.meta = state
else: # backward compatibility with old behaviour self.network, self.station, self.location, self.channel, \ self.tmin, self.tmax, self.deltat, self.mtime = state self.ydata = None self.meta = None
sha1.update(memoryview(self.ydata)) else:
''' Get a short string description. '''
s = '%s.%s.%s.%s, %s, %s' % (self.nslc_id + ( util.time_to_str(self.tmin), util.time_to_str(self.tmax)))
return s
isinstance(other, Trace) and self.network == other.network and self.station == other.station and self.location == other.location and self.channel == other.channel and (abs(self.deltat - other.deltat) < (self.deltat + other.deltat)*1e-6) and abs(self.tmin-other.tmin) < self.deltat*0.01 and abs(self.tmax-other.tmax) < self.deltat*0.01 and num.all(self.ydata == other.ydata))
return ( self.network == other.network and self.station == other.station and self.location == other.location and self.channel == other.channel and (abs(self.deltat - other.deltat) < (self.deltat + other.deltat)*1e-6) and abs(self.tmin-other.tmin) < self.deltat*0.01 and abs(self.tmax-other.tmax) < self.deltat*0.01 and num.allclose(self.ydata, other.ydata, rtol=rtol, atol=atol))
assert self.network == other.network, \ 'network codes differ: %s, %s' % (self.network, other.network) assert self.station == other.station, \ 'station codes differ: %s, %s' % (self.station, other.station) assert self.location == other.location, \ 'location codes differ: %s, %s' % (self.location, other.location) assert self.channel == other.channel, 'channel codes differ' assert (abs(self.deltat - other.deltat) < (self.deltat + other.deltat)*1e-6), \ 'sampling intervals differ %g, %g' % (self.deltat, other.delta) assert abs(self.tmin-other.tmin) < self.deltat*0.01, \ 'start times differ: %s, %s' % ( util.time_to_str(self.tmin), util.time_to_str(other.tmin)) assert abs(self.tmax-other.tmax) < self.deltat*0.01, \ 'end times differ: %s, %s' % ( util.time_to_str(self.tmax), util.time_to_str(other.tmax))
assert num.allclose(self.ydata, other.ydata, rtol=rtol, atol=atol), \ 'trace values differ'
it = max(0, min(it, self.ydata.size-1)) else: raise IndexError()
''' Value of trace between supporting points through linear interpolation.
:param t: time instant :param clip: whether to clip indices to trace ends '''
return y0 else:
''' Clip index to valid range. '''
return min(max(0, i), self.ydata.size)
''' Add values of other trace (self += other).
Add values of ``other`` trace to the values of ``self``, where it intersects with ``other``. This method does not change the extent of ``self``. If ``interpolate`` is ``True`` (the default), the values of ``other`` to be added are interpolated at sampling instants of ``self``. Linear interpolation is performed. In this case the sampling rate of ``other`` must be equal to or lower than that of ``self``. If ``interpolate`` is ``False``, the sampling rates of the two traces must match. '''
assert self.deltat <= other.deltat \ or same_sampling_rate(self, other)
other_xdata = other.get_xdata() xdata = self.get_xdata() self.ydata += num.interp( xdata, other_xdata, other.ydata, left=left, right=left) else:
''' Muliply with values of other trace ``(self *= other)``.
Multiply values of ``other`` trace to the values of ``self``, where it intersects with ``other``. This method does not change the extent of ``self``. If ``interpolate`` is ``True`` (the default), the values of ``other`` to be multiplied are interpolated at sampling instants of ``self``. Linear interpolation is performed. In this case the sampling rate of ``other`` must be equal to or lower than that of ``self``. If ``interpolate`` is ``False``, the sampling rates of the two traces must match. '''
if interpolate: assert self.deltat <= other.deltat or \ same_sampling_rate(self, other)
other_xdata = other.get_xdata() xdata = self.get_xdata() self.ydata *= num.interp( xdata, other_xdata, other.ydata, left=0., right=0.) else: assert self.deltat == other.deltat ibeg1 = int(round((other.tmin-self.tmin)/self.deltat)) ibeg2 = int(round((self.tmin-other.tmin)/self.deltat)) iend1 = int(round((other.tmax-self.tmin)/self.deltat))+1 iend2 = int(round((self.tmax-other.tmin)/self.deltat))+1
ibeg1 = self.index_clip(ibeg1) iend1 = self.index_clip(iend1) ibeg2 = self.index_clip(ibeg2) iend2 = self.index_clip(iend2)
self.ydata[ibeg1:iend1] *= other.ydata[ibeg2:iend2]
''' Get time and value of data maximum. '''
''' Get time and value of data minimum. '''
''' Get time and value of maximum of the absolute of data. '''
else:
self, network=None, station=None, location=None, channel=None):
''' Set network, station, location, and channel codes. '''
self.network = network self._update_ids()
self.channel = channel self._update_ids()
''' Check if trace has overlap with a given time span. '''
''' Check if trace has overlap with a given time span and matches a condition callback. (internal use) '''
and (selector is None or selector(self))
''' Update dependent ids. '''
self.network, self.station, self.location, self.channel, self.tmin) (self.network, self.station, self.location, self.channel))
util.deuse(self.nslc_id) util.deuse(self.network) util.deuse(self.station) util.deuse(self.location) util.deuse(self.channel)
''' Set modification time of the trace. '''
''' Create array for time axis. '''
raise NoData()
+ num.arange(len(self.ydata), dtype=num.float64) * self.deltat
''' Get data array. '''
raise NoData()
''' Replace data array. '''
else:
''' Forget data, make dataless trace. '''
''' Detach the traces grow buffer. '''
''' Make a deep copy of the trace. '''
''' Remove any zeros at beginning and end. '''
raise NoData()
return
''' Append data to the end of the trace.
To make this method efficient when successively very few or even single samples are appended, a larger grow buffer is allocated upon first invocation. The traces data is then changed to be a view into the currently filled portion of the grow buffer array. '''
self, tmin, tmax, inplace=True, include_last=False, snap=(round, round), want_incomplete=True):
''' Cut the trace to given time span.
If the ``inplace`` argument is True (the default) the trace is cut in place, otherwise a new trace with the cut part is returned. By default, the indices where to start and end the trace data array are determined by rounding of ``tmin`` and ``tmax`` to sampling instances using Python's :py:func:`round` function. This behaviour can be changed with the ``snap`` argument, which takes a tuple of two functions (one for the lower and one for the upper end) to be used instead of :py:func:`round`. The last sample is by default not included unless ``include_last`` is set to True. If the given time span exceeds the available time span of the trace, the available part is returned, unless ``want_incomplete`` is set to False - in that case, a :py:exc:`NoData` exception is raised. This exception is always raised, when the requested time span does dot overlap with the trace's time span. '''
raise NoData() else: if tmin < self.tmin or self.tmax < tmax: raise NoData()
self.data_len(), t2ind(tmax-self.tmin, self.deltat, snap[1])+iplus)
raise NoData()
else: obj.ydata = None
''' Downsample trace by a given integer factor.
:param ndecimate: decimation factor, avoid values larger than 8 :param snap: whether to put the new sampling instances closest to multiples of the sampling rate. :param initials: ``None``, ``True``, or initial conditions for the anti-aliasing filter, obtained from a previous run. In the latter two cases the final state of the filter is returned instead of ``None``. :param demean: whether to demean the signal before filtering. '''
(math.ceil(self.tmin / newdeltat) * newdeltat - self.tmin) / self.deltat)) else:
else:
data -= num.mean(data)
data, ndecimate, ftype='fir', zi=initials, ioff=ilag) else: result = data
else: self.ydata, finals = result
initials=None, demean=False):
''' Downsample to given sampling rate.
Tries to downsample the trace to a target sampling interval of ``deltat``. This runs the :py:meth:`Trace.downsample` one or several times. If allow_upsample_max is set to a value larger than 1, intermediate upsampling steps are allowed, in order to increase the number of possible downsampling ratios.
If the requested ratio is not supported, an exception of type :py:exc:`pyrocko.util.UnavailableDecimation` is raised. '''
util.decitab(int(round(dratio))):
raise util.UnavailableDecimation('ratio = %g' % ratio)
ydata.size*upsratio-(upsratio-1), ydata.dtype) float(i)/upsratio * ydata[:-1] \ + float(upsratio-i)/upsratio * ydata[1:]
xinitials = initials[i] ndecimate, snap=snap, initials=xinitials, demean=demean))
return finals
''' Resample to given sampling rate ``deltat``.
Resampling is performed in the frequency domain. '''
ndata = self.ydata.size ntrans = nextpow2(ndata) fntrans2 = ntrans * self.deltat/deltat ntrans2 = int(round(fntrans2)) deltat2 = self.deltat * float(ntrans)/float(ntrans2) ndata2 = int(round(ndata*self.deltat/deltat2)) if abs(fntrans2 - ntrans2) > 1e-7: logger.warning( 'resample: requested deltat %g could not be matched exactly: ' '%g' % (deltat, deltat2))
data = self.ydata data_pad = num.zeros(ntrans, dtype=float) data_pad[:ndata] = data fdata = num.fft.rfft(data_pad) fdata2 = num.zeros((ntrans2+1)//2, dtype=fdata.dtype) n = min(fdata.size, fdata2.size) fdata2[:n] = fdata[:n] data2 = num.fft.irfft(fdata2) data2 = data2[:ndata2] data2 *= float(ntrans2) / float(ntrans) self.deltat = deltat2 self.set_ydata(data2)
tyear = 3600*24*365.
if deltat == self.deltat: return
if abs(self.deltat - deltat) * tyear/deltat < deltat: logger.warning( 'resample_simple: less than one sample would have to be ' 'inserted/deleted per year. Doing nothing.') return
ninterval = int(round(deltat / (self.deltat - deltat))) if abs(ninterval) < 20: logger.error( 'resample_simple: sample insertion/deletion interval less ' 'than 20. results would be erroneous.') raise ResamplingFailed()
delete = False if ninterval < 0: ninterval = - ninterval delete = True
tyearbegin = util.year_start(self.tmin)
nmin = int(round((self.tmin - tyearbegin)/deltat))
ibegin = (((nmin-1)//ninterval)+1) * ninterval - nmin nindices = (len(self.ydata) - ibegin - 1) / ninterval + 1 if nindices > 0: indices = ibegin + num.arange(nindices) * ninterval data_split = num.split(self.ydata, indices) data = [] for ln, h in zip(data_split[:-1], data_split[1:]): if delete: ln = ln[:-1]
data.append(ln) if not delete: if ln.size == 0: v = h[0] else: v = 0.5*(ln[-1] + h[0]) data.append(num.array([v], dtype=ln.dtype))
data.append(data_split[-1])
ydata_new = num.concatenate(data)
self.tmin = tyearbegin + nmin * deltat self.deltat = deltat self.set_ydata(ydata_new)
''' Stretch signal while preserving sample rate using sinc interpolation.
:param tmin_new: new time of first sample :param tmax_new: new time of last sample
This method can be used to correct for a small linear time drift or to introduce sub-sample time shifts. The amount of stretching is limited to 10% by the implementation and is expected to be much smaller than that by the approximations used. '''
n_new = int(math.floor(r))
self.ydata.astype(float), tmin_new, self.deltat, ydata_new)
def nyquist_check(self, frequency, intro='Corner frequency', warn=True, raise_exception=False):
''' Check if a given frequency is above the Nyquist frequency of the trace.
:param intro: string used to introduce the warning/error message :param warn: whether to emit a warning :param raise_exception: whether to raise an :py:exc:`AboveNyquist` exception. '''
if frequency >= 0.5/self.deltat: message = '%s (%g Hz) is equal to or higher than nyquist ' \ 'frequency (%g Hz). (Trace %s)' \ % (intro, frequency, 0.5/self.deltat, self.name()) if warn: logger.warning(message) if raise_exception: raise AboveNyquist(message)
nyquist_exception=False, demean=True):
''' Apply Butterworth lowpass to the trace.
:param order: order of the filter :param corner: corner frequency of the filter
Mean is removed before filtering. '''
corner, 'Corner frequency of lowpass', nyquist_warn, nyquist_exception)
order, [corner*2.0*self.deltat], btype='low')
logger.warning( 'Erroneous filter coefficients returned by ' 'scipy.signal.butter(). You may need to downsample the ' 'signal before filtering.')
nyquist_exception=False, demean=True):
''' Apply butterworth highpass to the trace.
:param order: order of the filter :param corner: corner frequency of the filter
Mean is removed before filtering. '''
corner, 'Corner frequency of highpass', nyquist_warn, nyquist_exception)
order, [corner*2.0*self.deltat], btype='high')
logger.warning( 'Erroneous filter coefficients returned by ' 'scipy.signal.butter(). You may need to downsample the ' 'signal before filtering.')
''' Apply butterworth bandpass to the trace.
:param order: order of the filter :param corner_hp: lower corner frequency of the filter :param corner_lp: upper corner frequency of the filter
Mean is removed before filtering. '''
order, [corner*2.0*self.deltat for corner in (corner_hp, corner_lp)], btype='band')
''' Apply bandstop (attenuates frequencies in band) to the trace.
:param order: order of the filter :param corner_hp: lower corner frequency of the filter :param corner_lp: upper corner frequency of the filter
Mean is removed before filtering. '''
order, [corner*2.0*self.deltat for corner in (corner_hp, corner_lp)], btype='bandstop')
''' Calculate the envelope of the trace.
:param inplace: calculate envelope in place
The calculation follows:
.. math::
Y' = \\sqrt{Y^2+H(Y)^2}
where H is the Hilbert-Transform of the signal Y. '''
self.drop_growbuffer() self.ydata = num.sqrt(self.ydata**2 + hilbert(self.ydata)**2) else:
''' Apply a :py:class:`Taper` to the trace.
:param taperer: instance of :py:class:`Taper` subclass :param inplace: apply taper inplace :param chop: if ``True``: exclude tapered parts from the resulting trace '''
else: tr = self
''' Whiten signal in time domain using autoregression and recursive filter.
:param order: order of the autoregression process '''
self, width, td_taper='auto', fd_taper='auto', pad_to_pow2=True, demean=True):
''' Whiten signal via frequency domain using moving average on amplitude spectra.
:param width: width of smoothing kernel [Hz] :param td_taper: time domain taper, object of type :py:class:`Taper` or ``None`` or ``'auto'``. :param fd_taper: frequency domain taper, object of type :py:class:`Taper` or ``None`` or ``'auto'``. :param pad_to_pow2: whether to pad the signal with zeros up to a length of 2^n :param demean: whether to demean the signal before tapering
The signal is first demeaned and then tapered using ``td_taper``. Then, the spectrum is calculated and inversely weighted with a smoothed version of its amplitude spectrum. A moving average is used for the smoothing. The smoothed spectrum is then tapered using ``fd_taper``. Finally, the smoothed and tapered spectrum is back-transformed into the time domain.
If ``td_taper`` is set to ``'auto'``, ``CosFader(1.0/width)`` is used. If ``fd_taper`` is set to ``'auto'``, ``CosFader(width)`` is used. '''
ndata = self.data_len()
if pad_to_pow2: ntrans = nextpow2(ndata) else: ntrans = ndata
df = 1./(ntrans*self.deltat) nw = int(round(width/df)) if ndata//2+1 <= nw: raise TraceTooShort( 'Samples in trace: %s, samples needed: %s' % (ndata, nw))
if td_taper == 'auto': td_taper = CosFader(1./width)
if fd_taper == 'auto': fd_taper = CosFader(width)
if td_taper: self.taper(td_taper)
ydata = self.get_ydata().astype(float) if demean: ydata -= ydata.mean()
spec = num.fft.rfft(ydata, ntrans)
amp = num.abs(spec) nspec = amp.size csamp = num.cumsum(amp) amp_smoothed = num.empty(nspec, dtype=csamp.dtype) n1, n2 = nw//2, nw//2 + nspec - nw amp_smoothed[n1:n2] = (csamp[nw:] - csamp[:-nw]) / nw amp_smoothed[:n1] = amp_smoothed[n1] amp_smoothed[n2:] = amp_smoothed[n2-1]
denom = amp_smoothed * amp numer = amp eps = num.mean(denom) * 1e-9 if eps == 0.0: eps = 1e-9
numer += eps denom += eps spec *= numer/denom
if fd_taper: fd_taper(spec, 0., df)
ydata = num.fft.irfft(spec) self.set_ydata(ydata[:ndata])
nf, dtype=float)
''' Apply boxcar bandbpass to trace (in spectral domain). '''
''' Time shift the trace. '''
''' Shift trace samples to nearest even multiples of the sampling rate.
:param inplace: (boolean) snap traces inplace
If ``inplace`` is ``False`` and the difference of tmin and tmax of both, the snapped and the original trace is smaller than 0.01 x deltat, :py:func:`snap` returns the unsnapped instance of the original trace. '''
else: if abs(self.tmin - tmin) < 1e-2*self.deltat and \ abs(self.tmax - tmax) < 1e-2*self.deltat: return self
xself = self.copy()
[float(xself.tmin-tref), float(xself.tmax-tref)], dtype=float)
xself.ydata.astype(float), float(tmin-tref), xself.deltat, ydata_new)
''' Try to undo sampling rate rounding errors.
See :py:func:`fix_deltat_rounding_errors`. '''
self.deltat = fix_deltat_rounding_errors(self.deltat) self.tmax = self.tmin + (self.data_len() - 1) * self.deltat
''' Run special STA/LTA filter where the short time window is centered on the long time window.
:param tshort: length of short time window in [s] :param tlong: length of long time window in [s] :param quad: whether to square the data prior to applying the STA/LTA filter :param scalingmethod: integer key to select how output values are scaled / normalized (``1``, ``2``, or ``3``)
============= ====================================== =========== Scalingmethod Implementation Range ============= ====================================== =========== ``1`` As/Al* Ts/Tl [0,1] ``2`` (As/Al - 1) / (Tl/Ts - 1) [-Ts/Tl,1] ``3`` Like ``2`` but clipping range at zero [0,1] ============= ====================================== ===========
'''
nshort = int(round(tshort/self.deltat)) nlong = int(round(tlong/self.deltat))
assert nshort < nlong if nlong > len(self.ydata): raise TraceTooShort( 'Samples in trace: %s, samples needed: %s' % (len(self.ydata), nlong))
if quad: sqrdata = self.ydata**2 else: sqrdata = self.ydata
mavg_short = moving_avg(sqrdata, nshort) mavg_long = moving_avg(sqrdata, nlong)
self.drop_growbuffer()
if scalingmethod not in (1, 2, 3): raise Exception('Invalid argument to scalingrange argument.')
if scalingmethod == 1: self.ydata = mavg_short/mavg_long * float(nshort)/float(nlong) elif scalingmethod in (2, 3): self.ydata = (mavg_short/mavg_long - 1.) \ / ((float(nlong)/float(nshort)) - 1)
if scalingmethod == 3: self.ydata = num.maximum(self.ydata, 0.)
''' Run special STA/LTA filter where the short time window is overlapping with the last part of the long time window.
:param tshort: length of short time window in [s] :param tlong: length of long time window in [s] :param quad: whether to square the data prior to applying the STA/LTA filter :param scalingmethod: integer key to select how output values are scaled / normalized (``1``, ``2``, or ``3``)
============= ====================================== =========== Scalingmethod Implementation Range ============= ====================================== =========== ``1`` As/Al* Ts/Tl [0,1] ``2`` (As/Al - 1) / (Tl/Ts - 1) [-Ts/Tl,1] ``3`` Like ``2`` but clipping range at zero [0,1] ============= ====================================== ===========
With ``scalingmethod=1``, the values produced by this variant of the STA/LTA are equivalent to
.. math:: s_i = \\frac{s}{l} \\frac{\\frac{1}{s}\\sum_{j=i}^{i+s-1} f_j} {\\frac{1}{l}\\sum_{j=i+s-l}^{i+s-1} f_j}
where :math:`f_j` are the input samples, :math:`s` are the number of samples in the short time window and :math:`l` are the number of samples in the long time window. '''
raise TraceTooShort( 'Samples in trace: %s, samples needed: %s' % (len(self.ydata), nlong))
else: sqrdata = self.ydata
raise Exception('Invalid argument to scalingrange argument.')
elif scalingmethod in (2, 3): ydata = (mavg_short/mavg_long - 1.) \ / ((float(nlong)/float(nshort)) - 1)
ydata = num.maximum(self.ydata, 0.)
tmin + (nlong - nshort) * self.deltat, tmin + (n - nshort) * self.deltat)
deadtime=False, nblock_duration_detection=100):
''' Detect peaks above a given threshold (method 1).
From every instant, where the signal rises above ``threshold``, a time length of ``tsearch`` seconds is searched for a maximum. A list with tuples (time, value) for each detected peak is returned. The ``deadtime`` argument turns on a special deadtime duration detection algorithm useful in combination with recursive STA/LTA filters. '''
len(self.ydata), itrig_pos + int(math.ceil(tsearch/self.deltat)))
ibeg = itrig_pos iblock = 0 nblock = nblock_duration_detection totalsum = 0. while True: if ibeg+iblock*nblock >= len(y): tzero = self.tmin + (len(y)-1) * self.deltat break
logy = num.log( y[ibeg+iblock*nblock:ibeg+(iblock+1)*nblock]) logy[0] += totalsum ysum = num.cumsum(logy) totalsum = ysum[-1] below = num.where(ysum <= 0., 1, 0) deriv = num.zeros(ysum.size, dtype=num.int8) deriv[1:] = below[1:]-below[:-1] izero_positions = num.nonzero(deriv > 0)[0] + iblock*nblock if len(izero_positions) > 0: tzero = self.tmin + self.deltat * ( ibeg + izero_positions[0]) break iblock += 1 else:
return tpeaks, apeaks, tzeros else:
''' Detect peaks above a given threshold (method 2).
This variant of peak detection is a bit more robust (and slower) than the one implemented in :py:meth:`Trace.peaks`. First all samples with ``a[i-1] < a[i] > a[i+1]`` are masked as potential peaks. From these, iteratively the one with the maximum amplitude ``a[j]`` and time ``t[j]`` is choosen and potential peaks within ``t[j] - tsearch, t[j] + tsearch`` are discarded. The algorithm stops, when ``a[j] < threshold`` or when no more potential peaks are left. '''
a = num.copy(self.ydata)
amin = num.min(a)
a[0] = amin a[1: -1][num.diff(a, 2) <= 0.] = amin a[-1] = amin
data = [] while True: imax = num.argmax(a) amax = a[imax]
if amax < threshold or amax == amin: break
data.append((self.tmin + imax * self.deltat, amax))
ntsearch = int(round(tsearch / self.deltat)) a[max(imax-ntsearch//2, 0):min(imax+ntsearch//2, a.size)] = amin
if data: data.sort() tpeaks, apeaks = list(zip(*data)) else: tpeaks, apeaks = [], []
return tpeaks, apeaks
''' Extend trace to given span.
:param tmin: begin time of new span :param tmax: end time of new span :param fillmethod: ``'zeros'``, ``'repeat'``, ``'mean'``, or ``'median'`` '''
else: nl = 0
else: nh = nold - 1
v = num.median(self.ydata) data[:-nl] = v data[-nl + nold:] = v v = num.mean(self.ydata) data[:-nl] = v data[-nl + nold:] = v
tfade=0., freqlimits=None, transfer_function=None, cut_off_fading=True, demean=True, invert=False):
''' Return new trace with transfer function applied (convolution).
:param tfade: rise/fall time in seconds of taper applied in timedomain at both ends of trace. :param freqlimits: 4-tuple with corner frequencies in Hz. :param transfer_function: FrequencyResponse object; must provide a method 'evaluate(freqs)', which returns the transfer function coefficients at the frequencies 'freqs'. :param cut_off_fading: whether to cut off rise/fall interval in output trace. :param demean: remove mean before applying transfer function :param invert: set to True to do a deconvolution '''
raise TraceTooShort( 'Trace %s.%s.%s.%s too short for fading length setting. ' 'trace length = %g, fading length = %g' % (self.nslc_id + (self.tmax-self.tmin, tfade)))
transfer_function is None or transfer_function.is_scalar()):
# special case for flat responses
c = 1.0/c
data *= costaper( 0., tfade, self.deltat*(ndata-1)-tfade, self.deltat*ndata, ndata, self.deltat)
else: ntrans, freqlimits, transfer_function, invert=invert)
0., tfade, self.deltat*(ndata-1)-tfade, self.deltat*ndata, ndata, self.deltat)
except NoData: raise TraceTooShort( 'Trace %s.%s.%s.%s too short for fading length setting. ' 'trace length = %g, fading length = %g' % (self.nslc_id + (self.tmax-self.tmin, tfade))) else:
''' Approximate first or second derivative of the trace.
:param n: 1 for first derivative, 2 for second :param order: order of the approximation 2 and 4 are supported :param inplace: if ``True`` the trace is differentiated in place, otherwise a new trace object with the derivative is returned.
Raises :py:exc:`ValueError` for unsupported `n` or `order`.
See :py:func:`~pyrocko.util.diff_fd` for implementation details. '''
self.ydata = ddata else:
if self._pchain: self._pchain.clear()
do_downsample, do_extend, do_pre_taper, do_fft, do_filter, do_ifft)
(self, deltat), (tmin, tmax), (setup.taper,), (setup.filter,), (setup.filter,), nocache=nocache)
else: (self, deltat), (tmin, tmax), (setup.taper,), (setup.filter,), (setup.filter,), (), nocache=nocache)
''' Calculate misfit and normalization factor against candidate trace.
:param candidate: :py:class:`Trace` object :param setup: :py:class:`MisfitSetup` object :returns: tuple ``(m, n)``, where m is the misfit value and n is the normalization divisor
If the sampling rates of ``self`` and ``candidate`` differ, the trace with the higher sampling rate will be downsampled. '''
else: ctr = correlate(aproc, bproc, mode='full', normalization='normal') ccmax = ctr.max()[1] m = 0.5 - 0.5 * ccmax n = 0.5
else:
''' Get FFT spectrum of trace.
:param pad_to_pow2: whether to zero-pad the data to next larger power-of-two length :param tfade: ``None`` or a time length in seconds, to apply cosine shaped tapers to both
:returns: a tuple with (frequencies, values) '''
ntrans = nextpow2(ndata) else:
else: ydata = self.ydata * costaper( 0., tfade, self.deltat*(ndata-1)-tfade, self.deltat*ndata, ndata, self.deltat)
class Gauss(FrequencyResponse): def __init__(self, f0, a=1.0): self._omega0 = 2.*math.pi*f0 self._a = a
def evaluate(self, freqs): omega = 2.*math.pi*freqs return num.exp(-((omega-self._omega0) / (self._a*self._omega0))**2)
freqs, coefs = self.spectrum() n = self.data_len() nfilt = len(filter_freqs) signal_tf = num.zeros((nfilt, n)) centroid_freqs = num.zeros(nfilt) for ifilt, f0 in enumerate(filter_freqs): taper = Gauss(f0, a=bandwidth) weights = taper.evaluate(freqs) nhalf = freqs.size analytic_spec = num.zeros(n, dtype=complex) analytic_spec[:nhalf] = coefs*weights
enorm = num.abs(analytic_spec[:nhalf])**2 enorm /= num.sum(enorm)
if n % 2 == 0: analytic_spec[1:nhalf-1] *= 2. else: analytic_spec[1:nhalf] *= 2.
analytic = num.fft.ifft(analytic_spec) signal_tf[ifilt, :] = num.abs(analytic)
enorm = num.abs(analytic_spec[:nhalf])**2 enorm /= num.sum(enorm) centroid_freqs[ifilt] = num.sum(freqs*enorm)
return centroid_freqs, signal_tf
self, ntrans, freqlimits, transfer_function, invert=False):
+ hi(a)*deltaf
raise InfiniteResponse('%s.%s.%s.%s' % self.nslc_id)
else:
else: raise Exception( 'transfer: `freqlimits` must be given when `invert` is ' 'set to `True`')
''' Fill string template with trace metadata.
Uses normal python '%(placeholder)s' string templates. The following placeholders are considered: ``network``, ``station``, ``location``, ``channel``, ``tmin`` (time of first sample), ``tmax`` (time of last sample), ``tmin_ms``, ``tmax_ms``, ``tmin_us``, ``tmax_us``, ``tmin_year``, ``tmax_year``, ``julianday``. The variants ending with ``'_ms'`` include milliseconds, those with ``'_us'`` include microseconds, those with ``'_year'`` contain only the year. '''
.replace('%s', '%(station)s')\ .replace('%l', '%(location)s')\ .replace('%c', '%(channel)s')\ .replace('%b', '%(tmin)s')\ .replace('%e', '%(tmax)s')\ .replace('%j', '%(julianday)s')
zip(('network', 'station', 'location', 'channel'), self.nslc_id)) self.tmin, format='%Y-%m-%d_%H-%M-%S') self.tmax, format='%Y-%m-%d_%H-%M-%S') self.tmin, format='%Y-%m-%d_%H-%M-%S.3FRAC') self.tmax, format='%Y-%m-%d_%H-%M-%S.3FRAC') self.tmin, format='%Y-%m-%d_%H-%M-%S.6FRAC') self.tmax, format='%Y-%m-%d_%H-%M-%S.6FRAC') self.tmin, format='%Y') self.tmax, format='%Y')
''' Show trace with matplotlib.
See also: :py:meth:`Trace.snuffle`. '''
import pylab pylab.plot(self.get_xdata(), self.get_ydata()) name = '%s %s %s - %s' % ( self.channel, self.station, time.strftime("%d-%m-%y %H:%M:%S", time.gmtime(self.tmin)), time.strftime("%d-%m-%y %H:%M:%S", time.gmtime(self.tmax)))
pylab.title(name) pylab.show()
''' Show trace in a snuffler window.
:param stations: list of `pyrocko.model.Station` objects or ``None`` :param events: list of `pyrocko.model.Event` objects or ``None`` :param markers: list of `pyrocko.gui.util.Marker` objects or ``None`` :param ntracks: float, number of tracks to be shown initially (default: 12) :param follow: time interval (in seconds) for real time follow mode or ``None`` :param controls: bool, whether to show the main controls (default: ``True``) :param opengl: bool, whether to use opengl (default: ``False``) '''
''' Show traces in a snuffler window.
:param stations: list of `pyrocko.model.Station` objects or ``None`` :param events: list of `pyrocko.model.Event` objects or ``None`` :param markers: list of `pyrocko.gui.util.Marker` objects or ``None`` :param ntracks: float, number of tracks to be shown initially (default: 12) :param follow: time interval (in seconds) for real time follow mode or ``None`` :param controls: bool, whether to show the main controls (default: ``True``) :param opengl: bool, whether to use opengl (default: ``False``) '''
''' This exception is raised by :py:class:`Trace` operations when deconvolution of a frequency response (instrument response transfer function) would result in a division by zero. '''
''' This exception is raised by some :py:class:`Trace` operations when tmin, tmax or number of samples do not match. '''
''' This exception is raised by some :py:class:`Trace` operations when no or not enough data is available. '''
''' This exception is raised by some :py:class:`Trace` operations when given frequencies are above the Nyquist frequency. '''
''' This exception is raised by some :py:class:`Trace` operations when the trace is too short. '''
''' Get data range given traces grouped by selected pattern.
:param key: a callable which takes as single argument a trace and returns a key for the grouping of the results. If this is ``None``, the default, ``lambda tr: (tr.network, tr.station, tr.location, tr.channel)`` is used. :param mode: 'minmax' or floating point number. If this is 'minmax', minimum and maximum of the traces are used, if it is a number, mean +- standard deviation times ``mode`` is used.
:returns: a dict with the combined data ranges.
Examples::
ranges = minmax(traces, lambda tr: tr.channel) print ranges['N'] # print min & max of all traces with channel == 'N' print ranges['E'] # print min & max of all traces with channel == 'E'
ranges = minmax(traces, lambda tr: (tr.network, tr.station)) print ranges['GR', 'HAM3'] # print min & max of all traces with # network == 'GR' and station == 'HAM3'
ranges = minmax(traces, lambda tr: None) print ranges[None] # prints min & max of all traces '''
key = _default_key
else: mean = trace.ydata.mean() std = trace.ydata.std() mi, ma = mean-std*mode, mean+std*mode
else: tmi, tma = ranges[k] ranges[k] = min(tmi, mi), max(tma, ma)
''' Get time range given traces grouped by selected pattern.
:param key: a callable which takes as single argument a trace and returns a key for the grouping of the results. If this is ``None``, the default, ``lambda tr: (tr.network, tr.station, tr.location, tr.channel)`` is used.
:returns: a dict with the combined data ranges. '''
key = _default_key
else:
traces, maxgap=5, fillmethod='interpolate', deoverlap='use_second', maxlap=None):
''' Try to connect traces and remove gaps.
This method will combine adjacent traces, which match in their network, station, location and channel attributes. Overlapping parts are handled according to the ``deoverlap`` argument.
:param traces: input traces, must be sorted by their full_id attribute. :param maxgap: maximum number of samples to interpolate. :param fillmethod: what to put into the gaps: 'interpolate' or 'zeros'. :param deoverlap: how to handle overlaps: 'use_second' to use data from second trace (default), 'use_first' to use data from first trace, 'crossfade_cos' to crossfade with cosine taper, 'add' to add amplitude values. :param maxlap: maximum number of samples of overlap which are removed
:returns: list of traces '''
'traces given to degapper() must either all have data or have ' \ 'no data.'
and a.data_len() >= 1 and b.data_len() >= 1 and (virtual or a.ydata.dtype == b.ydata.dtype)):
# logger.warning('Cannot degap traces with displaced sampling ' # '(%s, %s, %s, %s)' % a.nslc_id) pass else: ((1.0 + num.arange(idist-1, dtype=float)) / idist) * (b.ydata[0]-a.ydata[-1]) ).astype(a.ydata.dtype) elif fillmethod == 'zeros': filler = num.zeros(idist-1, dtype=a.ydist.dtype)
(a.ydata[:-n], b.ydata)) (a.ydata, b.ydata[n:])) (a.ydata, b.ydata[n:])) else: assert False, 'unknown deoverlap method'
(1.+num.arange(n))/(1.+n)*num.pi)
else: # make short second trace vanish continue
''' 2D rotation of traces.
:param traces: list of input traces :param azimuth: difference of the azimuths of the component directions (azimuth of out_channels[0]) - (azimuth of in_channels[0]) :param in_channels: names of the input channels (e.g. 'N', 'E') :param out_channels: names of the output channels (e.g. 'R', 'T') :returns: list of rotated traces '''
a.nslc_id[:3] == b.nslc_id[:3] and abs(a.deltat-b.deltat) < a.deltat*0.001):
logger.warning( 'Cannot rotate traces with displaced sampling ' '(%s, %s, %s, %s)' % a.nslc_id) continue
azimuth = orthodrome.azimuth(receiver, source) + 180. in_channels = n.channel, e.channel out = rotate( [n, e], azimuth, in_channels=in_channels, out_channels=out_channels)
assert len(out) == 2 for tr in out: if tr.channel == 'R': r = tr elif tr.channel == 'T': t = tr
return r, t
out_channels=('L', 'Q', 'T')): ''' Rotate traces from ZNE to LQT system.
:param traces: list of traces in arbitrary order :param backazimuth: backazimuth in degrees clockwise from north :param incidence: incidence angle in degrees from vertical :param in_channels: input channel names :param out_channels: output channel names (default: ('L', 'Q', 'T')) :returns: list of transformed traces '''
[[ci, -cb*si, -sb*si], [si, cb*ci, sb*ci], [0., sb, -cb]])
''' Decompose matrix into independent submatrices. '''
continue
continue
else:
''' Affine transform of three-component traces.
Compute matrix-vector product of three-component traces, to e.g. rotate traces into a different basis. The traces are distinguished and ordered by their channel attribute. The tranform is applied to overlapping parts of any appropriate combinations of the input traces. This should allow this function to be robust with data gaps. It also tries to apply the tranformation to subsets of the channels, if this is possible, so that, if for example a vertical compontent is missing, horizontal components can still be rotated.
:param traces: list of traces in arbitrary order :param matrix: tranformation matrix :param in_channels: input channel names :param out_channels: output channel names :returns: list of transformed traces '''
# fallback to full matrix if some are not quadratic return _project3(traces, matrix, in_channels, out_channels)
else:
''' Figure out what dependencies project() would produce. '''
subpro.append((matrix, in_channels, out_channels))
a.nslc_id[:3] == b.nslc_id[:3] and abs(a.deltat-b.deltat) < a.deltat*0.001): continue
continue
logger.warning( 'Cannot project traces with displaced sampling ' '(%s, %s, %s, %s)' % a.nslc_id) continue
and a.nslc_id[:3] == b.nslc_id[:3] and b.nslc_id[:3] == c.nslc_id[:3] and abs(a.deltat-b.deltat) < a.deltat*0.001 and abs(b.deltat-c.deltat) < b.deltat*0.001):
continue
continue
or abs(bc.tmin - cc.tmin) > bc.deltat*0.01):
logger.warning( 'Cannot project traces with displaced sampling ' '(%s, %s, %s, %s)' % a.nslc_id) continue
matrix[0], (ac.get_ydata(), bc.get_ydata(), cc.get_ydata())) matrix[1], (ac.get_ydata(), bc.get_ydata(), cc.get_ydata())) matrix[2], (ac.get_ydata(), bc.get_ydata(), cc.get_ydata()))
''' Cross correlation of two traces.
:param a,b: input traces :param mode: ``'valid'``, ``'full'``, or ``'same'`` :param normalization: ``'normal'``, ``'gliding'``, or ``None`` :param use_fft: bool, whether to do cross correlation in spectral domain
:returns: trace containing cross correlation coefficients
This function computes the cross correlation between two traces. It evaluates the discrete equivalent of
.. math::
c(t) = \\int_{-\\infty}^{\\infty} a^{\\ast}(\\tau) b(t+\\tau) d\\tau
where the star denotes complex conjugate. Note, that the arguments here are swapped when compared with the :py:func:`numpy.correlate` function, which is internally called. This function should be safe even with older versions of NumPy, where the correlate function has some problems.
A trace containing the cross correlation coefficients is returned. The time information of the output trace is set so that the returned cross correlation can be viewed directly as a function of time lag.
Example::
# align two traces a and b containing a time shifted similar signal: c = pyrocko.trace.correlate(a,b) t, coef = c.max() # get time and value of maximum b.shift(-t) # align b with a
'''
# need reversed order here:
normfac = num.sqrt(num.sum(ya**2))*num.sqrt(num.sum(yb**2)) yc = yc/normfac
assert False, 'gliding normalization currently only available ' \ 'with "valid" mode.'
else:
moving_sum(ylong**2, yshort.size, mode='valid')) \ + normfac_short*epsilon
a, b, waterlevel, tshift=0., pad=0.5, fd_taper=None, pad_to_pow2=True):
same_sampling_rate(a, b) assert abs(a.tmin - b.tmin) < a.deltat * 0.001 deltat = a.deltat npad = int(round(a.data_len()*pad + tshift / deltat))
ndata = max(a.data_len(), b.data_len()) ndata_pad = ndata + npad
if pad_to_pow2: ntrans = nextpow2(ndata_pad) else: ntrans = ndata
aspec = num.fft.rfft(a.ydata, ntrans) bspec = num.fft.rfft(b.ydata, ntrans)
out = aspec * num.conj(bspec)
bautocorr = bspec*num.conj(bspec) denom = num.maximum(bautocorr, waterlevel * bautocorr.max())
out /= denom df = 1/(ntrans*deltat)
if fd_taper is not None: fd_taper(out, 0.0, df)
ydata = num.roll(num.fft.irfft(out), int(round(tshift/deltat))) c = a.copy(data=False) c.set_ydata(ydata[:ndata]) c.set_codes(*merge_codes(a, b, '/')) return c
'Sampling rates differ: %g != %g' % (a.deltat, b.deltat)
''' Check if two traces have the same sampling rate.
:param a,b: input traces :param eps: relative tolerance '''
''' Try to undo sampling rate rounding errors.
Fix rounding errors of sampling intervals when these are read from single precision floating point values.
Assumes that the true sampling rate or sampling interval was an integer value. No correction will be applied if this would change the sampling rate by more than 0.001%. '''
if deltat <= 1.0: deltat_new = 1.0 / round(1.0 / deltat) else: deltat_new = round(deltat)
if abs(deltat_new - deltat) / deltat > 1e-5: deltat_new = deltat
return deltat_new
''' Merge network-station-location-channel codes of a pair of traces. '''
else:
''' Base class for tapers.
Does nothing by default. '''
pass
''' Cosine Taper.
:param a: start of fading in :param b: end of fading in :param c: start of fading out :param d: end of fading out '''
''' Cosine Fader.
:param xfade: fade in and fade out time in seconds (optional) :param xfrac: fade in and fade out as fraction between 0. and 1. (optional)
Only one argument can be set. The other should to be ``None``. '''
xfade = xlen * self._xfrac
return None, None
return None else:
return None else:
''' Multiplication of several tapers. '''
tapers = []
spans = [] for taper in self.tapers: spans.append(taper.span(y, x0, dx))
mins, maxs = list(zip(*spans)) return min(mins), max(maxs)
''' Frequency domain Gaussian filter. '''
Taper.__init__(self, alpha=alpha) self._alpha = alpha
f = x0 + num.arange(y.size)*dx y *= num.exp(-num.pi**2 / (self._alpha**2) * f**2)
''' Evaluates frequency response at given frequencies. '''
''' Check if this is a flat response. '''
else:
''' Calls evalresp and generates values of the instrument response transfer function.
:param respfile: response file in evalresp format :param trace: trace for which the response is to be extracted from the file :param target: ``'dis'`` for displacement or ``'vel'`` for velocity '''
self, respfile, trace=None, target='dis', nslc_id=None, time=None):
nslc_id = trace.nslc_id time = (trace.tmin + trace.tmax) / 2.
self, respfile=respfile, nslc_id=nslc_id, instant=time, target=target)
sta_list=station, cha_list=channel, net_code=network, locid=location, instant=self.instant, freqs=freqs, units=self.target.upper(), file=self.respfile, rtype='CS')
''' Calls evalresp and generates values of the inverse instrument response for deconvolution of instrument response.
:param respfile: response file in evalresp format :param trace: trace for which the response is to be extracted from the file :param target: ``'dis'`` for displacement or ``'vel'`` for velocity '''
self, respfile=respfile, nslc_id=trace.nslc_id, instant=(trace.tmin + trace.tmax)/2., target=target)
network, station, location, channel = self.nslc_id x = evalresp.evalresp(sta_list=station, cha_list=channel, net_code=network, locid=location, instant=self.instant, freqs=freqs, units=self.target.upper(), file=self.respfile, rtype='CS')
transfer = x[0][4] return 1./transfer
''' Evaluates frequency response from pole-zero representation.
:param zeros: :py:class:`numpy.array` containing complex positions of zeros :param poles: :py:class:`numpy.array` containing complex positions of poles :param constant: gain as floating point number
::
(j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... T(f) = constant * ---------------------------------------------------- (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
The poles and zeros should be given as angular frequencies, not in Hz. '''
self, zeros=zeros, poles=poles, constant=constant)
return len(self.zeros) == 0 and len(self.poles) == 0
''' Butterworth frequency response.
:param corner: corner frequency of the response :param order: order of the response :param type: either ``high`` or ``low`` '''
int(self.order), float(self.corner), self.type, analog=True)
''' Interpolates frequency response given at a set of sampled frequencies.
:param frequencies,values: frequencies and values of the sampled response function. :param left,right: values to return when input is out of range. If set to ``None`` (the default) the endpoints are returned. '''
self, frequencies=asarray_1d(frequencies, float), values=asarray_1d(values, complex))
ereal = num.interp( freqs, self.frequencies, num.real(self.values), left=self.left, right=self.right) eimag = num.interp( freqs, self.frequencies, num.imag(self.values), left=self.left, right=self.right) transfer = ereal + 1.0j*eimag return transfer
''' Get inverse as a new :py:class:`SampledResponse` object. '''
def inv_or_none(x): if x is not None: return 1./x
return SampledResponse( self.frequencies, 1./self.values, left=inv_or_none(self.left), right=inv_or_none(self.right))
''' The integration response, optionally multiplied by a constant gain.
:param n: exponent (integer) :param gain: gain factor (float)
::
gain T(f) = -------------- (j*2*pi * f)^n '''
''' The differentiation response, optionally multiplied by a constant gain.
:param n: exponent (integer) :param gain: gain factor (float)
::
T(f) = gain * (j*2*pi * f)^n '''
''' Frequency response of an analog filter.
(see :py:func:`scipy.signal.freqs`). '''
return signal.freqs(self.b, self.a, freqs/(2.*num.pi))[1]
''' Multiplication of several :py:class:`FrequencyResponse` objects. '''
responses = []
return all(resp.is_scalar() for resp in self.responses)
else: raise ValueError('could not convert to 1D array')
cached_coefficients[ck] = signal.butter( order, corners[0], btype=btype) else: order, corners, btype=btype)
return (tr.network, tr.station, tr.location, tr.channel)
''' Check if NumPy's correlate function reveals old behaviour. '''
''' Call :py:func:`numpy.correlate` with fixes.
c[k] = sum_i a[i+k] * conj(b[i])
Note that the result produced by newer numpy.correlate is always flipped with respect to the formula given in its documentation (if ascending k assumed for the output). '''
else:
else:
b = num.conj(b)
return c[::-1] else:
''' Slow version of :py:func:`numpy.correlate` for comparison. '''
a[imin+k:imin+ilen+k] * num.conj(b[imin:imin+ilen]))
''' Get range of lags for which :py:func:`numpy.correlate` produces values. '''
int(not use_fft and a.size % 2 == 0 and b.size > a.size)
''' Compute biased estimate of the first autocorrelation coefficients.
:param x: input array :param nshifts: number of coefficients to calculate '''
''' Compute autoregression coefficients using Yule-Walker method.
:param x: input array :param order: number of coefficients to produce
A biased estimate of the autocorrelation is used. The Yule-Walker equations are solved by :py:func:`numpy.linalg.inv` instead of Levinson-Durbin recursion which is normally used. '''
return num.zeros(0, dtype=cx.dtype)
else: y[0:nn] = cx[0:nn] y[nn:n] = cx[nn-1] y[n:nn+n-1] = cx[nn-1] - cx[0:nn-1]
n1 = (n-1)//2 y = num.zeros(nn, dtype=cx.dtype) if n <= nn: y[0:n-n1] = cx[n1:n] y[n-n1:nn-n1] = cx[n:nn]-cx[0:nn-n] y[nn-n1:nn] = cx[nn-1] - cx[nn-n:nn-n+n1] else: y[0:max(0, nn-n1)] = cx[min(n1, nn):nn] y[max(nn-n1, 0):min(n-n1, nn)] = cx[nn-1] y[min(n-n1, nn):nn] = cx[nn-1] - cx[0:max(0, nn-(n-n1))]
- 0.5*num.cos((dx*num.arange(hi(a), hi(b))-(a-x0))/(b-a)*num.pi) + 0.5*num.cos((dx*num.arange(hi(c), hi(d))-(c-x0))/(d-c)*num.pi)
- 0.5*num.cos((deltaf*num.arange(hi(a), hi(b))-a)/(b-a)*num.pi) + 0.5*num.cos((deltaf*num.arange(hi(c), hi(d))-c)/(d-c)*num.pi)
''' Return the hilbert transform of x of length N.
(from scipy.signal, but changed to use fft and ifft from numpy.fft) '''
raise ValueError("N must be positive.") logger.warning('imaginary part of x ignored.') x = num.real(x) else:
h = h[:, num.newaxis]
''' Utility to store channel-specific state in coroutines. '''
and near(deltat, tr.deltat, deltat/10000.) and dtype == tr.ydata.dtype):
def co_list_append(list):
def co_lfilter(target, b, a): ''' Successively filter broken continuous trace data (coroutine).
Create coroutine which takes :py:class:`Trace` objects, filters their data through :py:func:`scipy.signal.lfilter` and sends new :py:class:`Trace` objects containing the filtered data to target. This is useful, if one wants to filter a long continuous time series, which is split into many successive traces without producing filter artifacts at trace boundaries.
Filter states are kept *per channel*, specifically, for each (network, station, location, channel) combination occuring in the input traces, a separate state is created and maintained. This makes it possible to filter multichannel or multistation data with only one :py:func:`co_lfilter` instance.
Filter state is reset, when gaps occur.
Use it like this::
from pyrocko.trace import co_lfilter, co_list_append
filtered_traces = [] pipe = co_lfilter(co_list_append(filtered_traces), a, b) for trace in traces: pipe.send(trace)
pipe.close()
'''
except ValueError: raise ScipyBug( 'signal.lfilter failed: could be related to a bug ' 'in some older scipy versions, e.g. on opensuse42.1')
def co_dropsamples(target, q, nfir): # for fir filter, the first nfir samples are pulluted by # boundary effects; cut it off. # for iir this may be (much) more, we do not correct for that. # put sample instances to a time which is a multiple of the # new sampling interval. (tr.tmin+(nfir+1)*tr.deltat) / newdeltat) * newdeltat \ - (nfir/2*tr.deltat) ioffset = ioffset % q
# because the fir kernel shifts data by nfir/2 samples:
''' Successively downsample broken continuous trace data (coroutine).
Create coroutine which takes :py:class:`Trace` objects, downsamples their data and sends new :py:class:`Trace` objects containing the downsampled data to target. This is useful, if one wants to downsample a long continuous time series, which is split into many successive traces without producing filter artifacts and gaps at trace boundaries.
Filter states are kept *per channel*, specifically, for each (network, station, location, channel) combination occuring in the input traces, a separate state is created and maintained. This makes it possible to filter multichannel or multistation data with only one :py:func:`co_lfilter` instance.
Filter state is reset, when gaps occur. The sampling instances are choosen so that they occur at (or as close as possible) to even multiples of the sampling interval of the downsampled trace (based on system time). '''
def co_downsample_to(target, deltat):
raise util.UnavailableDecimation('ratio = %g' % ratio)
'time_domain', 'frequency_domain', 'envelope', 'absolute', 'cc_max_norm']
''' Contains misfit setup to be used in :py:func:`trace.misfit`
:param description: Description of the setup :param norm: L-norm classifier :param taper: Object of :py:class:`Taper` :param filter: Object of :py:class:`FrequencyResponse` :param domain: ['time_domain', 'frequency_domain', 'envelope', 'absolute', 'cc_max_norm']
Can be dumped to a yaml file. '''
''' Equalize sampling rates of two traces (reduce higher sampling rate to lower).
:param trace_1: :py:class:`Trace` object :param trace_2: :py:class:`Trace` object
Returns a copy of the resampled trace if resampling is needed. '''
return trace_1, trace_2
% '.'.join(t1_out.nslc_id))
elif trace_1.deltat > trace_2.deltat: t2_out = trace_2.copy() t2_out.downsample_to(deltat=trace_1.deltat, snap=True) logger.debug('Trace downsampled (return copy of trace): %s' % '.'.join(t2_out.nslc_id)) return trace_1, t2_out
''' Calculate the misfit denominator *m* and the normalization devisor *n* according to norm.
The normalization divisor *n* is calculated from ``v``.
:param u: :py:class:`numpy.array` :param v: :py:class:`numpy.array` :param norm: (default = 2)
``u`` and ``v`` must be of same size. '''
num.sum(num.abs(v-u)), num.sum(num.abs(v)))
num.sqrt(num.sum((v-u)**2)), num.sqrt(num.sum(v**2)))
else: return ( num.power(num.sum(num.abs(num.power(v - u, norm))), 1./norm), num.power(num.sum(num.abs(num.power(v, norm))), 1./norm))
else:
return tr else:
return inp else:
return inp else:
if abs(t1.tmin-t2.tmin) > t1.deltat * 1e-4 or \ abs(t1.tmax - t2.tmax) > t1.deltat * 1e-4 or \ t1.ydata.shape != t2.ydata.shape: raise MisalignedTraces( 'Cannot calculate misfit of %s and %s due to misaligned ' 'traces.' % ('.'.join(t1.nslc_id), '.'.join(t2.nslc_id))) |