1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import re
7import logging
8import os.path as op
10import numpy as num
12from pyrocko import io, trace, util
13from pyrocko.progress import progress
14from pyrocko.has_paths import Path, HasPaths
15from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
16 StringChoice, IntChoice, Defer, load_all, clone
17from pyrocko.squirrel.dataset import Dataset
18from pyrocko.squirrel.client.local import LocalData
19from pyrocko.squirrel.error import ToolError
20from pyrocko.squirrel.model import CodesNSLCE
21from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
22 ChannelGrouping, SensorGrouping
24tts = util.time_to_str
26guts_prefix = 'jackseis'
27logger = logging.getLogger('psq.cli.jackseis')
30def make_task(*args):
31 return progress.task(*args, logger=logger)
34def parse_rename_rule_from_string(s):
35 s = s.strip()
36 if re.match(r'^([^:,]*:[^:,]*,?)+', s):
37 return dict(
38 x.split(':') for x in s.strip(',').split(','))
39 else:
40 return s
43class JackseisError(ToolError):
44 pass
47class Chain(object):
48 def __init__(self, node, parent=None):
49 self.node = node
50 self.parent = parent
52 def mcall(self, name, *args, **kwargs):
53 ret = []
54 if self.parent is not None:
55 ret.append(self.parent.mcall(name, *args, **kwargs))
57 ret.append(getattr(self.node, name)(*args, **kwargs))
58 return ret
60 def fcall(self, name, *args, **kwargs):
61 v = getattr(self.node, name)(*args, **kwargs)
62 if v is None and self.parent is not None:
63 return self.parent.fcall(name, *args, **kwargs)
64 else:
65 return v
67 def get(self, name):
68 v = getattr(self.node, name)
69 if v is None and self.parent is not None:
70 return self.parent.get(name)
71 else:
72 return v
74 def dget(self, name, k):
75 v = getattr(self.node, name).get(k, None)
76 if v is None and self.parent is not None:
77 return self.parent.dget(name, k)
78 else:
79 return v
82class OutputFormatChoice(StringChoice):
83 choices = io.allowed_formats('save')
86class OutputDataTypeChoice(StringChoice):
87 choices = ['int32', 'int64', 'float32', 'float64']
88 name_to_dtype = {
89 'int32': num.int32,
90 'int64': num.int64,
91 'float32': num.float32,
92 'float64': num.float64}
95class TraversalChoice(StringChoice):
96 choices = ['network', 'station', 'channel', 'sensor']
97 name_to_grouping = {
98 'network': NetworkGrouping(),
99 'station': StationGrouping(),
100 'sensor': SensorGrouping(),
101 'channel': ChannelGrouping()}
104class Converter(HasPaths):
106 in_dataset = Dataset.T(optional=True)
107 in_path = String.T(optional=True)
108 in_paths = List.T(String.T(optional=True))
110 codes = List.T(CodesNSLCE.T(), optional=True)
112 rename = Dict.T(
113 String.T(),
114 Choice.T([
115 String.T(),
116 Dict.T(String.T(), String.T())]))
117 tmin = Timestamp.T(optional=True)
118 tmax = Timestamp.T(optional=True)
119 tinc = Float.T(optional=True)
121 downsample = Float.T(optional=True)
123 out_path = Path.T(optional=True)
124 out_sds_path = Path.T(optional=True)
125 out_format = OutputFormatChoice.T(optional=True)
126 out_data_type = OutputDataTypeChoice.T(optional=True)
127 out_mseed_record_length = IntChoice.T(
128 optional=True,
129 choices=list(io.mseed.VALID_RECORD_LENGTHS))
130 out_mseed_steim = IntChoice.T(
131 optional=True,
132 choices=[1, 2])
133 out_meta_path = Path.T(optional=True)
135 traversal = TraversalChoice.T(optional=True)
137 parts = List.T(Defer('Converter.T'))
139 @classmethod
140 def add_arguments(cls, p):
141 p.add_squirrel_query_arguments(without=['time'])
143 p.add_argument(
144 '--tinc',
145 dest='tinc',
146 type=float,
147 metavar='SECONDS',
148 help='Set time length of output files [s].')
150 p.add_argument(
151 '--downsample',
152 dest='downsample',
153 type=float,
154 metavar='RATE',
155 help='Downsample to RATE [Hz].')
157 p.add_argument(
158 '--out-path',
159 dest='out_path',
160 metavar='TEMPLATE',
161 help='Set output path to TEMPLATE. Available placeholders '
162 'are %%n: network, %%s: station, %%l: location, %%c: '
163 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
164 'year. The following additional placeholders use the window '
165 'begin and end times rather than trace begin and end times '
166 '(to suppress producing many small files for gappy traces), '
167 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
168 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
169 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
170 "Example: --out-path='data/%%s/trace-%%s-%%c.mseed'")
172 p.add_argument(
173 '--out-sds-path',
174 dest='out_sds_path',
175 metavar='PATH',
176 help='Set output path to create SDS (https://www.seiscomp.de'
177 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
178 'the given path. Implies --tinc=86400. '
179 'Example: --out-sds-path=data/sds')
181 p.add_argument(
182 '--out-format',
183 dest='out_format',
184 choices=io.allowed_formats('save'),
185 metavar='FORMAT',
186 help='Set output file format. Choices: %s' % io.allowed_formats(
187 'save', 'cli_help', 'mseed'))
189 p.add_argument(
190 '--out-data-type',
191 dest='out_data_type',
192 choices=OutputDataTypeChoice.choices,
193 metavar='DTYPE',
194 help='Set numerical data type. Choices: %s. The output file '
195 'format must support the given type. By default, the data '
196 'type is kept unchanged.' % ', '.join(
197 OutputDataTypeChoice.choices))
199 p.add_argument(
200 '--out-mseed-record-length',
201 dest='out_mseed_record_length',
202 type=int,
203 choices=io.mseed.VALID_RECORD_LENGTHS,
204 metavar='INT',
205 help='Set the mseed record length in bytes. Choices: %s. '
206 'Default is 4096 bytes, which is commonly used for archiving.'
207 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))
209 p.add_argument(
210 '--out-mseed-steim',
211 dest='out_mseed_steim',
212 type=int,
213 choices=(1, 2),
214 metavar='INT',
215 help='Set the mseed STEIM compression. Choices: 1 or 2. '
216 'Default is STEIM-2, which can compress full range int32. '
217 'Note: STEIM-2 is limited to 30 bit dynamic range.')
219 p.add_argument(
220 '--out-meta-path',
221 dest='out_meta_path',
222 metavar='PATH',
223 help='Set output path for station metadata (StationXML) export.')
225 p.add_argument(
226 '--traversal',
227 dest='traversal',
228 metavar='GROUPING',
229 choices=TraversalChoice.choices,
230 help='By default the outermost processing loop is over time. '
231 'Add outer loop with given GROUPING. Choices: %s'
232 % ', '.join(TraversalChoice.choices))
234 p.add_argument(
235 '--rename-network',
236 dest='rename_network',
237 metavar='REPLACEMENT',
238 help="""
239Replace network code. REPLACEMENT can be a string for direct replacement, a
240mapping for selective replacement, or a regular expression for tricky
241replacements. Examples: Direct replacement: ```XX``` - set all network codes to
242```XX```. Mapping: ```AA:XX,BB:YY``` - replace ```AA``` with ```XX``` and
243```BB``` with ```YY```. Regular expression: ```/A(\\d)/X\\1/``` - replace
244```A1``` with ```X1``` and ```A2``` with ```X2``` etc.
245""".strip())
247 p.add_argument(
248 '--rename-station',
249 dest='rename_station',
250 metavar='REPLACEMENT',
251 help='Replace station code. See ``--rename-network``.')
253 p.add_argument(
254 '--rename-location',
255 dest='rename_location',
256 metavar='REPLACEMENT',
257 help='Replace location code. See ``--rename-network``.')
259 p.add_argument(
260 '--rename-channel',
261 dest='rename_channel',
262 metavar='REPLACEMENT',
263 help='Replace channel code. See ``--rename-network``.')
265 p.add_argument(
266 '--rename-extra',
267 dest='rename_extra',
268 metavar='REPLACEMENT',
269 help='Replace extra code. See ``--rename-network``. Note: the '
270 '```extra``` code is not available in Mini-SEED.')
272 @classmethod
273 def from_arguments(cls, args):
274 kwargs = args.squirrel_query
276 rename = {}
277 for (k, v) in [
278 ('network', args.rename_network),
279 ('station', args.rename_station),
280 ('location', args.rename_location),
281 ('channel', args.rename_channel),
282 ('extra', args.rename_extra)]:
284 if v is not None:
285 rename[k] = parse_rename_rule_from_string(v)
287 obj = cls(
288 downsample=args.downsample,
289 out_format=args.out_format,
290 out_path=args.out_path,
291 tinc=args.tinc,
292 out_sds_path=args.out_sds_path,
293 out_data_type=args.out_data_type,
294 out_mseed_record_length=args.out_mseed_record_length,
295 out_mseed_steim=args.out_mseed_steim,
296 out_meta_path=args.out_meta_path,
297 traversal=args.traversal,
298 rename=rename,
299 **kwargs)
301 obj.validate()
302 return obj
304 def add_dataset(self, sq):
305 if self.in_dataset is not None:
306 sq.add_dataset(self.in_dataset)
308 if self.in_path is not None:
309 ds = Dataset(sources=[LocalData(paths=[self.in_path])])
310 ds.set_basepath_from(self)
311 sq.add_dataset(ds)
313 if self.in_paths:
314 ds = Dataset(sources=[LocalData(paths=self.in_paths)])
315 ds.set_basepath_from(self)
316 sq.add_dataset(ds)
318 def get_effective_rename_rules(self, chain):
319 d = {}
320 for k in ['network', 'station', 'location', 'channel']:
321 v = chain.dget('rename', k)
322 if isinstance(v, str):
323 m = re.match(r'/([^/]+)/([^/]*)/', v)
324 if m:
325 try:
326 v = (re.compile(m.group(1)), m.group(2))
327 except Exception:
328 raise JackseisError(
329 'Invalid replacement pattern: /%s/' % m.group(1))
331 d[k] = v
333 return d
335 def get_effective_out_path(self):
336 nset = sum(x is not None for x in (
337 self.out_path,
338 self.out_sds_path))
340 if nset > 1:
341 raise JackseisError(
342 'More than one out of [out_path, out_sds_path] set.')
344 is_sds = False
345 if self.out_path:
346 out_path = self.out_path
348 elif self.out_sds_path:
349 out_path = op.join(
350 self.out_sds_path,
351 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
352 '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
353 '.%(wmin_year)s.%(wmin_jday)s')
354 is_sds = True
355 else:
356 out_path = None
358 if out_path is not None:
359 return self.expand_path(out_path), is_sds
360 else:
361 return None
363 def get_effective_out_meta_path(self):
364 if self.out_meta_path is not None:
365 return self.expand_path(self.out_meta_path)
366 else:
367 return None
369 def do_rename(self, rules, tr):
370 rename = {}
371 for k in ['network', 'station', 'location', 'channel']:
372 v = rules.get(k, None)
373 if isinstance(v, str):
374 rename[k] = v
375 elif isinstance(v, dict):
376 try:
377 oldval = getattr(tr, k)
378 rename[k] = v[oldval]
379 except KeyError:
380 raise ToolError(
381 'No mapping defined for %s code "%s".' % (k, oldval))
383 elif isinstance(v, tuple):
384 pat, repl = v
385 oldval = getattr(tr, k)
386 newval, n = pat.subn(repl, oldval)
387 if n:
388 rename[k] = newval
390 tr.set_codes(**rename)
392 def convert(self, args, chain=None):
393 if chain is None:
394 defaults = clone(g_defaults)
395 defaults.set_basepath_from(self)
396 chain = Chain(defaults)
398 chain = Chain(self, chain)
400 if self.parts:
401 task = make_task('Jackseis parts')
402 for part in task(self.parts):
403 part.convert(args, chain)
405 del task
407 else:
408 sq = args.make_squirrel()
410 cli_overrides = Converter.from_arguments(args)
411 cli_overrides.set_basepath('.')
413 chain = Chain(cli_overrides, chain)
415 chain.mcall('add_dataset', sq)
417 tmin = chain.get('tmin')
418 tmax = chain.get('tmax')
419 tinc = chain.get('tinc')
420 codes = chain.get('codes')
421 downsample = chain.get('downsample')
422 out_path, is_sds = chain.fcall('get_effective_out_path') \
423 or (None, False)
425 if is_sds and tinc != 86400.:
426 logger.warning('Setting time window to 1 day to generate SDS.')
427 tinc = 86400.0
429 out_format = chain.get('out_format')
430 out_data_type = chain.get('out_data_type')
432 out_meta_path = chain.fcall('get_effective_out_meta_path')
434 if out_meta_path is not None:
435 sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
436 util.ensuredirs(out_meta_path)
437 sx.dump_xml(filename=out_meta_path)
438 if out_path is None:
439 return
441 target_deltat = None
442 if downsample is not None:
443 target_deltat = 1.0 / float(downsample)
445 save_kwargs = {}
446 if out_format == 'mseed':
447 save_kwargs['record_length'] = chain.get(
448 'out_mseed_record_length')
449 save_kwargs['steim'] = chain.get(
450 'out_mseed_steim')
452 traversal = chain.get('traversal')
453 if traversal is not None:
454 grouping = TraversalChoice.name_to_grouping[traversal]
455 else:
456 grouping = None
458 tpad = 0.0
459 if target_deltat is not None:
460 tpad += target_deltat * 50.
462 task = None
463 rename_rules = self.get_effective_rename_rules(chain)
464 for batch in sq.chopper_waveforms(
465 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
466 codes=codes,
467 snap_window=True,
468 grouping=grouping):
470 if task is None:
471 task = make_task(
472 'Jackseis blocks', batch.n * batch.ngroups)
474 tlabel = '%s%s - %s' % (
475 'groups %i / %i: ' % (batch.igroup, batch.ngroups)
476 if batch.ngroups > 1 else '',
477 util.time_to_str(batch.tmin),
478 util.time_to_str(batch.tmax))
480 task.update(batch.i + batch.igroup * batch.n, tlabel)
482 twmin = batch.tmin
483 twmax = batch.tmax
485 traces = batch.traces
487 if target_deltat is not None:
488 out_traces = []
489 for tr in traces:
490 try:
491 tr.downsample_to(
492 target_deltat, snap=True, demean=False,
493 allow_upsample_max=4)
495 out_traces.append(tr)
497 except (trace.TraceTooShort, trace.NoData):
498 pass
500 traces = out_traces
502 for tr in traces:
503 self.do_rename(rename_rules, tr)
505 if out_data_type:
506 for tr in traces:
507 tr.ydata = tr.ydata.astype(
508 OutputDataTypeChoice.name_to_dtype[out_data_type])
510 chopped_traces = []
511 for tr in traces:
512 try:
513 otr = tr.chop(twmin, twmax, inplace=False)
514 chopped_traces.append(otr)
515 except trace.NoData:
516 pass
518 traces = chopped_traces
520 if out_path is not None:
521 try:
522 io.save(
523 traces, out_path,
524 format=out_format,
525 overwrite=args.force,
526 additional=dict(
527 wmin_year=tts(twmin, format='%Y'),
528 wmin_month=tts(twmin, format='%m'),
529 wmin_day=tts(twmin, format='%d'),
530 wmin_jday=tts(twmin, format='%j'),
531 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
532 wmax_year=tts(twmax, format='%Y'),
533 wmax_month=tts(twmax, format='%m'),
534 wmax_day=tts(twmax, format='%d'),
535 wmax_jday=tts(twmax, format='%j'),
536 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
537 **save_kwargs)
539 except io.FileSaveError as e:
540 raise JackseisError(str(e))
542 else:
543 for tr in traces:
544 print(tr.summary)
546 if task:
547 task.done()
550g_defaults = Converter(
551 out_mseed_record_length=4096,
552 out_format='mseed',
553 out_mseed_steim=2)
556headline = 'Convert waveform archive data.'
559def make_subparser(subparsers):
560 return subparsers.add_parser(
561 'jackseis',
562 help=headline,
563 description=headline)
566def setup(parser):
567 parser.add_squirrel_selection_arguments()
569 parser.add_argument(
570 '--config',
571 dest='config_path',
572 metavar='NAME',
573 help='File containing `jackseis.Converter` settings.')
575 parser.add_argument(
576 '--force',
577 dest='force',
578 action='store_true',
579 default=False,
580 help='Force overwriting of existing files.')
582 Converter.add_arguments(parser)
585def run(parser, args):
586 if args.config_path:
587 try:
588 converters = load_all(filename=args.config_path)
589 except Exception as e:
590 raise ToolError(str(e))
592 for converter in converters:
593 if not isinstance(converter, Converter):
594 raise ToolError(
595 'Config file should only contain '
596 '`jackseis.Converter` objects.')
598 converter.set_basepath(op.dirname(args.config_path))
600 else:
601 converter = Converter()
602 converter.set_basepath('.')
603 converters = [converter]
605 with progress.view():
606 task = make_task('Jackseis jobs')
607 for converter in task(converters):
608 converter.convert(args)