1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import re
9import logging
10import os.path as op
12import numpy as num
14from pyrocko import io, trace, util
15from pyrocko.progress import progress
16from pyrocko.has_paths import Path, HasPaths
17from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
18 StringChoice, IntChoice, Defer, load_all, clone
19from pyrocko.squirrel.dataset import Dataset
20from pyrocko.squirrel.client.local import LocalData
21from pyrocko.squirrel.error import ToolError
22from pyrocko.squirrel.model import CodesNSLCE
23from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
24 ChannelGrouping, SensorGrouping
# Shorthand for timestamp formatting, used for the window-time placeholders
# in output path templates.
tts = util.time_to_str

# NOTE(review): presumably the namespace prefix used by guts serialization
# for classes defined in this module — pyrocko convention; confirm.
guts_prefix = 'jackseis'

logger = logging.getLogger('psq.cli.jackseis')
def make_task(*args):
    '''
    Create a progress task bound to this module's logger.
    '''
    task = progress.task(*args, logger=logger)
    return task
class JackseisError(ToolError):
    '''
    Raised for errors specific to the jackseis conversion tool.
    '''
class Chain(object):
    '''
    Linked chain of configuration nodes with fallback to parent nodes.

    A ``None`` attribute value, ``None`` method result, or missing dict
    entry on the innermost node falls back to the enclosing (parent)
    chain link.
    '''

    def __init__(self, node, parent=None):
        self.node = node
        self.parent = parent

    def mcall(self, name, *args, **kwargs):
        '''
        Call method on every node, collecting results outermost-first.
        '''
        inherited = [] if self.parent is None \
            else [self.parent.mcall(name, *args, **kwargs)]

        return inherited + [getattr(self.node, name)(*args, **kwargs)]

    def fcall(self, name, *args, **kwargs):
        '''
        Call method on this node, falling back to parent on ``None`` result.
        '''
        value = getattr(self.node, name)(*args, **kwargs)
        if value is not None or self.parent is None:
            return value

        return self.parent.fcall(name, *args, **kwargs)

    def get(self, name):
        '''
        Get attribute from this node, falling back to parent on ``None``.
        '''
        value = getattr(self.node, name)
        if value is not None or self.parent is None:
            return value

        return self.parent.get(name)

    def dget(self, name, k):
        '''
        Get entry *k* of dict attribute *name*, with fallback to parent.
        '''
        value = getattr(self.node, name).get(k, None)
        if value is not None or self.parent is None:
            return value

        return self.parent.dget(name, k)
class OutputFormatChoice(StringChoice):
    '''Output file formats into which waveforms can be saved.'''

    # All formats pyrocko.io can write.
    choices = io.allowed_formats('save')
class OutputDataTypeChoice(StringChoice):
    '''Numerical sample data types selectable for output.'''

    choices = ['int32', 'int64', 'float32', 'float64']

    # Mapping from choice name to the corresponding NumPy dtype.
    name_to_dtype = dict(
        int32=num.int32,
        int64=num.int64,
        float32=num.float32,
        float64=num.float64)
class TraversalChoice(StringChoice):
    '''Groupings available for the optional outer processing loop.'''

    choices = ['network', 'station', 'channel', 'sensor']

    # Mapping from choice name to the squirrel grouping operator.
    name_to_grouping = {
        'network': NetworkGrouping(),
        'station': StationGrouping(),
        'channel': ChannelGrouping(),
        'sensor': SensorGrouping()}
