1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import re
9import logging
10import os.path as op
12import numpy as num
14from pyrocko import io, trace, util
15from pyrocko.progress import progress
16from pyrocko.has_paths import Path, HasPaths
17from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
18 StringChoice, IntChoice, Defer, load_all, clone
19from pyrocko.squirrel.dataset import Dataset
20from pyrocko.squirrel.client.local import LocalData
21from pyrocko.squirrel.error import ToolError
22from pyrocko.squirrel.model import CodesNSLCE
# Shorthand for time-to-string formatting, used when filling output path
# templates with window begin/end times.
tts = util.time_to_str

# Guts serialization namespace prefix for objects defined in this module.
guts_prefix = 'jackseis'

logger = logging.getLogger('psq.cli.jackseis')
def make_task(*args):
    '''
    Create a progress task bound to this module's logger.
    '''
    task = progress.task(*args, logger=logger)
    return task
class JackseisError(ToolError):
    '''
    Raised on jackseis-specific failures, e.g. invalid rename patterns,
    conflicting output path settings, or file save errors.
    '''
class Chain(object):
    '''
    Node in a linked chain of objects used for option inheritance.

    The chain is traversed child-first: a value found on this node wins,
    ``None`` values fall through to the parent node.
    '''

    def __init__(self, node, parent=None):
        self.node = node
        self.parent = parent

    def mcall(self, name, *args, **kwargs):
        '''
        Call method *name* on every node in the chain, parents first.

        Returns the collected results as a nested list.
        '''
        collected = [] if self.parent is None \
            else [self.parent.mcall(name, *args, **kwargs)]
        collected.append(getattr(self.node, name)(*args, **kwargs))
        return collected

    def fcall(self, name, *args, **kwargs):
        '''
        Call method *name* on this node, falling back to the parent
        chain while the result is ``None``.
        '''
        value = getattr(self.node, name)(*args, **kwargs)
        if value is not None or self.parent is None:
            return value

        return self.parent.fcall(name, *args, **kwargs)

    def get(self, name):
        '''
        Get attribute *name*, falling back to the parent chain while the
        value is ``None``.
        '''
        value = getattr(self.node, name)
        if value is not None or self.parent is None:
            return value

        return self.parent.get(name)

    def dget(self, name, k):
        '''
        Get entry *k* of dict attribute *name*, falling back to the
        parent chain while the entry is missing or ``None``.
        '''
        value = getattr(self.node, name).get(k, None)
        if value is not None or self.parent is None:
            return value

        return self.parent.dget(name, k)
class OutputFormatChoice(StringChoice):
    '''
    Output file format choice: any format Pyrocko can save.
    '''
    choices = io.allowed_formats('save')
class OutputDataTypeChoice(StringChoice):
    '''
    Numerical data type choice for output samples.
    '''
    choices = ['int32', 'int64', 'float32', 'float64']

    # Map choice name to the corresponding NumPy scalar type.
    name_to_dtype = {name: getattr(num, name) for name in choices}
