1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import re
7import logging
8import os.path as op
10import numpy as num
12from pyrocko import io, trace, util
13from pyrocko.progress import progress
14from pyrocko.has_paths import Path, HasPaths
15from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
16 StringChoice, IntChoice, Defer, load_all, clone
17from pyrocko.squirrel.dataset import Dataset
18from pyrocko.squirrel.client.local import LocalData
19from pyrocko.squirrel.error import ToolError
20from pyrocko.squirrel.model import CodesNSLCE
21from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
22 ChannelGrouping, SensorGrouping
# Shorthand for the frequently used time formatting helper.
tts = util.time_to_str

# Prefix under which guts objects of this module are registered.
guts_prefix = 'jackseis'

logger = logging.getLogger('psq.cli.jackseis')
def make_task(*args):
    '''
    Create a progress task bound to this module's logger.
    '''
    return progress.task(*args, logger=logger)
class JackseisError(ToolError):
    '''
    Raised to report errors specific to the ``jackseis`` tool.
    '''
    pass
class Chain(object):
    '''
    Linked chain of settings providers with parent fallback.

    Lookups that yield ``None`` on this chain's node are retried on the
    parent chain, so nested converters inherit unset values from their
    enclosing scope.
    '''

    def __init__(self, node, parent=None):
        self.node = node
        self.parent = parent

    def mcall(self, name, *args, **kwargs):
        '''
        Call method ``name`` on every chain member, outermost first.

        Returns a nested list of the individual call results.
        '''
        results = []
        if self.parent is not None:
            results.append(self.parent.mcall(name, *args, **kwargs))

        results.append(getattr(self.node, name)(*args, **kwargs))
        return results

    def fcall(self, name, *args, **kwargs):
        '''
        Call method ``name`` on this node, falling back to the parent
        chain when the result is ``None``.
        '''
        value = getattr(self.node, name)(*args, **kwargs)
        if value is not None or self.parent is None:
            return value

        return self.parent.fcall(name, *args, **kwargs)

    def get(self, name):
        '''
        Get attribute ``name``, falling back to the parent chain when the
        value is ``None``.
        '''
        value = getattr(self.node, name)
        if value is not None or self.parent is None:
            return value

        return self.parent.get(name)

    def dget(self, name, k):
        '''
        Get entry ``k`` of dict attribute ``name``, falling back to the
        parent chain when the entry is missing or ``None``.
        '''
        value = getattr(self.node, name).get(k, None)
        if value is not None or self.parent is None:
            return value

        return self.parent.dget(name, k)
class OutputFormatChoice(StringChoice):
    # Restrict choices to the formats pyrocko.io can write.
    choices = io.allowed_formats('save')
class OutputDataTypeChoice(StringChoice):
    '''
    Numerical sample data types selectable for output.
    '''
    choices = ['int32', 'int64', 'float32', 'float64']

    # Lookup table mapping each choice string to the numpy scalar type of
    # the same name.
    name_to_dtype = {
        name: getattr(num, name)
        for name in ['int32', 'int64', 'float32', 'float64']}
class TraversalChoice(StringChoice):
    # Groupings available for the optional outer processing loop
    # (``--traversal`` option).
    choices = ['network', 'station', 'channel', 'sensor']

    # Map choice strings to squirrel grouping operator instances.
    name_to_grouping = {
        'network': NetworkGrouping(),
        'station': StationGrouping(),
        'sensor': SensorGrouping(),
        'channel': ChannelGrouping()}
