Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/tool/commands/jackseis.py: 23%
340 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Implementation of :app:`squirrel jackseis`.
8'''
10import re
11import sys
12import logging
13import os.path as op
15import numpy as num
17from pyrocko import io, trace, util
18from pyrocko.progress import progress
19from pyrocko.has_paths import Path, HasPaths
20from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
21 StringChoice, IntChoice, Defer, load_all, clone
22from pyrocko.squirrel.dataset import Dataset
23from pyrocko.squirrel.client.local import LocalData
24from pyrocko.squirrel.error import ToolError, SquirrelError
25from pyrocko.squirrel.model import CodesNSLCE, QuantityType
26from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
27 ChannelGrouping, SensorGrouping
28from pyrocko.squirrel.tool.common import ldq
30tts = util.time_to_str
32guts_prefix = 'jackseis'
33logger = logging.getLogger('psq.cli.jackseis')
36def dset(kwargs, keys, values):
37 for k, v in zip(keys, values):
38 kwargs[k] = v
41def make_task(*args):
42 return progress.task(*args, logger=logger)
45def parse_rename_rule_from_string(s):
46 s = s.strip()
47 if re.match(r'^([^:,]*:[^:,]*,?)+', s):
48 return dict(
49 x.split(':') for x in s.strip(',').split(','))
50 else:
51 return s
54class JackseisError(ToolError):
55 pass
58class Chain(object):
59 def __init__(self, node, parent=None):
60 self.node = node
61 self.parent = parent
63 def mcall(self, name, *args, **kwargs):
64 ret = []
65 if self.parent is not None:
66 ret.append(self.parent.mcall(name, *args, **kwargs))
68 ret.append(getattr(self.node, name)(*args, **kwargs))
69 return ret
71 def fcall(self, name, *args, **kwargs):
72 v = getattr(self.node, name)(*args, **kwargs)
73 if v is None and self.parent is not None:
74 return self.parent.fcall(name, *args, **kwargs)
75 else:
76 return v
78 def get(self, name):
79 v = getattr(self.node, name)
80 if v is None and self.parent is not None:
81 return self.parent.get(name)
82 else:
83 return v
85 def dget(self, name, k):
86 v = getattr(self.node, name).get(k, None)
87 if v is None and self.parent is not None:
88 return self.parent.dget(name, k)
89 else:
90 return v
93class OutputFormatChoice(StringChoice):
94 choices = io.allowed_formats('save')
97class OutputDataTypeChoice(StringChoice):
98 choices = ['int32', 'int64', 'float32', 'float64']
99 name_to_dtype = {
100 'int32': num.int32,
101 'int64': num.int64,
102 'float32': num.float32,
103 'float64': num.float64}
106class TraversalChoice(StringChoice):
107 choices = ['network', 'station', 'channel', 'sensor']
108 name_to_grouping = {
109 'network': NetworkGrouping(),
110 'station': StationGrouping(),
111 'sensor': SensorGrouping(),
112 'channel': ChannelGrouping()}
115class Converter(HasPaths):
117 in_dataset = Dataset.T(optional=True)
118 in_path = String.T(optional=True)
119 in_paths = List.T(String.T(optional=True))
121 codes = List.T(CodesNSLCE.T(), optional=True)
123 rename = Dict.T(
124 String.T(),
125 Choice.T([
126 String.T(),
127 Dict.T(String.T(), String.T())]))
128 tmin = Timestamp.T(optional=True)
129 tmax = Timestamp.T(optional=True)
130 tinc = Float.T(optional=True)
132 downsample = Float.T(optional=True)
134 quantity = QuantityType.T(optional=True)
135 fmin = Float.T(optional=True)
136 fmax = Float.T(optional=True)
137 fcut_factor = Float.T(optional=True)
138 fmin_cut = Float.T(optional=True)
139 fmax_cut = Float.T(optional=True)
141 out_path = Path.T(optional=True)
142 out_sds_path = Path.T(optional=True)
143 out_format = OutputFormatChoice.T(optional=True)
144 out_data_type = OutputDataTypeChoice.T(optional=True)
145 out_mseed_record_length = IntChoice.T(
146 optional=True,
147 choices=list(io.mseed.VALID_RECORD_LENGTHS))
148 out_mseed_steim = IntChoice.T(
149 optional=True,
150 choices=[1, 2])
151 out_meta_path = Path.T(optional=True)
153 traversal = TraversalChoice.T(optional=True)
155 parts = List.T(Defer('Converter.T'))
157 def get_effective_frequency_taper(self, chain):
158 fmin = chain.get('fmin')
159 fmax = chain.get('fmax')
161 if None in (fmin, fmax):
162 if fmin is not None:
163 raise JackseisError('Converter: fmax not specified.')
164 if fmax is not None:
165 raise JackseisError('Converter: fmin not specified.')
167 return None
169 fcut_factor = chain.get('fcut_factor') or 2.0
170 fmin_cut = chain.get('fmin_cut')
171 fmax_cut = chain.get('fmax_cut')
172 fmin_cut = fmin_cut if fmin_cut is not None else fmin / fcut_factor
173 fmax_cut = fmax_cut if fmax_cut is not None else fmax * fcut_factor
175 return fmin_cut, fmin, fmax, fmax_cut
177 @classmethod
178 def add_arguments(cls, p):
179 p.add_squirrel_query_arguments(without=['time'])
181 p.add_argument(
182 '--tinc',
183 dest='tinc',
184 type=float,
185 metavar='SECONDS',
186 help='Set time length of output files [s].')
188 p.add_argument(
189 '--downsample',
190 dest='downsample',
191 type=float,
192 metavar='RATE',
193 help='Downsample to RATE [Hz].')
195 p.add_argument(
196 '--quantity',
197 dest='quantity',
198 metavar='QUANTITY',
199 choices=QuantityType.choices,
200 help='''
201Restitute waveforms to selected ``QUANTITY``. Restitution is performed by
202multiplying the waveform spectra with a tapered inverse of the instrument
203response transfer function. The frequency band of the taper can be adjusted
204using the ``--band`` option. Choices: %s.
205'''.strip() % ldq(QuantityType.choices))
207 p.add_argument(
208 '--band',
209 metavar='FMIN,FMAX or FMIN,FMAX,CUTFACTOR or '
210 'FMINCUT,FMIN,FMAX,FMAXCUT',
211 help='''
212Frequency band used in restitution (see ``--quantity``) or for (acausal)
213filtering. Waveform spectra are multiplied with a taper with cosine-shaped
214flanks and which is flat between ``FMIN`` and ``FMAX``. The flanks of the taper
215drop to zero at ``FMINCUT`` and ``FMAXCUT``. If ``CUTFACTOR`` is given,
216``FMINCUT`` and ``FMAXCUT`` are set to ``FMIN/CUTFACTOR`` and
217``FMAX*CUTFACTOR`` respectively. ``CUTFACTOR`` defaults to 2.
218'''.strip())
220 p.add_argument(
221 '--out-path',
222 dest='out_path',
223 metavar='TEMPLATE',
224 help='''
225Set output path to ``TEMPLATE``. Available placeholders are ``%%n``: network,
226``%%s``: station, ``%%l``: location, ``%%c``: channel, ``%%b``: begin time,
227``%%e``: end time, ``%%j``: julian day of year. The following additional
228placeholders use the current processing window's begin and end times rather
229than trace begin and end times (to suppress producing many small files for
230gappy traces), ``%%(wmin_year)s``, ``%%(wmin_month)s``, ``%%(wmin_day)s``,
231``%%(wmin)s``, ``%%(wmin_jday)s``, ``%%(wmax_year)s``, ``%%(wmax_month)s``,
232``%%(wmax_day)s``, ``%%(wmax)s``, ``%%(wmax_jday)s``. Example: ``--out-path
233'data/%%s/trace-%%s-%%c.mseed'``
234'''.strip())
236 p.add_argument(
237 '--out-sds-path',
238 dest='out_sds_path',
239 metavar='PATH',
240 help='''
241Set output path to create an SDS archive
242(https://www.seiscomp.de/seiscomp3/doc/applications/slarchive/SDS.html), rooted
243at PATH. Implies ``--tinc 86400``. Example: ``--out-sds-path data/sds``
244'''.strip())
246 p.add_argument(
247 '--out-format',
248 dest='out_format',
249 choices=io.allowed_formats('save'),
250 metavar='FORMAT',
251 help='Set output file format. Choices: %s' % io.allowed_formats(
252 'save', 'cli_help', 'mseed'))
254 p.add_argument(
255 '--out-data-type',
256 dest='out_data_type',
257 choices=OutputDataTypeChoice.choices,
258 metavar='DTYPE',
259 help='Set numerical data type. Choices: %s. The output file '
260 'format must support the given type. By default, the data '
261 'type is kept unchanged.' % ldq(
262 OutputDataTypeChoice.choices))
264 p.add_argument(
265 '--out-mseed-record-length',
266 dest='out_mseed_record_length',
267 type=int,
268 choices=io.mseed.VALID_RECORD_LENGTHS,
269 metavar='INT',
270 help='Set the Mini-SEED record length in bytes. Choices: %s. '
271 'Default is 4096 bytes, which is commonly used for archiving.'
272 % ldq(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))
274 p.add_argument(
275 '--out-mseed-steim',
276 dest='out_mseed_steim',
277 type=int,
278 choices=(1, 2),
279 metavar='INT',
280 help='Set the Mini-SEED STEIM compression. Choices: ``1`` or '
281 '``2``. Default is STEIM-2. Note: STEIM-2 is limited to 30 '
282 'bit dynamic range.')
284 p.add_argument(
285 '--out-meta-path',
286 dest='out_meta_path',
287 metavar='PATH',
288 help='Set output path for station metadata (StationXML) export.')
290 p.add_argument(
291 '--traversal',
292 dest='traversal',
293 metavar='GROUPING',
294 choices=TraversalChoice.choices,
295 help='By default the outermost processing loop is over time. '
296 'Add outer loop with given GROUPING. Choices: %s'
297 % ldq(TraversalChoice.choices))
299 p.add_argument(
300 '--rename-network',
301 dest='rename_network',
302 metavar='REPLACEMENT',
303 help="""
304Replace network code. REPLACEMENT can be a string for direct replacement, a
305mapping for selective replacement, or a regular expression for tricky
306replacements. Examples: Direct replacement: ```XX``` - set all network codes to
307```XX```. Mapping: ```AA:XX,BB:YY``` - replace ```AA``` with ```XX``` and
308```BB``` with ```YY```. Regular expression: ```/A(\\d)/X\\1/``` - replace
309```A1``` with ```X1``` and ```A2``` with ```X2``` etc.
310""".strip())
312 p.add_argument(
313 '--rename-station',
314 dest='rename_station',
315 metavar='REPLACEMENT',
316 help='Replace station code. See ``--rename-network``.')
318 p.add_argument(
319 '--rename-location',
320 dest='rename_location',
321 metavar='REPLACEMENT',
322 help='Replace location code. See ``--rename-network``.')
324 p.add_argument(
325 '--rename-channel',
326 dest='rename_channel',
327 metavar='REPLACEMENT',
328 help='Replace channel code. See ``--rename-network``.')
330 p.add_argument(
331 '--rename-extra',
332 dest='rename_extra',
333 metavar='REPLACEMENT',
334 help='Replace extra code. See ``--rename-network``. Note: the '
335 '```extra``` code is not available in Mini-SEED.')
337 @classmethod
338 def from_arguments(cls, args):
339 kwargs = args.squirrel_query
341 rename = {}
342 for (k, v) in [
343 ('network', args.rename_network),
344 ('station', args.rename_station),
345 ('location', args.rename_location),
346 ('channel', args.rename_channel),
347 ('extra', args.rename_extra)]:
349 if v is not None:
350 rename[k] = parse_rename_rule_from_string(v)
352 if args.band:
353 try:
354 values = list(map(float, args.band.split(',')))
355 if len(values) not in (2, 3, 4):
356 raise ValueError()
358 if len(values) == 2:
359 dset(kwargs, 'fmin fmax'.split(), values)
360 elif len(values) == 3:
361 dset(kwargs, 'fmin fmax fcut_factor'.split(), values)
362 elif len(values) == 4:
363 dset(kwargs, 'fmin_cut fmin fmax fmax_cut'.split(), values)
365 except ValueError:
366 raise JackseisError(
367 'Invalid argument to --band: %s' % args.band) from None
369 obj = cls(
370 downsample=args.downsample,
371 quantity=args.quantity,
372 out_format=args.out_format,
373 out_path=args.out_path,
374 tinc=args.tinc,
375 out_sds_path=args.out_sds_path,
376 out_data_type=args.out_data_type,
377 out_mseed_record_length=args.out_mseed_record_length,
378 out_mseed_steim=args.out_mseed_steim,
379 out_meta_path=args.out_meta_path,
380 traversal=args.traversal,
381 rename=rename,
382 **kwargs)
384 obj.validate()
385 return obj
387 def add_dataset(self, sq):
388 if self.in_dataset is not None:
389 sq.add_dataset(self.in_dataset)
391 if self.in_path is not None:
392 ds = Dataset(sources=[LocalData(paths=[self.in_path])])
393 ds.set_basepath_from(self)
394 sq.add_dataset(ds)
396 if self.in_paths:
397 ds = Dataset(sources=[LocalData(paths=self.in_paths)])
398 ds.set_basepath_from(self)
399 sq.add_dataset(ds)
401 def get_effective_rename_rules(self, chain):
402 d = {}
403 for k in ['network', 'station', 'location', 'channel']:
404 v = chain.dget('rename', k)
405 if isinstance(v, str):
406 m = re.match(r'/([^/]+)/([^/]*)/', v)
407 if m:
408 try:
409 v = (re.compile(m.group(1)), m.group(2))
410 except Exception:
411 raise JackseisError(
412 'Invalid replacement pattern: /%s/' % m.group(1))
414 d[k] = v
416 return d
418 def get_effective_out_path(self):
419 nset = sum(x is not None for x in (
420 self.out_path,
421 self.out_sds_path))
423 if nset > 1:
424 raise JackseisError(
425 'More than one out of [out_path, out_sds_path] set.')
427 is_sds = False
428 if self.out_path:
429 out_path = self.out_path
431 elif self.out_sds_path:
432 out_path = op.join(
433 self.out_sds_path,
434 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
435 '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
436 '.%(wmin_year)s.%(wmin_jday)s')
437 is_sds = True
438 else:
439 out_path = None
441 if out_path is not None:
442 return self.expand_path(out_path), is_sds
443 else:
444 return None
446 def get_effective_out_meta_path(self):
447 if self.out_meta_path is not None:
448 return self.expand_path(self.out_meta_path)
449 else:
450 return None
452 def do_rename(self, rules, tr):
453 rename = {}
454 for k in ['network', 'station', 'location', 'channel']:
455 v = rules.get(k, None)
456 if isinstance(v, str):
457 rename[k] = v
458 elif isinstance(v, dict):
459 try:
460 oldval = getattr(tr, k)
461 rename[k] = v[oldval]
462 except KeyError:
463 raise ToolError(
464 'No mapping defined for %s code "%s".' % (k, oldval))
466 elif isinstance(v, tuple):
467 pat, repl = v
468 oldval = getattr(tr, k)
469 newval, n = pat.subn(repl, oldval)
470 if n:
471 rename[k] = newval
473 tr.set_codes(**rename)
475 def convert(self, args, chain=None):
476 if chain is None:
477 defaults = clone(g_defaults)
478 defaults.set_basepath_from(self)
479 chain = Chain(defaults)
481 chain = Chain(self, chain)
483 if self.parts:
484 task = make_task('Jackseis parts')
485 for part in task(self.parts):
486 part.convert(args, chain)
488 del task
490 else:
491 sq = args.make_squirrel()
493 cli_overrides = Converter.from_arguments(args)
494 cli_overrides.set_basepath('.')
496 chain = Chain(cli_overrides, chain)
498 chain.mcall('add_dataset', sq)
500 tmin = chain.get('tmin')
501 tmax = chain.get('tmax')
502 tinc = chain.get('tinc')
503 codes = chain.get('codes')
504 downsample = chain.get('downsample')
505 out_path, is_sds = chain.fcall('get_effective_out_path') \
506 or (None, False)
508 if is_sds and tinc != 86400.:
509 logger.warning('Setting time window to 1 day to generate SDS.')
510 tinc = 86400.0
512 out_format = chain.get('out_format')
513 out_data_type = chain.get('out_data_type')
515 out_meta_path = chain.fcall('get_effective_out_meta_path')
517 if out_meta_path is not None:
518 sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
519 util.ensuredirs(out_meta_path)
520 sx.dump_xml(filename=out_meta_path)
521 if out_path is None:
522 return
524 target_deltat = None
525 if downsample is not None:
526 target_deltat = 1.0 / float(downsample)
528 save_kwargs = {}
529 if out_format == 'mseed':
530 save_kwargs['record_length'] = chain.get(
531 'out_mseed_record_length')
532 save_kwargs['steim'] = chain.get(
533 'out_mseed_steim')
535 traversal = chain.get('traversal')
536 if traversal is not None:
537 grouping = TraversalChoice.name_to_grouping[traversal]
538 else:
539 grouping = None
541 frequency_taper = self.get_effective_frequency_taper(chain)
543 if frequency_taper is not None:
544 if frequency_taper[0] != 0.0:
545 frequency_taper_tpad = 1.0 / frequency_taper[0]
546 else:
547 if frequency_taper[1] == 0.0:
548 raise JackseisError(
549 'Converter: fmin must be greater than zero.')
551 frequency_taper_tpad = 2.0 / frequency_taper[1]
552 else:
553 frequency_taper_tpad = 0.0
555 quantity = chain.get('quantity')
557 do_transfer = \
558 quantity is not None or frequency_taper is not None
560 tpad = 0.0
561 if target_deltat is not None:
562 tpad += target_deltat * 50.
564 if do_transfer:
565 tpad += frequency_taper_tpad
567 task = None
568 rename_rules = self.get_effective_rename_rules(chain)
569 for batch in sq.chopper_waveforms(
570 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
571 codes=codes,
572 snap_window=True,
573 grouping=grouping):
575 if task is None:
576 task = make_task(
577 'Jackseis blocks', batch.n * batch.ngroups)
579 tlabel = '%s%s - %s' % (
580 'groups %i / %i: ' % (batch.igroup, batch.ngroups)
581 if batch.ngroups > 1 else '',
582 util.time_to_str(batch.tmin),
583 util.time_to_str(batch.tmax))
585 task.update(batch.i + batch.igroup * batch.n, tlabel)
587 twmin = batch.tmin
588 twmax = batch.tmax
590 traces = batch.traces
592 if target_deltat is not None:
593 downsampled_traces = []
594 for tr in traces:
595 try:
596 tr.downsample_to(
597 target_deltat, snap=True, demean=False,
598 allow_upsample_max=4)
600 downsampled_traces.append(tr)
602 except (trace.TraceTooShort, trace.NoData) as e:
603 logger.warning(str(e))
605 traces = downsampled_traces
607 if do_transfer:
608 restituted_traces = []
609 for tr in traces:
610 try:
611 if quantity is not None:
612 resp = sq.get_response(tr).get_effective(
613 input_quantity=quantity)
614 else:
615 resp = None
617 restituted_traces.append(tr.transfer(
618 frequency_taper_tpad,
619 frequency_taper,
620 transfer_function=resp,
621 invert=True))
623 except (trace.NoData, trace.TraceTooShort,
624 SquirrelError) as e:
625 logger.warning(str(e))
627 traces = restituted_traces
629 for tr in traces:
630 self.do_rename(rename_rules, tr)
632 if out_data_type:
633 for tr in traces:
634 tr.ydata = tr.ydata.astype(
635 OutputDataTypeChoice.name_to_dtype[out_data_type])
637 chopped_traces = []
638 for tr in traces:
639 try:
640 otr = tr.chop(twmin, twmax, inplace=False)
641 chopped_traces.append(otr)
642 except trace.NoData:
643 pass
645 traces = chopped_traces
647 if out_path is not None:
648 try:
649 io.save(
650 traces, out_path,
651 format=out_format,
652 overwrite=args.force,
653 additional=dict(
654 wmin_year=tts(twmin, format='%Y'),
655 wmin_month=tts(twmin, format='%m'),
656 wmin_day=tts(twmin, format='%d'),
657 wmin_jday=tts(twmin, format='%j'),
658 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
659 wmax_year=tts(twmax, format='%Y'),
660 wmax_month=tts(twmax, format='%m'),
661 wmax_day=tts(twmax, format='%d'),
662 wmax_jday=tts(twmax, format='%j'),
663 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
664 **save_kwargs)
666 except io.FileSaveError as e:
667 raise JackseisError(str(e))
669 else:
670 for tr in traces:
671 print(tr.summary_stats)
673 if task:
674 task.done()
677g_defaults = Converter(
678 out_mseed_record_length=4096,
679 out_format='mseed',
680 out_mseed_steim=2)
683headline = 'Convert waveform archive data.'
686def make_subparser(subparsers):
687 return subparsers.add_parser(
688 'jackseis',
689 help=headline,
690 description=headline)
693def setup(parser):
694 parser.add_squirrel_selection_arguments()
696 parser.add_argument(
697 '--config',
698 dest='config_path',
699 metavar='NAME',
700 help='File containing `jackseis.Converter` settings.')
702 parser.add_argument(
703 '--dump-config',
704 dest='dump_config',
705 action='store_true',
706 default=False,
707 help='''
708Print configuration file snippet representing given command line arguments to
709standard output and exit. Only command line options affecting the conversion
710are included in the dump. Additional ``--config`` settings, data collection and
711data query options are ignored.
712'''.strip())
714 parser.add_argument(
715 '--force',
716 dest='force',
717 action='store_true',
718 default=False,
719 help='Force overwriting of existing files.')
721 Converter.add_arguments(parser)
724def run(parser, args):
725 if args.dump_config:
726 converter = Converter.from_arguments(args)
727 print(converter)
728 sys.exit(0)
730 if args.config_path:
731 try:
732 converters = load_all(filename=args.config_path)
733 except Exception as e:
734 raise ToolError(str(e))
736 for converter in converters:
737 if not isinstance(converter, Converter):
738 raise ToolError(
739 'Config file should only contain '
740 '`jackseis.Converter` objects.')
742 converter.set_basepath(op.dirname(args.config_path))
744 else:
745 converter = Converter()
746 converter.set_basepath('.')
747 converters = [converter]
749 with progress.view():
750 task = make_task('Jackseis jobs')
751 for converter in task(converters):
752 converter.convert(args)