class Converter(HasPaths):
    '''
    Jackseis conversion job configuration.

    Settings left unset (``None``) are resolved at run time through a
    :py:class:`Chain`: global defaults, enclosing (parent) converters,
    this converter, then command line overrides.
    '''

    # Input selection: a full dataset description, a single path, or a list
    # of paths to be wrapped into a local dataset.
    in_dataset = Dataset.T(optional=True)
    in_path = String.T(optional=True)
    in_paths = List.T(String.T(optional=True))

    # Restrict processing to traces matching these codes.
    codes = List.T(CodesNSLCE.T(), optional=True)

    # Renaming rules keyed by 'network', 'station', 'location' or 'channel'.
    # Values may be a plain replacement string, a '/pattern/replacement/'
    # regex string (see get_effective_rename_rules), or an old->new mapping.
    rename = Dict.T(
        String.T(),
        Choice.T([
            String.T(),
            Dict.T(String.T(), String.T())]))

    # Global time span and length of output time windows [s].
    tmin = Timestamp.T(optional=True)
    tmax = Timestamp.T(optional=True)
    tinc = Float.T(optional=True)

    # Target sampling rate [Hz] for downsampling.
    downsample = Float.T(optional=True)

    # Output configuration: path template or SDS root, file format, sample
    # data type, MiniSEED-specific options, and StationXML export path.
    out_path = Path.T(optional=True)
    out_sds_path = Path.T(optional=True)
    out_format = OutputFormatChoice.T(optional=True)
    out_data_type = OutputDataTypeChoice.T(optional=True)
    out_mseed_record_length = IntChoice.T(
        optional=True,
        choices=list(io.mseed.VALID_RECORD_LENGTHS))
    out_mseed_steim = IntChoice.T(
        optional=True,
        choices=[1, 2])
    out_meta_path = Path.T(optional=True)

    # Optional outer loop grouping (network/station/channel/sensor).
    traversal = TraversalChoice.T(optional=True)

    # Sub-converters which inherit this converter's settings.
    parts = List.T(Defer('Converter.T'))

    @classmethod
    def add_arguments(cls, p):
        '''
        Add jackseis-specific options to argument parser *p*.
        '''
        p.add_squirrel_query_arguments(without=['time'])

        p.add_argument(
            '--tinc',
            dest='tinc',
            type=float,
            metavar='SECONDS',
            help='Set time length of output files [s].')

        p.add_argument(
            '--downsample',
            dest='downsample',
            type=float,
            metavar='RATE',
            help='Downsample to RATE [Hz].')

        p.add_argument(
            '--out-path',
            dest='out_path',
            metavar='TEMPLATE',
            help='Set output path to TEMPLATE. Available placeholders '
                 'are %%n: network, %%s: station, %%l: location, %%c: '
                 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
                 'year. The following additional placeholders use the window '
                 'begin and end times rather than trace begin and end times '
                 '(to suppress producing many small files for gappy traces), '
                 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
                 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
                 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
                 'Example: --output=\'data/%%s/trace-%%s-%%c.mseed\'')

        p.add_argument(
            '--out-sds-path',
            dest='out_sds_path',
            metavar='PATH',
            help='Set output path to create SDS (https://www.seiscomp.de'
                 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
                 'the given path. Implies --tinc=86400. '
                 'Example: --output-sds-path=data/sds')

        p.add_argument(
            '--out-format',
            dest='out_format',
            choices=io.allowed_formats('save'),
            metavar='FORMAT',
            help='Set output file format. Choices: %s' % io.allowed_formats(
                'save', 'cli_help', 'mseed'))

        p.add_argument(
            '--out-data-type',
            dest='out_data_type',
            choices=OutputDataTypeChoice.choices,
            metavar='DTYPE',
            help='Set numerical data type. Choices: %s. The output file '
                 'format must support the given type. By default, the data '
                 'type is kept unchanged.' % ', '.join(
                    OutputDataTypeChoice.choices))

        p.add_argument(
            '--out-mseed-record-length',
            dest='out_mseed_record_length',
            type=int,
            choices=io.mseed.VALID_RECORD_LENGTHS,
            metavar='INT',
            help='Set the mseed record length in bytes. Choices: %s. '
                 'Default is 4096 bytes, which is commonly used for archiving.'
                 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))

        p.add_argument(
            '--out-mseed-steim',
            dest='out_mseed_steim',
            type=int,
            choices=(1, 2),
            metavar='INT',
            help='Set the mseed STEIM compression. Choices: 1 or 2. '
                 'Default is STEIM-2, which can compress full range int32. '
                 'Note: STEIM-2 is limited to 30 bit dynamic range.')

        p.add_argument(
            '--out-meta-path',
            dest='out_meta_path',
            metavar='PATH',
            help='Set output path for station metadata (StationXML) export.')

        p.add_argument(
            '--traversal',
            dest='traversal',
            metavar='GROUPING',
            choices=TraversalChoice.choices,
            help='By default the outermost processing loop is over time. '
                 'Add outer loop with given GROUPING. Choices: %s'
                 % ', '.join(TraversalChoice.choices))

    @classmethod
    def from_arguments(cls, args):
        '''
        Create a :py:class:`Converter` from parsed command line arguments.
        '''
        kwargs = args.squirrel_query

        obj = cls(
            downsample=args.downsample,
            out_format=args.out_format,
            out_path=args.out_path,
            tinc=args.tinc,
            out_sds_path=args.out_sds_path,
            out_data_type=args.out_data_type,
            out_mseed_record_length=args.out_mseed_record_length,
            out_mseed_steim=args.out_mseed_steim,
            out_meta_path=args.out_meta_path,
            traversal=args.traversal,
            **kwargs)

        obj.validate()
        return obj

    def add_dataset(self, sq):
        '''
        Add this converter's input data sources to squirrel instance *sq*.
        '''
        if self.in_dataset is not None:
            sq.add_dataset(self.in_dataset)

        if self.in_path is not None:
            ds = Dataset(sources=[LocalData(paths=[self.in_path])])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

        if self.in_paths:
            ds = Dataset(sources=[LocalData(paths=self.in_paths)])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

    def get_effective_rename_rules(self, chain):
        '''
        Resolve rename rules through *chain*, compiling regex-style rules.

        Strings of the form ``/pattern/replacement/`` are turned into
        ``(compiled_pattern, replacement)`` tuples.
        '''
        d = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = chain.dget('rename', k)
            if isinstance(v, str):
                m = re.match(r'/([^/]+)/([^/]*)/', v)
                if m:
                    try:
                        v = (re.compile(m.group(1)), m.group(2))
                    except Exception:
                        raise JackseisError(
                            'Invalid replacement pattern: /%s/' % m.group(1))

            d[k] = v

        return d

    def get_effective_out_path(self):
        '''
        Determine output path template and whether SDS layout is requested.

        :returns:
            ``(path_template, is_sds)``, or ``None`` when no output path is
            configured on this converter (``None`` lets the chain fall back
            to a parent's setting).
        '''
        nset = sum(x is not None for x in (
            self.out_path,
            self.out_sds_path))

        if nset > 1:
            raise JackseisError(
                'More than one out of [out_path, out_sds_path] set.')

        is_sds = False
        if self.out_path:
            out_path = self.out_path

        elif self.out_sds_path:
            # Standard SDS directory/file naming scheme, rooted at the
            # configured SDS path.
            out_path = op.join(
                self.out_sds_path,
                '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
                '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
                '.%(wmin_year)s.%(wmin_jday)s')
            is_sds = True
        else:
            out_path = None

        if out_path is not None:
            return self.expand_path(out_path), is_sds
        else:
            return None

    def get_effective_out_meta_path(self):
        '''
        Get expanded StationXML output path, or ``None`` if not configured.
        '''
        if self.out_meta_path is not None:
            return self.expand_path(self.out_meta_path)
        else:
            return None

    def do_rename(self, rules, tr):
        '''
        Apply rename *rules* to the codes of trace *tr* (in place).
        '''
        rename = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = rules.get(k, None)
            if isinstance(v, str):
                # Plain string: unconditional replacement.
                rename[k] = v
            elif isinstance(v, dict):
                # Mapping: old code must be present as a key.
                try:
                    oldval = getattr(tr, k)
                    rename[k] = v[oldval]
                except KeyError:
                    raise ToolError(
                        'No mapping defined for %s code "%s".' % (k, oldval))

            elif isinstance(v, tuple):
                # Compiled regex rule: only rename when the pattern matches.
                pat, repl = v
                oldval = getattr(tr, k)
                newval, n = pat.subn(repl, oldval)
                if n:
                    rename[k] = newval

        tr.set_codes(**rename)

    def convert(self, args, chain=None):
        '''
        Run the conversion job.

        Recurses into :py:attr:`parts` if set; otherwise performs the
        actual data processing, with settings resolved through *chain*
        (defaults -> parents -> self -> CLI overrides).
        '''
        if chain is None:
            defaults = clone(g_defaults)
            defaults.set_basepath_from(self)
            chain = Chain(defaults)

        chain = Chain(self, chain)

        if self.parts:
            task = make_task('Jackseis parts')
            for part in task(self.parts):
                part.convert(args, chain)

            del task

        else:
            sq = args.make_squirrel()

            # Command line arguments override any config file settings.
            cli_overrides = Converter.from_arguments(args)
            cli_overrides.set_basepath('.')

            chain = Chain(cli_overrides, chain)

            chain.mcall('add_dataset', sq)

            tmin = chain.get('tmin')
            tmax = chain.get('tmax')
            tinc = chain.get('tinc')
            codes = chain.get('codes')
            downsample = chain.get('downsample')
            out_path, is_sds = chain.fcall('get_effective_out_path') \
                or (None, False)

            # SDS layout requires day files.
            if is_sds and tinc != 86400.:
                logger.warning('Setting time window to 1 day to generate SDS.')
                tinc = 86400.0

            out_format = chain.get('out_format')
            out_data_type = chain.get('out_data_type')

            out_meta_path = chain.fcall('get_effective_out_meta_path')

            if out_meta_path is not None:
                sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
                util.ensuredirs(out_meta_path)
                sx.dump_xml(filename=out_meta_path)
                # Metadata-only run: stop here if no waveform output wanted.
                if out_path is None:
                    return

            target_deltat = None
            if downsample is not None:
                target_deltat = 1.0 / float(downsample)

            save_kwargs = {}
            if out_format == 'mseed':
                save_kwargs['record_length'] = chain.get(
                    'out_mseed_record_length')
                save_kwargs['steim'] = chain.get(
                    'out_mseed_steim')

            traversal = chain.get('traversal')
            if traversal is not None:
                grouping = TraversalChoice.name_to_grouping[traversal]
            else:
                grouping = None

            # When downsampling, pad windows by 50 output samples so edge
            # effects fall outside the final chop to [twmin, twmax].
            tpad = 0.0
            if target_deltat is not None:
                tpad += target_deltat * 50.

            task = None
            rename_rules = self.get_effective_rename_rules(chain)
            for batch in sq.chopper_waveforms(
                    tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
                    codes=codes,
                    snap_window=True,
                    grouping=grouping):

                if task is None:
                    task = make_task(
                        'Jackseis blocks', batch.n * batch.ngroups)

                tlabel = '%s%s - %s' % (
                    'groups %i / %i: ' % (batch.igroup, batch.ngroups)
                    if batch.ngroups > 1 else '',
                    util.time_to_str(batch.tmin),
                    util.time_to_str(batch.tmax))

                task.update(batch.i + batch.igroup * batch.n, tlabel)

                twmin = batch.tmin
                twmax = batch.tmax

                traces = batch.traces

                if target_deltat is not None:
                    out_traces = []
                    for tr in traces:
                        try:
                            tr.downsample_to(
                                target_deltat, snap=True, demean=False)

                            out_traces.append(tr)

                        except (trace.TraceTooShort, trace.NoData):
                            # Traces too short to downsample are dropped.
                            pass

                    traces = out_traces

                for tr in traces:
                    self.do_rename(rename_rules, tr)

                if out_data_type:
                    for tr in traces:
                        tr.ydata = tr.ydata.astype(
                            OutputDataTypeChoice.name_to_dtype[out_data_type])

                # Cut back to the window proper (removes any tpad margins).
                chopped_traces = []
                for tr in traces:
                    try:
                        otr = tr.chop(twmin, twmax, inplace=False)
                        chopped_traces.append(otr)
                    except trace.NoData:
                        pass

                traces = chopped_traces

                if out_path is not None:
                    try:
                        io.save(
                            traces, out_path,
                            format=out_format,
                            overwrite=args.force,
                            additional=dict(
                                wmin_year=tts(twmin, format='%Y'),
                                wmin_month=tts(twmin, format='%m'),
                                wmin_day=tts(twmin, format='%d'),
                                wmin_jday=tts(twmin, format='%j'),
                                wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
                                wmax_year=tts(twmax, format='%Y'),
                                wmax_month=tts(twmax, format='%m'),
                                wmax_day=tts(twmax, format='%d'),
                                wmax_jday=tts(twmax, format='%j'),
                                wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
                            **save_kwargs)

                    except io.FileSaveError as e:
                        raise JackseisError(str(e))

                else:
                    # No output path configured: print trace summaries.
                    for tr in traces:
                        print(tr.summary)

            if task:
                task.done()
# Fallback settings applied at the root of every converter chain.
g_defaults = Converter(
    out_mseed_record_length=4096,
    out_format='mseed',
    out_mseed_steim=2)

# One-line description shown in the CLI help.
headline = 'Convert waveform archive data.'
def make_subparser(subparsers):
    '''
    Register the ``jackseis`` subcommand on *subparsers*.
    '''
    parser = subparsers.add_parser(
        'jackseis', help=headline, description=headline)

    return parser
def setup(parser):
    '''
    Set up command line argument parser for the jackseis subcommand.
    '''
    parser.add_squirrel_selection_arguments()

    parser.add_argument(
        '--config',
        dest='config_path',
        metavar='NAME',
        help='File containing `jackseis.Converter` settings.')

    parser.add_argument(
        '--force',
        dest='force',
        action='store_true',
        default=False,
        help='Force overwriting of existing files.')

    # Converter-specific options (--tinc, --downsample, --out-*, ...).
    Converter.add_arguments(parser)
def run(parser, args):
    '''
    Execute the jackseis subcommand.

    Converters are loaded from the config file when given; otherwise a
    single default converter (driven entirely by CLI options) is used.
    '''
    if not args.config_path:
        # No config file: a single empty converter, rooted at cwd.
        default = Converter()
        default.set_basepath('.')
        converters = [default]

    else:
        try:
            converters = load_all(filename=args.config_path)
        except Exception as e:
            raise ToolError(str(e))

        # Relative paths in the config are resolved against its location.
        basepath = op.dirname(args.config_path)
        for converter in converters:
            if not isinstance(converter, Converter):
                raise ToolError(
                    'Config file should only contain '
                    '`jackseis.Converter` objects.')

            converter.set_basepath(basepath)

    with progress.view():
        task = make_task('Jackseis jobs')
        for converter in task(converters):
            converter.convert(args)