class Converter(HasPaths):
    '''
    Jackseis conversion job configuration.

    A converter bundles input selection, processing and output options.
    Converters can be nested via ``parts``; at processing time, unset
    options are resolved through a :py:class:`Chain`: command line
    arguments override the converter's own settings, which override those
    of enclosing converters, which override ``g_defaults``.
    '''

    # Input selection: a squirrel dataset object and/or plain
    # file/directory paths.
    in_dataset = Dataset.T(optional=True)
    in_path = String.T(optional=True)
    in_paths = List.T(String.T(optional=True))

    # Restrict processing to traces matching these codes patterns.
    codes = List.T(CodesNSLCE.T(), optional=True)

    # Rename rules keyed by codes entity ('network', 'station',
    # 'location', 'channel'). Values are either a plain replacement
    # string (may be a '/pattern/replacement/' regex form, see
    # get_effective_rename_rules) or a translation table old -> new.
    rename = Dict.T(
        String.T(),
        Choice.T([
            String.T(),
            Dict.T(String.T(), String.T())]))

    # Overall time span and length of the processing time windows [s].
    tmin = Timestamp.T(optional=True)
    tmax = Timestamp.T(optional=True)
    tinc = Float.T(optional=True)

    # Target sampling rate [Hz] for downsampling.
    downsample = Float.T(optional=True)

    # Output options. out_path and out_sds_path are mutually exclusive
    # (checked in get_effective_out_path).
    out_path = Path.T(optional=True)
    out_sds_path = Path.T(optional=True)
    out_format = OutputFormatChoice.T(optional=True)
    out_data_type = OutputDataTypeChoice.T(optional=True)
    out_mseed_record_length = IntChoice.T(
        optional=True,
        choices=list(io.mseed.VALID_RECORD_LENGTHS))
    out_mseed_steim = IntChoice.T(
        optional=True,
        choices=[1, 2])
    out_meta_path = Path.T(optional=True)

    # Optional outer grouping loop (see TraversalChoice).
    traversal = TraversalChoice.T(optional=True)

    # Sub-converters, inheriting unset settings from this converter.
    parts = List.T(Defer('Converter.T'))

    @classmethod
    def add_arguments(cls, p):
        '''
        Add command line options for converter settings to parser *p*.
        '''
        p.add_squirrel_query_arguments(without=['time'])

        p.add_argument(
            '--tinc',
            dest='tinc',
            type=float,
            metavar='SECONDS',
            help='Set time length of output files [s].')

        p.add_argument(
            '--downsample',
            dest='downsample',
            type=float,
            metavar='RATE',
            help='Downsample to RATE [Hz].')

        p.add_argument(
            '--out-path',
            dest='out_path',
            metavar='TEMPLATE',
            help='Set output path to TEMPLATE. Available placeholders '
                 'are %%n: network, %%s: station, %%l: location, %%c: '
                 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
                 'year. The following additional placeholders use the window '
                 'begin and end times rather than trace begin and end times '
                 '(to suppress producing many small files for gappy traces), '
                 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
                 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
                 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
                 "Example: --out-path='data/%%s/trace-%%s-%%c.mseed'")

        p.add_argument(
            '--out-sds-path',
            dest='out_sds_path',
            metavar='PATH',
            help='Set output path to create SDS (https://www.seiscomp.de'
                 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
                 'the given path. Implies --tinc=86400. '
                 'Example: --out-sds-path=data/sds')

        p.add_argument(
            '--out-format',
            dest='out_format',
            choices=io.allowed_formats('save'),
            metavar='FORMAT',
            help='Set output file format. Choices: %s' % io.allowed_formats(
                'save', 'cli_help', 'mseed'))

        p.add_argument(
            '--out-data-type',
            dest='out_data_type',
            choices=OutputDataTypeChoice.choices,
            metavar='DTYPE',
            help='Set numerical data type. Choices: %s. The output file '
                 'format must support the given type. By default, the data '
                 'type is kept unchanged.' % ', '.join(
                    OutputDataTypeChoice.choices))

        p.add_argument(
            '--out-mseed-record-length',
            dest='out_mseed_record_length',
            type=int,
            choices=io.mseed.VALID_RECORD_LENGTHS,
            metavar='INT',
            help='Set the mseed record length in bytes. Choices: %s. '
                 'Default is 4096 bytes, which is commonly used for archiving.'
                 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))

        p.add_argument(
            '--out-mseed-steim',
            dest='out_mseed_steim',
            type=int,
            choices=(1, 2),
            metavar='INT',
            help='Set the mseed STEIM compression. Choices: 1 or 2. '
                 'Default is STEIM-2, which can compress full range int32. '
                 'Note: STEIM-2 is limited to 30 bit dynamic range.')

        p.add_argument(
            '--out-meta-path',
            dest='out_meta_path',
            metavar='PATH',
            help='Set output path for station metadata (StationXML) export.')

        p.add_argument(
            '--traversal',
            dest='traversal',
            metavar='GROUPING',
            choices=TraversalChoice.choices,
            help='By default the outermost processing loop is over time. '
                 'Add outer loop with given GROUPING. Choices: %s'
                 % ', '.join(TraversalChoice.choices))

    @classmethod
    def from_arguments(cls, args):
        '''
        Create a converter from parsed command line arguments.
        '''
        kwargs = args.squirrel_query

        obj = cls(
            downsample=args.downsample,
            out_format=args.out_format,
            out_path=args.out_path,
            tinc=args.tinc,
            out_sds_path=args.out_sds_path,
            out_data_type=args.out_data_type,
            out_mseed_record_length=args.out_mseed_record_length,
            out_mseed_steim=args.out_mseed_steim,
            out_meta_path=args.out_meta_path,
            traversal=args.traversal,
            **kwargs)

        obj.validate()
        return obj

    def add_dataset(self, sq):
        '''
        Add this converter's input selections as datasets to squirrel *sq*.
        '''
        if self.in_dataset is not None:
            sq.add_dataset(self.in_dataset)

        if self.in_path is not None:
            ds = Dataset(sources=[LocalData(paths=[self.in_path])])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

        if self.in_paths:
            ds = Dataset(sources=[LocalData(paths=self.in_paths)])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

    def get_effective_rename_rules(self, chain):
        '''
        Collect the rename rule for each codes entity from *chain*.

        String rules of the form ``'/pattern/replacement/'`` are compiled
        into ``(regex, replacement)`` tuples; all other values are passed
        through unchanged.
        '''
        d = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = chain.dget('rename', k)
            if isinstance(v, str):
                m = re.match(r'/([^/]+)/([^/]*)/', v)
                if m:
                    try:
                        v = (re.compile(m.group(1)), m.group(2))
                    except Exception:
                        raise JackseisError(
                            'Invalid replacement pattern: /%s/' % m.group(1))

            d[k] = v

        return d

    def get_effective_out_path(self):
        '''
        Resolve output path template.

        Returns ``(expanded_template, is_sds)`` or ``None`` when no output
        path is configured. Raises :py:exc:`JackseisError` when both
        ``out_path`` and ``out_sds_path`` are set.
        '''
        nset = sum(x is not None for x in (
            self.out_path,
            self.out_sds_path))

        if nset > 1:
            raise JackseisError(
                'More than one out of [out_path, out_sds_path] set.')

        is_sds = False
        if self.out_path:
            out_path = self.out_path

        elif self.out_sds_path:
            # Fixed SDS directory/file layout rooted at out_sds_path.
            out_path = op.join(
                self.out_sds_path,
                '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
                '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
                '.%(wmin_year)s.%(wmin_jday)s')
            is_sds = True
        else:
            out_path = None

        if out_path is not None:
            return self.expand_path(out_path), is_sds
        else:
            return None

    def get_effective_out_meta_path(self):
        '''
        Resolve the StationXML output path, or ``None`` when unset.
        '''
        if self.out_meta_path is not None:
            return self.expand_path(self.out_meta_path)
        else:
            return None

    def do_rename(self, rules, tr):
        '''
        Apply rename *rules* (see :py:meth:`get_effective_rename_rules`)
        to the codes of trace *tr*, in place.
        '''
        rename = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = rules.get(k, None)
            if isinstance(v, str):
                # Plain string: unconditional replacement.
                rename[k] = v
            elif isinstance(v, dict):
                # Translation table: old code -> new code.
                try:
                    oldval = getattr(tr, k)
                    rename[k] = v[oldval]
                except KeyError:
                    raise ToolError(
                        'No mapping defined for %s code "%s".' % (k, oldval))

            elif isinstance(v, tuple):
                # Compiled (regex, replacement): rename only on match.
                pat, repl = v
                oldval = getattr(tr, k)
                newval, n = pat.subn(repl, oldval)
                if n:
                    rename[k] = newval

        tr.set_codes(**rename)

    def convert(self, args, chain=None):
        '''
        Run the conversion job.

        :param args: parsed command line arguments of the jackseis tool.
        :param chain: :py:class:`Chain` of enclosing converters providing
            fallback settings, or ``None`` at the top level.
        '''
        if chain is None:
            # Top-level call: put global defaults at the bottom of the
            # settings chain.
            defaults = clone(g_defaults)
            defaults.set_basepath_from(self)
            chain = Chain(defaults)

        chain = Chain(self, chain)

        if self.parts:
            # Delegate to sub-converters; they inherit our settings via
            # the chain.
            task = make_task('Jackseis parts')
            for part in task(self.parts):
                part.convert(args, chain)

            del task

        else:
            sq = args.make_squirrel()

            # Command line arguments take precedence over all config
            # settings.
            cli_overrides = Converter.from_arguments(args)
            cli_overrides.set_basepath('.')

            chain = Chain(cli_overrides, chain)

            chain.mcall('add_dataset', sq)

            tmin = chain.get('tmin')
            tmax = chain.get('tmax')
            tinc = chain.get('tinc')
            codes = chain.get('codes')
            downsample = chain.get('downsample')
            out_path, is_sds = chain.fcall('get_effective_out_path') \
                or (None, False)

            if is_sds and tinc != 86400.:
                # SDS archives are organized in day files.
                logger.warning('Setting time window to 1 day to generate SDS.')
                tinc = 86400.0

            out_format = chain.get('out_format')
            out_data_type = chain.get('out_data_type')

            out_meta_path = chain.fcall('get_effective_out_meta_path')

            if out_meta_path is not None:
                sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
                util.ensuredirs(out_meta_path)
                sx.dump_xml(filename=out_meta_path)
                # Metadata-only run: stop when no waveform output wanted.
                if out_path is None:
                    return

            target_deltat = None
            if downsample is not None:
                target_deltat = 1.0 / float(downsample)

            save_kwargs = {}
            if out_format == 'mseed':
                save_kwargs['record_length'] = chain.get(
                    'out_mseed_record_length')
                save_kwargs['steim'] = chain.get(
                    'out_mseed_steim')

            traversal = chain.get('traversal')
            if traversal is not None:
                grouping = TraversalChoice.name_to_grouping[traversal]
            else:
                grouping = None

            # Pad windows when downsampling so filter edge effects fall
            # outside the chopped output span.
            tpad = 0.0
            if target_deltat is not None:
                tpad += target_deltat * 50.

            task = None
            rename_rules = self.get_effective_rename_rules(chain)
            for batch in sq.chopper_waveforms(
                    tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
                    codes=codes,
                    snap_window=True,
                    grouping=grouping):

                if task is None:
                    # Total number of blocks is only known once the first
                    # batch arrives.
                    task = make_task(
                        'Jackseis blocks', batch.n * batch.ngroups)

                tlabel = '%s%s - %s' % (
                    'groups %i / %i: ' % (batch.igroup, batch.ngroups)
                    if batch.ngroups > 1 else '',
                    util.time_to_str(batch.tmin),
                    util.time_to_str(batch.tmax))

                task.update(batch.i + batch.igroup * batch.n, tlabel)

                twmin = batch.tmin
                twmax = batch.tmax

                traces = batch.traces

                if target_deltat is not None:
                    # Drop traces which cannot be downsampled.
                    out_traces = []
                    for tr in traces:
                        try:
                            tr.downsample_to(
                                target_deltat, snap=True, demean=False)

                            out_traces.append(tr)

                        except (trace.TraceTooShort, trace.NoData):
                            pass

                    traces = out_traces

                for tr in traces:
                    self.do_rename(rename_rules, tr)

                if out_data_type:
                    for tr in traces:
                        tr.ydata = tr.ydata.astype(
                            OutputDataTypeChoice.name_to_dtype[out_data_type])

                # Cut back to the window span (removes tpad overlap).
                chopped_traces = []
                for tr in traces:
                    try:
                        otr = tr.chop(twmin, twmax, inplace=False)
                        chopped_traces.append(otr)
                    except trace.NoData:
                        pass

                traces = chopped_traces

                if out_path is not None:
                    try:
                        io.save(
                            traces, out_path,
                            format=out_format,
                            overwrite=args.force,
                            additional=dict(
                                wmin_year=tts(twmin, format='%Y'),
                                wmin_month=tts(twmin, format='%m'),
                                wmin_day=tts(twmin, format='%d'),
                                wmin_jday=tts(twmin, format='%j'),
                                wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
                                wmax_year=tts(twmax, format='%Y'),
                                wmax_month=tts(twmax, format='%m'),
                                wmax_day=tts(twmax, format='%d'),
                                wmax_jday=tts(twmax, format='%j'),
                                wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
                            **save_kwargs)

                    except io.FileSaveError as e:
                        raise JackseisError(str(e))

                else:
                    # No output path: print trace summaries instead.
                    for tr in traces:
                        print(tr.summary)

            if task:
                task.done()
# Fallback settings applied at the bottom of every conversion chain.
g_defaults = Converter(
    out_mseed_record_length=4096,
    out_format='mseed',
    out_mseed_steim=2)


headline = 'Convert waveform archive data.'
def make_subparser(subparsers):
    '''
    Attach the ``jackseis`` subcommand to *subparsers*.
    '''
    subparser = subparsers.add_parser(
        'jackseis', help=headline, description=headline)

    return subparser
def setup(parser):
    '''
    Set up command line argument parser for the ``jackseis`` subcommand.
    '''
    parser.add_squirrel_selection_arguments()

    # Tool-level options not handled by Converter itself.
    for flag, kwargs in (
            ('--config', dict(
                dest='config_path',
                metavar='NAME',
                help='File containing `jackseis.Converter` settings.')),
            ('--force', dict(
                dest='force',
                action='store_true',
                default=False,
                help='Force overwriting of existing files.'))):

        parser.add_argument(flag, **kwargs)

    Converter.add_arguments(parser)
def run(parser, args):
    '''
    Execute the ``jackseis`` command with parsed arguments *args*.
    '''
    if not args.config_path:
        # No config file: a single converter assembled purely from the
        # command line, rooted at the current directory.
        default_converter = Converter()
        default_converter.set_basepath('.')
        converters = [default_converter]
    else:
        try:
            converters = load_all(filename=args.config_path)
        except Exception as e:
            raise ToolError(str(e))

        basepath = op.dirname(args.config_path)
        for conv in converters:
            if not isinstance(conv, Converter):
                raise ToolError(
                    'Config file should only contain '
                    '`jackseis.Converter` objects.')

            conv.set_basepath(basepath)

    with progress.view():
        task = make_task('Jackseis jobs')
        for conv in task(converters):
            conv.convert(args)