1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import re
7import sys
8import logging
9import os.path as op
11import numpy as num
13from pyrocko import io, trace, util
14from pyrocko.progress import progress
15from pyrocko.has_paths import Path, HasPaths
16from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
17 StringChoice, IntChoice, Defer, load_all, clone
18from pyrocko.squirrel.dataset import Dataset
19from pyrocko.squirrel.client.local import LocalData
20from pyrocko.squirrel.error import ToolError, SquirrelError
21from pyrocko.squirrel.model import CodesNSLCE, QuantityType
22from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
23 ChannelGrouping, SensorGrouping
24from pyrocko.squirrel.tool.common import ldq
# Shorthand for the time formatter used when filling output path templates.
tts = util.time_to_str

# Prefix under which guts-serializable classes of this module are registered.
guts_prefix = 'jackseis'

logger = logging.getLogger('psq.cli.jackseis')
def dset(kwargs, keys, values):
    '''
    Assign ``values`` to ``keys`` in the dict ``kwargs``, pairwise.

    Extra elements in the longer of the two sequences are ignored.
    '''
    kwargs.update(zip(keys, values))
def make_task(*args):
    '''
    Create a progress task bound to this module's logger.
    '''
    return progress.task(*args, logger=logger)
def parse_rename_rule_from_string(s):
    '''
    Parse a codes replacement rule given as a string.

    A string of the form ``'OLD1:NEW1,OLD2:NEW2,...'`` is parsed into a
    mapping ``{'OLD1': 'NEW1', ...}``. Any other string (e.g. a direct
    replacement value or a ``'/pattern/replacement/'`` regex rule) is
    returned unchanged.

    :param s: replacement rule as given on the command line.
    :returns: ``dict`` for mapping rules, ``str`` otherwise.
    '''
    s = s.strip()
    # Use fullmatch (not match) so that strings which merely *start* like a
    # mapping, e.g. 'a:b:c', are not half-parsed and then crash dict() with
    # an uncaught ValueError; they fall through as plain strings instead.
    if re.fullmatch(r'[^:,]*:[^:,]*(,[^:,]*:[^:,]*)*', s.strip(',')):
        return dict(
            x.split(':') for x in s.strip(',').split(','))
    else:
        return s
class JackseisError(ToolError):
    '''
    Raised for configuration and conversion errors in the jackseis command.
    '''
    pass
class Chain(object):
    '''
    Linked chain of objects supporting cascaded value lookup.

    Lookups start at :py:attr:`node` and fall back to :py:attr:`parent`
    (and its ancestors) whenever the locally found value is ``None``.
    '''

    def __init__(self, node, parent=None):
        self.node = node
        self.parent = parent

    def mcall(self, name, *args, **kwargs):
        '''
        Call method ``name`` on every chain member, ancestors first.

        Results are nested: each parent's result list becomes an element
        of its child's result list.
        '''
        results = []
        if self.parent is not None:
            results.append(self.parent.mcall(name, *args, **kwargs))

        results.append(getattr(self.node, name)(*args, **kwargs))
        return results

    def fcall(self, name, *args, **kwargs):
        '''
        Call method ``name`` on the node; fall back to ancestors while the
        result is ``None``.
        '''
        result = getattr(self.node, name)(*args, **kwargs)
        if result is not None or self.parent is None:
            return result

        return self.parent.fcall(name, *args, **kwargs)

    def get(self, name):
        '''
        Get attribute ``name``, walking up the chain while it is ``None``.
        '''
        link = self
        while link is not None:
            value = getattr(link.node, name)
            if value is not None:
                return value

            link = link.parent

        return None

    def dget(self, name, k):
        '''
        Look up key ``k`` in dict attribute ``name``, walking up the chain
        while the key is missing or maps to ``None``.
        '''
        link = self
        while link is not None:
            value = getattr(link.node, name).get(k, None)
            if value is not None:
                return value

            link = link.parent

        return None
class OutputFormatChoice(StringChoice):
    '''
    Valid output file format names (anything Pyrocko's io layer can save).
    '''
    choices = io.allowed_formats('save')