class Converter(HasPaths):
    '''
    Waveform conversion job description.

    Bundles input selection (datasets, paths, codes, time span),
    processing options (downsampling, renaming) and output options
    (path templates, file format, data type, MiniSEED parameters).

    Converters can be nested through ``parts``. During
    :py:meth:`convert`, unset options fall back to values from enclosing
    converters, command line overrides and module defaults, looked up
    through a :py:class:`Chain`.
    '''

    # Input data selection: a full dataset description, a single path,
    # or a list of paths.
    in_dataset = Dataset.T(optional=True)
    in_path = String.T(optional=True)
    in_paths = List.T(String.T(optional=True))

    # Restrict processing to traces matching these codes.
    codes = List.T(CodesNSLCE.T(), optional=True)

    # Rename rules keyed by codes attribute ('network', 'station',
    # 'location', 'channel'). Each value is a plain replacement string,
    # a mapping of old to new values, or a '/pattern/replacement/'
    # regular expression string (see get_effective_rename_rules).
    rename = Dict.T(
        String.T(),
        Choice.T([
            String.T(),
            Dict.T(String.T(), String.T())]))
    # Processing time span and time window length [s].
    tmin = Timestamp.T(optional=True)
    tmax = Timestamp.T(optional=True)
    tinc = Float.T(optional=True)

    # Target sampling rate [Hz] for downsampling.
    downsample = Float.T(optional=True)

    # Output path template or SDS archive root (mutually exclusive, see
    # get_effective_out_path).
    out_path = Path.T(optional=True)
    out_sds_path = Path.T(optional=True)
    out_format = OutputFormatChoice.T(optional=True)
    out_data_type = OutputDataTypeChoice.T(optional=True)
    out_mseed_record_length = IntChoice.T(
        optional=True,
        choices=list(io.mseed.VALID_RECORD_LENGTHS))
    out_mseed_steim = IntChoice.T(
        optional=True,
        choices=[1, 2])
    # Output path for StationXML metadata export.
    out_meta_path = Path.T(optional=True)

    # Sub-converters; each inherits unset options from this converter.
    parts = List.T(Defer('Converter.T'))

    @classmethod
    def add_arguments(cls, p):
        '''
        Add command line options for converter settings to parser *p*.
        '''
        p.add_squirrel_query_arguments(without=['time'])

        p.add_argument(
            '--downsample',
            dest='downsample',
            type=float,
            metavar='RATE',
            help='Downsample to RATE [Hz].')

        p.add_argument(
            '--out-path',
            dest='out_path',
            metavar='TEMPLATE',
            help='Set output path to TEMPLATE. Available placeholders '
                 'are %%n: network, %%s: station, %%l: location, %%c: '
                 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
                 'year. The following additional placeholders use the window '
                 'begin and end times rather than trace begin and end times '
                 '(to suppress producing many small files for gappy traces), '
                 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
                 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
                 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
                 'Example: --output=\'data/%%s/trace-%%s-%%c.mseed\'')

        p.add_argument(
            '--out-sds-path',
            dest='out_sds_path',
            metavar='PATH',
            help='Set output path to create SDS (https://www.seiscomp.de'
                 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
                 'the given path. Implies --tinc=86400. '
                 'Example: --output-sds-path=data/sds')

        p.add_argument(
            '--out-format',
            dest='out_format',
            choices=io.allowed_formats('save'),
            metavar='FORMAT',
            help='Set output file format. Choices: %s' % io.allowed_formats(
                'save', 'cli_help', 'mseed'))

        p.add_argument(
            '--out-data-type',
            dest='out_data_type',
            choices=OutputDataTypeChoice.choices,
            metavar='DTYPE',
            help='Set numerical data type. Choices: %s. The output file '
                 'format must support the given type. By default, the data '
                 'type is kept unchanged.' % ', '.join(
                    OutputDataTypeChoice.choices))

        p.add_argument(
            '--out-mseed-record-length',
            dest='out_mseed_record_length',
            type=int,
            choices=io.mseed.VALID_RECORD_LENGTHS,
            metavar='INT',
            help='Set the mseed record length in bytes. Choices: %s. '
                 'Default is 4096 bytes, which is commonly used for archiving.'
                 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))

        p.add_argument(
            '--out-mseed-steim',
            dest='out_mseed_steim',
            type=int,
            choices=(1, 2),
            metavar='INT',
            help='Set the mseed STEIM compression. Choices: 1 or 2. '
                 'Default is STEIM-2, which can compress full range int32. '
                 'Note: STEIM-2 is limited to 30 bit dynamic range.')

        p.add_argument(
            '--out-meta-path',
            dest='out_meta_path',
            metavar='PATH',
            help='Set output path for station metadata (StationXML) export.')

    @classmethod
    def from_arguments(cls, args):
        '''
        Create a converter from parsed command line options *args*.
        '''
        kwargs = args.squirrel_query

        obj = cls(
            downsample=args.downsample,
            out_format=args.out_format,
            out_path=args.out_path,
            out_sds_path=args.out_sds_path,
            out_data_type=args.out_data_type,
            out_mseed_record_length=args.out_mseed_record_length,
            out_mseed_steim=args.out_mseed_steim,
            out_meta_path=args.out_meta_path,
            **kwargs)

        obj.validate()
        return obj

    def add_dataset(self, sq):
        '''
        Add this converter's input datasets and paths to squirrel *sq*.
        '''
        if self.in_dataset is not None:
            sq.add_dataset(self.in_dataset)

        if self.in_path is not None:
            ds = Dataset(sources=[LocalData(paths=[self.in_path])])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

        if self.in_paths:
            ds = Dataset(sources=[LocalData(paths=self.in_paths)])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

    def get_effective_rename_rules(self, chain):
        '''
        Resolve rename rules through *chain*, compiling any
        '/pattern/replacement/' strings into (regex, replacement) tuples.

        :raises JackseisError: if a regex pattern does not compile.
        '''
        d = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = chain.dget('rename', k)
            if isinstance(v, str):
                m = re.match(r'/([^/]+)/([^/]*)/', v)
                if m:
                    try:
                        v = (re.compile(m.group(1)), m.group(2))
                    except Exception:
                        raise JackseisError(
                            'Invalid replacement pattern: /%s/' % m.group(1))

            d[k] = v

        return d

    def get_effective_out_path(self):
        '''
        Get effective output path template.

        :returns: tuple ``(out_path, is_sds)``, or ``None`` if no output
            path is configured.
        :raises JackseisError: if both ``out_path`` and ``out_sds_path``
            are set.
        '''
        nset = sum(x is not None for x in (
            self.out_path,
            self.out_sds_path))

        if nset > 1:
            raise JackseisError(
                'More than one out of [out_path, out_sds_path] set.')

        is_sds = False
        if self.out_path:
            out_path = self.out_path

        elif self.out_sds_path:
            # SDS directory/file layout, filled in per window and
            # channel at save time.
            out_path = op.join(
                self.out_sds_path,
                '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
                '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
                '.%(wmin_year)s.%(wmin_jday)s')
            is_sds = True
        else:
            out_path = None

        if out_path is not None:
            return self.expand_path(out_path), is_sds
        else:
            return None

    def get_effective_out_meta_path(self):
        '''
        Get expanded station metadata output path, or ``None`` if unset.
        '''
        if self.out_meta_path is not None:
            return self.expand_path(self.out_meta_path)
        else:
            return None

    def do_rename(self, rules, tr):
        '''
        Apply resolved rename *rules* to the codes of trace *tr*.
        '''
        rename = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = rules.get(k, None)
            if isinstance(v, str):
                # Fixed replacement value.
                rename[k] = v
            elif isinstance(v, dict):
                # Explicit old -> new mapping; any unmapped code is an
                # error.
                try:
                    oldval = getattr(tr, k)
                    rename[k] = v[oldval]
                except KeyError:
                    raise ToolError(
                        'No mapping defined for %s code "%s".' % (k, oldval))

            elif isinstance(v, tuple):
                # (compiled regex, replacement) from
                # get_effective_rename_rules; rename only on match.
                pat, repl = v
                oldval = getattr(tr, k)
                newval, n = pat.subn(repl, oldval)
                if n:
                    rename[k] = newval

        tr.set_codes(**rename)

    def convert(self, args, chain=None):
        '''
        Run the conversion job.

        :param args: parsed command line options, used for CLI overrides
            and squirrel setup.
        :param chain: parent :py:class:`Chain` for option inheritance,
            or ``None`` at the root of the recursion.
        '''
        if chain is None:
            # Root call: put global defaults at the bottom of the chain.
            defaults = clone(g_defaults)
            defaults.set_basepath_from(self)
            chain = Chain(defaults)

        chain = Chain(self, chain)

        if self.parts:
            # Delegate work to sub-converters; this converter only
            # contributes inherited settings.
            task = make_task('Jackseis parts')
            for part in task(self.parts):
                part.convert(args, chain)

            del task

        else:
            sq = args.make_squirrel()

            # Command line options take precedence over config settings.
            cli_overrides = Converter.from_arguments(args)
            cli_overrides.set_basepath('.')

            chain = Chain(cli_overrides, chain)

            chain.mcall('add_dataset', sq)

            tmin = chain.get('tmin')
            tmax = chain.get('tmax')
            tinc = chain.get('tinc')
            codes = chain.get('codes')
            downsample = chain.get('downsample')
            out_path, is_sds = chain.fcall('get_effective_out_path') \
                or (None, False)

            if is_sds and tinc != 86400.:
                logger.warning('Setting time window to 1 day to generate SDS.')
                tinc = 86400.0

            out_format = chain.get('out_format')
            out_data_type = chain.get('out_data_type')

            out_meta_path = chain.fcall('get_effective_out_meta_path')

            if out_meta_path is not None:
                sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
                util.ensuredirs(out_meta_path)
                sx.dump_xml(filename=out_meta_path)
                # Metadata-only export: stop when no waveform output is
                # requested.
                if out_path is None:
                    return

            target_deltat = None
            if downsample is not None:
                target_deltat = 1.0 / float(downsample)

            save_kwargs = {}
            if out_format == 'mseed':
                save_kwargs['record_length'] = chain.get(
                    'out_mseed_record_length')
                save_kwargs['steim'] = chain.get(
                    'out_mseed_steim')

            # Pad the read windows when downsampling, presumably to keep
            # filter edge effects out of the chopped output window --
            # TODO confirm the factor of 50 samples.
            tpad = 0.0
            if target_deltat is not None:
                tpad += target_deltat * 50.

            task = None
            rename_rules = self.get_effective_rename_rules(chain)
            for batch in sq.chopper_waveforms(
                    tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
                    codes=codes,
                    snap_window=True):

                if task is None:
                    task = make_task('Jackseis blocks', batch.n)

                task.update(
                    batch.i,
                    '%s - %s' % (
                        util.time_to_str(batch.tmin),
                        util.time_to_str(batch.tmax)))

                twmin = batch.tmin
                twmax = batch.tmax

                traces = batch.traces

                if target_deltat is not None:
                    out_traces = []
                    for tr in traces:
                        try:
                            tr.downsample_to(
                                target_deltat, snap=True, demean=False)

                            out_traces.append(tr)

                        except (trace.TraceTooShort, trace.NoData):
                            # Silently drop traces which cannot be
                            # downsampled.
                            pass

                    traces = out_traces

                for tr in traces:
                    self.do_rename(rename_rules, tr)

                if out_data_type:
                    for tr in traces:
                        tr.ydata = tr.ydata.astype(
                            OutputDataTypeChoice.name_to_dtype[out_data_type])

                # Cut the padding off again before output.
                chopped_traces = []
                for tr in traces:
                    try:
                        otr = tr.chop(twmin, twmax, inplace=False)
                        chopped_traces.append(otr)
                    except trace.NoData:
                        pass

                traces = chopped_traces

                if out_path is not None:
                    try:
                        io.save(
                            traces, out_path,
                            format=out_format,
                            overwrite=args.force,
                            additional=dict(
                                wmin_year=tts(twmin, format='%Y'),
                                wmin_month=tts(twmin, format='%m'),
                                wmin_day=tts(twmin, format='%d'),
                                wmin_jday=tts(twmin, format='%j'),
                                wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
                                wmax_year=tts(twmax, format='%Y'),
                                wmax_month=tts(twmax, format='%m'),
                                wmax_day=tts(twmax, format='%d'),
                                wmax_jday=tts(twmax, format='%j'),
                                wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
                            **save_kwargs)

                    except io.FileSaveError as e:
                        raise JackseisError(str(e))

                else:
                    # No output path configured: just print summaries.
                    for tr in traces:
                        print(tr.summary)

            if task:
                task.done()
# Global default settings, cloned in as the root of every conversion
# chain (see Converter.convert).
g_defaults = Converter(
    out_mseed_record_length=4096,
    out_format='mseed',
    out_mseed_steim=2)


headline = 'Convert waveform archive data.'
def make_subparser(subparsers):
    '''
    Create the ``jackseis`` subcommand parser.
    '''
    return subparsers.add_parser(
        'jackseis', help=headline, description=headline)
def setup(parser):
    '''
    Register ``jackseis`` command line arguments on *parser*.
    '''
    parser.add_squirrel_selection_arguments()

    # Tool-level options; converter options are added separately below.
    options = [
        (('--config',), dict(
            dest='config_path',
            metavar='NAME',
            help='File containing `jackseis.Converter` settings.')),
        (('--force',), dict(
            dest='force',
            action='store_true',
            default=False,
            help='Force overwriting of existing files.'))]

    for flags, kwargs in options:
        parser.add_argument(*flags, **kwargs)

    Converter.add_arguments(parser)
def run(parser, args):
    '''
    Run the ``jackseis`` tool: load or create converters and execute them.
    '''
    if args.config_path:
        try:
            converters = load_all(filename=args.config_path)
        except Exception as e:
            raise ToolError(str(e))

        # Paths inside the config file are relative to its location.
        basepath = op.dirname(args.config_path)
        for conv in converters:
            if not isinstance(conv, Converter):
                raise ToolError(
                    'Config file should only contain '
                    '`jackseis.Converter` objects.')

            conv.set_basepath(basepath)

    else:
        conv = Converter()
        conv.set_basepath('.')
        converters = [conv]

    with progress.view():
        jobs = make_task('Jackseis jobs')
        for conv in jobs(converters):
            conv.convert(args)