class OutputDataTypeChoice(StringChoice):
    '''
    Valid numerical sample data types for output files.
    '''
    choices = ['int32', 'int64', 'float32', 'float64']

    # Map each choice name to the NumPy scalar type of the same name.
    name_to_dtype = {name: getattr(num, name) for name in choices}
class TraversalChoice(StringChoice):
    '''
    Valid grouping names for the optional outer processing loop.
    '''
    choices = ['network', 'station', 'channel', 'sensor']

    # Map each grouping name to its squirrel operator instance.
    name_to_grouping = {
        'network': NetworkGrouping(),
        'station': StationGrouping(),
        'channel': ChannelGrouping(),
        'sensor': SensorGrouping()}
class Converter(HasPaths):
    '''
    Waveform conversion rule set for the jackseis command.

    Converters can be nested through :py:attr:`parts`; values left unset
    (``None``) on a nested converter are inherited from its ancestors via
    :py:class:`Chain` lookups during :py:meth:`convert`.
    '''

    # Input sources: a squirrel dataset definition, a single path, or a
    # list of paths.
    in_dataset = Dataset.T(optional=True)
    in_path = String.T(optional=True)
    in_paths = List.T(String.T(optional=True))

    # Restrict processing to matching codes.
    codes = List.T(CodesNSLCE.T(), optional=True)

    # Per-component rename rules: plain replacement string, old->new
    # mapping, or '/pattern/replacement/' regex rule (kept as string here,
    # compiled in get_effective_rename_rules).
    rename = Dict.T(
        String.T(),
        Choice.T([
            String.T(),
            Dict.T(String.T(), String.T())]))
    tmin = Timestamp.T(optional=True)   # processing time span start
    tmax = Timestamp.T(optional=True)   # processing time span end
    tinc = Float.T(optional=True)       # output file time length [s]

    downsample = Float.T(optional=True)  # target sampling rate [Hz]

    quantity = QuantityType.T(optional=True)  # restitution target quantity
    # Frequency taper corners: flat between fmin and fmax, dropping to zero
    # at fmin_cut and fmax_cut (defaults derived via fcut_factor).
    fmin = Float.T(optional=True)
    fmax = Float.T(optional=True)
    fcut_factor = Float.T(optional=True)
    fmin_cut = Float.T(optional=True)
    fmax_cut = Float.T(optional=True)

    # Output destination: path template or SDS archive root (exclusive).
    out_path = Path.T(optional=True)
    out_sds_path = Path.T(optional=True)
    out_format = OutputFormatChoice.T(optional=True)
    out_data_type = OutputDataTypeChoice.T(optional=True)
    out_mseed_record_length = IntChoice.T(
        optional=True,
        choices=list(io.mseed.VALID_RECORD_LENGTHS))
    out_mseed_steim = IntChoice.T(
        optional=True,
        choices=[1, 2])
    out_meta_path = Path.T(optional=True)  # StationXML export path

    traversal = TraversalChoice.T(optional=True)  # optional outer grouping

    # Nested sub-converters inheriting unset values from this one.
    parts = List.T(Defer('Converter.T'))

    def get_effective_frequency_taper(self, chain):
        '''
        Resolve the frequency taper ``(fmin_cut, fmin, fmax, fmax_cut)``.

        :returns: 4-tuple of corner frequencies, or ``None`` when no band
            is configured anywhere in ``chain``.
        :raises JackseisError: if only one of ``fmin``/``fmax`` is set.
        '''
        fmin = chain.get('fmin')
        fmax = chain.get('fmax')

        if None in (fmin, fmax):
            if fmin is not None:
                raise JackseisError('Converter: fmax not specified.')
            if fmax is not None:
                raise JackseisError('Converter: fmin not specified.')

            return None

        # Unset cut frequencies default to fmin/factor and fmax*factor,
        # with the factor itself defaulting to 2.
        fcut_factor = chain.get('fcut_factor') or 2.0
        fmin_cut = chain.get('fmin_cut')
        fmax_cut = chain.get('fmax_cut')
        fmin_cut = fmin_cut if fmin_cut is not None else fmin / fcut_factor
        fmax_cut = fmax_cut if fmax_cut is not None else fmax * fcut_factor

        return fmin_cut, fmin, fmax, fmax_cut

    @classmethod
    def add_arguments(cls, p):
        '''
        Add jackseis conversion options to argument parser ``p``.
        '''
        p.add_squirrel_query_arguments(without=['time'])

        p.add_argument(
            '--tinc',
            dest='tinc',
            type=float,
            metavar='SECONDS',
            help='Set time length of output files [s].')

        p.add_argument(
            '--downsample',
            dest='downsample',
            type=float,
            metavar='RATE',
            help='Downsample to RATE [Hz].')

        p.add_argument(
            '--quantity',
            dest='quantity',
            metavar='QUANTITY',
            choices=QuantityType.choices,
            help='''
Restitute waveforms to selected ``QUANTITY``. Restitution is performed by
multiplying the waveform spectra with a tapered inverse of the instrument
response transfer function. The frequency band of the taper can be adjusted
using the ``--band`` option. Choices: %s.
'''.strip() % ldq(QuantityType.choices))

        p.add_argument(
            '--band',
            metavar='FMIN,FMAX or FMIN,FMAX,CUTFACTOR or '
                    'FMINCUT,FMIN,FMAX,FMAXCUT',
            help='''
Frequency band used in restitution (see ``--quantity``) or for (acausal)
filtering. Waveform spectra are multiplied with a taper with cosine-shaped
flanks and which is flat between ``FMIN`` and ``FMAX``. The flanks of the taper
drop to zero at ``FMINCUT`` and ``FMAXCUT``. If ``CUTFACTOR`` is given,
``FMINCUT`` and ``FMAXCUT`` are set to ``FMIN/CUTFACTOR`` and
``FMAX*CUTFACTOR`` respectively. ``CUTFACTOR`` defaults to 2.
'''.strip())

        p.add_argument(
            '--out-path',
            dest='out_path',
            metavar='TEMPLATE',
            help='''
Set output path to ``TEMPLATE``. Available placeholders are ``%%n``: network,
``%%s``: station, ``%%l``: location, ``%%c``: channel, ``%%b``: begin time,
``%%e``: end time, ``%%j``: julian day of year. The following additional
placeholders use the current processing window's begin and end times rather
than trace begin and end times (to suppress producing many small files for
gappy traces), ``%%(wmin_year)s``, ``%%(wmin_month)s``, ``%%(wmin_day)s``,
``%%(wmin)s``, ``%%(wmin_jday)s``, ``%%(wmax_year)s``, ``%%(wmax_month)s``,
``%%(wmax_day)s``, ``%%(wmax)s``, ``%%(wmax_jday)s``. Example: ``--out-path
'data/%%s/trace-%%s-%%c.mseed'``
'''.strip())

        p.add_argument(
            '--out-sds-path',
            dest='out_sds_path',
            metavar='PATH',
            help='''
Set output path to create an SDS archive
(https://www.seiscomp.de/seiscomp3/doc/applications/slarchive/SDS.html), rooted
at PATH. Implies ``--tinc 86400``. Example: ``--out-sds-path data/sds``
'''.strip())

        p.add_argument(
            '--out-format',
            dest='out_format',
            choices=io.allowed_formats('save'),
            metavar='FORMAT',
            help='Set output file format. Choices: %s' % io.allowed_formats(
                'save', 'cli_help', 'mseed'))

        p.add_argument(
            '--out-data-type',
            dest='out_data_type',
            choices=OutputDataTypeChoice.choices,
            metavar='DTYPE',
            help='Set numerical data type. Choices: %s. The output file '
                 'format must support the given type. By default, the data '
                 'type is kept unchanged.' % ldq(
                    OutputDataTypeChoice.choices))

        p.add_argument(
            '--out-mseed-record-length',
            dest='out_mseed_record_length',
            type=int,
            choices=io.mseed.VALID_RECORD_LENGTHS,
            metavar='INT',
            help='Set the Mini-SEED record length in bytes. Choices: %s. '
                 'Default is 4096 bytes, which is commonly used for archiving.'
                 % ldq(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))

        p.add_argument(
            '--out-mseed-steim',
            dest='out_mseed_steim',
            type=int,
            choices=(1, 2),
            metavar='INT',
            help='Set the Mini-SEED STEIM compression. Choices: ``1`` or '
                 '``2``. Default is STEIM-2. Note: STEIM-2 is limited to 30 '
                 'bit dynamic range.')

        p.add_argument(
            '--out-meta-path',
            dest='out_meta_path',
            metavar='PATH',
            help='Set output path for station metadata (StationXML) export.')

        p.add_argument(
            '--traversal',
            dest='traversal',
            metavar='GROUPING',
            choices=TraversalChoice.choices,
            help='By default the outermost processing loop is over time. '
                 'Add outer loop with given GROUPING. Choices: %s'
                 % ldq(TraversalChoice.choices))

        p.add_argument(
            '--rename-network',
            dest='rename_network',
            metavar='REPLACEMENT',
            help="""
Replace network code. REPLACEMENT can be a string for direct replacement, a
mapping for selective replacement, or a regular expression for tricky
replacements. Examples: Direct replacement: ```XX``` - set all network codes to
```XX```. Mapping: ```AA:XX,BB:YY``` - replace ```AA``` with ```XX``` and
```BB``` with ```YY```. Regular expression: ```/A(\\d)/X\\1/``` - replace
```A1``` with ```X1``` and ```A2``` with ```X2``` etc.
""".strip())

        p.add_argument(
            '--rename-station',
            dest='rename_station',
            metavar='REPLACEMENT',
            help='Replace station code. See ``--rename-network``.')

        p.add_argument(
            '--rename-location',
            dest='rename_location',
            metavar='REPLACEMENT',
            help='Replace location code. See ``--rename-network``.')

        p.add_argument(
            '--rename-channel',
            dest='rename_channel',
            metavar='REPLACEMENT',
            help='Replace channel code. See ``--rename-network``.')

        p.add_argument(
            '--rename-extra',
            dest='rename_extra',
            metavar='REPLACEMENT',
            help='Replace extra code. See ``--rename-network``. Note: the '
                 '```extra``` code is not available in Mini-SEED.')

    @classmethod
    def from_arguments(cls, args):
        '''
        Build a :py:class:`Converter` from parsed command line ``args``.

        :raises JackseisError: on malformed ``--band`` values.
        '''
        kwargs = args.squirrel_query

        rename = {}
        for (k, v) in [
                ('network', args.rename_network),
                ('station', args.rename_station),
                ('location', args.rename_location),
                ('channel', args.rename_channel),
                ('extra', args.rename_extra)]:

            if v is not None:
                rename[k] = parse_rename_rule_from_string(v)

        if args.band:
            try:
                values = list(map(float, args.band.split(',')))
                if len(values) not in (2, 3, 4):
                    raise ValueError()

                # 2 values: flat band, 3 values: band plus cut factor,
                # 4 values: all four taper corners explicitly.
                if len(values) == 2:
                    dset(kwargs, 'fmin fmax'.split(), values)
                elif len(values) == 3:
                    dset(kwargs, 'fmin fmax fcut_factor'.split(), values)
                elif len(values) == 4:
                    dset(kwargs, 'fmin_cut fmin fmax fmax_cut'.split(), values)

            except ValueError:
                raise JackseisError(
                    'Invalid argument to --band: %s' % args.band) from None

        obj = cls(
            downsample=args.downsample,
            quantity=args.quantity,
            out_format=args.out_format,
            out_path=args.out_path,
            tinc=args.tinc,
            out_sds_path=args.out_sds_path,
            out_data_type=args.out_data_type,
            out_mseed_record_length=args.out_mseed_record_length,
            out_mseed_steim=args.out_mseed_steim,
            out_meta_path=args.out_meta_path,
            traversal=args.traversal,
            rename=rename,
            **kwargs)

        obj.validate()
        return obj

    def add_dataset(self, sq):
        '''
        Register this converter's input sources with squirrel ``sq``.
        '''
        if self.in_dataset is not None:
            sq.add_dataset(self.in_dataset)

        if self.in_path is not None:
            ds = Dataset(sources=[LocalData(paths=[self.in_path])])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

        if self.in_paths:
            ds = Dataset(sources=[LocalData(paths=self.in_paths)])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

    def get_effective_rename_rules(self, chain):
        '''
        Resolve rename rules, compiling ``'/pattern/replacement/'`` strings
        into ``(compiled_regex, replacement)`` tuples.

        NOTE(review): only network/station/location/channel are handled
        here, so an 'extra' rule parsed from ``--rename-extra`` is never
        applied — confirm whether this is intended.

        :raises JackseisError: if a regex pattern fails to compile.
        '''
        d = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = chain.dget('rename', k)
            if isinstance(v, str):
                m = re.match(r'/([^/]+)/([^/]*)/', v)
                if m:
                    try:
                        v = (re.compile(m.group(1)), m.group(2))
                    except Exception:
                        raise JackseisError(
                            'Invalid replacement pattern: /%s/' % m.group(1))

            d[k] = v

        return d

    def get_effective_out_path(self):
        '''
        Get the output path template and whether it targets an SDS archive.

        :returns: ``(path, is_sds)``, or ``None`` if no output path is
            configured on this converter.
        :raises JackseisError: if both ``out_path`` and ``out_sds_path``
            are set.
        '''
        nset = sum(x is not None for x in (
            self.out_path,
            self.out_sds_path))

        if nset > 1:
            raise JackseisError(
                'More than one out of [out_path, out_sds_path] set.')

        is_sds = False
        if self.out_path:
            out_path = self.out_path

        elif self.out_sds_path:
            # Standard SDS directory and file name layout, one file per
            # station-channel-day.
            out_path = op.join(
                self.out_sds_path,
                '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
                '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
                '.%(wmin_year)s.%(wmin_jday)s')
            is_sds = True
        else:
            out_path = None

        if out_path is not None:
            return self.expand_path(out_path), is_sds
        else:
            return None

    def get_effective_out_meta_path(self):
        '''
        Get the expanded StationXML output path, or ``None`` if unset.
        '''
        if self.out_meta_path is not None:
            return self.expand_path(self.out_meta_path)
        else:
            return None

    def do_rename(self, rules, tr):
        '''
        Apply rename ``rules`` to the codes of trace ``tr`` (in place).

        :raises ToolError: if a mapping rule has no entry for a trace's
            current code.
        '''
        rename = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = rules.get(k, None)
            if isinstance(v, str):
                # Direct replacement string.
                rename[k] = v
            elif isinstance(v, dict):
                # Selective mapping; every occurring code must be covered.
                try:
                    oldval = getattr(tr, k)
                    rename[k] = v[oldval]
                except KeyError:
                    raise ToolError(
                        'No mapping defined for %s code "%s".' % (k, oldval))

            elif isinstance(v, tuple):
                # Compiled regex rule; only rename when the pattern matches.
                pat, repl = v
                oldval = getattr(tr, k)
                newval, n = pat.subn(repl, oldval)
                if n:
                    rename[k] = newval

        tr.set_codes(**rename)

    def convert(self, args, chain=None):
        '''
        Run the conversion described by this converter (and its parts).

        :param args: parsed command line arguments (provides the squirrel
            instance, CLI overrides and the ``force`` flag).
        :param chain: parent :py:class:`Chain` for value inheritance;
            ``None`` at the top level, where module defaults are used as
            the chain root.
        '''
        if chain is None:
            defaults = clone(g_defaults)
            defaults.set_basepath_from(self)
            chain = Chain(defaults)

        chain = Chain(self, chain)

        if self.parts:
            # This converter only contributes inherited settings; the
            # actual work is delegated to its sub-converters.
            task = make_task('Jackseis parts')
            for part in task(self.parts):
                part.convert(args, chain)

            del task

        else:
            sq = args.make_squirrel()

            # Command line options override any config file settings.
            cli_overrides = Converter.from_arguments(args)
            cli_overrides.set_basepath('.')

            chain = Chain(cli_overrides, chain)

            chain.mcall('add_dataset', sq)

            tmin = chain.get('tmin')
            tmax = chain.get('tmax')
            tinc = chain.get('tinc')
            codes = chain.get('codes')
            downsample = chain.get('downsample')
            out_path, is_sds = chain.fcall('get_effective_out_path') \
                or (None, False)

            if is_sds and tinc != 86400.:
                logger.warning('Setting time window to 1 day to generate SDS.')
                tinc = 86400.0

            out_format = chain.get('out_format')
            out_data_type = chain.get('out_data_type')

            out_meta_path = chain.fcall('get_effective_out_meta_path')

            if out_meta_path is not None:
                # Export station metadata; if no waveform output path is
                # configured, metadata export is all there is to do.
                sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
                util.ensuredirs(out_meta_path)
                sx.dump_xml(filename=out_meta_path)
                if out_path is None:
                    return

            target_deltat = None
            if downsample is not None:
                target_deltat = 1.0 / float(downsample)

            save_kwargs = {}
            if out_format == 'mseed':
                save_kwargs['record_length'] = chain.get(
                    'out_mseed_record_length')
                save_kwargs['steim'] = chain.get(
                    'out_mseed_steim')

            traversal = chain.get('traversal')
            if traversal is not None:
                grouping = TraversalChoice.name_to_grouping[traversal]
            else:
                grouping = None

            frequency_taper = self.get_effective_frequency_taper(chain)

            if frequency_taper is not None:
                # Derive temporal padding from the taper's low-frequency
                # corner (fmin_cut, or fmin when fmin_cut is zero).
                if frequency_taper[0] != 0.0:
                    frequency_taper_tpad = 1.0 / frequency_taper[0]
                else:
                    if frequency_taper[1] == 0.0:
                        raise JackseisError(
                            'Converter: fmin must be greater than zero.')

                    frequency_taper_tpad = 2.0 / frequency_taper[1]
            else:
                frequency_taper_tpad = 0.0

            quantity = chain.get('quantity')

            do_transfer = \
                quantity is not None or frequency_taper is not None

            # Extra padding read around each processing window to absorb
            # downsampling and spectral-processing edge effects; it is
            # chopped off again before output.
            tpad = 0.0
            if target_deltat is not None:
                tpad += target_deltat * 50.

            if do_transfer:
                tpad += frequency_taper_tpad

            task = None
            rename_rules = self.get_effective_rename_rules(chain)
            for batch in sq.chopper_waveforms(
                    tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
                    codes=codes,
                    snap_window=True,
                    grouping=grouping):

                if task is None:
                    task = make_task(
                        'Jackseis blocks', batch.n * batch.ngroups)

                tlabel = '%s%s - %s' % (
                    'groups %i / %i: ' % (batch.igroup, batch.ngroups)
                    if batch.ngroups > 1 else '',
                    util.time_to_str(batch.tmin),
                    util.time_to_str(batch.tmax))

                task.update(batch.i + batch.igroup * batch.n, tlabel)

                twmin = batch.tmin
                twmax = batch.tmax

                traces = batch.traces

                if target_deltat is not None:
                    # Traces too short for downsampling are dropped with a
                    # warning rather than aborting the run.
                    downsampled_traces = []
                    for tr in traces:
                        try:
                            tr.downsample_to(
                                target_deltat, snap=True, demean=False,
                                allow_upsample_max=4)

                            downsampled_traces.append(tr)

                        except (trace.TraceTooShort, trace.NoData) as e:
                            logger.warning(str(e))

                    traces = downsampled_traces

                if do_transfer:
                    restituted_traces = []
                    for tr in traces:
                        try:
                            if quantity is not None:
                                resp = sq.get_response(tr).get_effective(
                                    input_quantity=quantity)
                            else:
                                # Band filtering only, no restitution.
                                resp = None

                            restituted_traces.append(tr.transfer(
                                frequency_taper_tpad,
                                frequency_taper,
                                transfer_function=resp,
                                invert=True))

                        except (trace.NoData, trace.TraceTooShort,
                                SquirrelError) as e:
                            logger.warning(str(e))

                    traces = restituted_traces

                for tr in traces:
                    self.do_rename(rename_rules, tr)

                if out_data_type:
                    for tr in traces:
                        tr.ydata = tr.ydata.astype(
                            OutputDataTypeChoice.name_to_dtype[out_data_type])

                # Cut the processing padding off again before output.
                chopped_traces = []
                for tr in traces:
                    try:
                        otr = tr.chop(twmin, twmax, inplace=False)
                        chopped_traces.append(otr)
                    except trace.NoData:
                        pass

                traces = chopped_traces

                if out_path is not None:
                    try:
                        io.save(
                            traces, out_path,
                            format=out_format,
                            overwrite=args.force,
                            additional=dict(
                                wmin_year=tts(twmin, format='%Y'),
                                wmin_month=tts(twmin, format='%m'),
                                wmin_day=tts(twmin, format='%d'),
                                wmin_jday=tts(twmin, format='%j'),
                                wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
                                wmax_year=tts(twmax, format='%Y'),
                                wmax_month=tts(twmax, format='%m'),
                                wmax_day=tts(twmax, format='%d'),
                                wmax_jday=tts(twmax, format='%j'),
                                wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
                            **save_kwargs)

                    except io.FileSaveError as e:
                        raise JackseisError(str(e))

                else:
                    # No output path configured: print trace summaries.
                    for tr in traces:
                        print(tr.summary_stats)

            if task:
                task.done()
# Fallback settings used as the root of every conversion chain.
g_defaults = Converter(
    out_mseed_record_length=4096,
    out_format='mseed',
    out_mseed_steim=2)

# One-line description shown in the CLI help.
headline = 'Convert waveform archive data.'
def make_subparser(subparsers):
    '''
    Attach the ``jackseis`` subcommand parser to ``subparsers``.
    '''
    return subparsers.add_parser(
        'jackseis', help=headline, description=headline)
def setup(parser):
    '''
    Set up command line options for the jackseis subcommand.
    '''
    parser.add_squirrel_selection_arguments()

    parser.add_argument(
        '--config',
        dest='config_path',
        metavar='NAME',
        help='File containing `jackseis.Converter` settings.')

    parser.add_argument(
        '--dump-config',
        dest='dump_config',
        action='store_true',
        default=False,
        help='''
Print configuration file snippet representing given command line arguments to
standard output and exit. Only command line options affecting the conversion
are included in the dump. Additional ``--config`` settings, data collection and
data query options are ignored.
'''.strip())

    parser.add_argument(
        '--force',
        dest='force',
        action='store_true',
        default=False,
        help='Force overwriting of existing files.')

    # Conversion options shared with config-file based operation.
    Converter.add_arguments(parser)
def run(parser, args):
    '''
    Execute the jackseis subcommand.

    With ``--dump-config``, print the converter built from the command line
    and exit. Otherwise, run every converter from ``--config`` (or a single
    default converter) — command line options override config settings
    inside :py:meth:`Converter.convert`.

    :raises ToolError: if the config file cannot be loaded or contains
        objects other than `jackseis.Converter`.
    '''
    if args.dump_config:
        converter = Converter.from_arguments(args)
        print(converter)
        sys.exit(0)

    if args.config_path:
        try:
            converters = load_all(filename=args.config_path)
        except Exception as e:
            # Chain the original load error so its traceback is preserved.
            raise ToolError(str(e)) from e

        for converter in converters:
            if not isinstance(converter, Converter):
                raise ToolError(
                    'Config file should only contain '
                    '`jackseis.Converter` objects.')

            # Relative paths in the config are resolved against its folder.
            converter.set_basepath(op.dirname(args.config_path))

    else:
        converter = Converter()
        converter.set_basepath('.')
        converters = [converter]

    with progress.view():
        task = make_task('Jackseis jobs')
        for converter in task(converters):
            converter.convert(args)