1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import re
9import logging
10import os.path as op
12import numpy as num
14from pyrocko import io, trace, util
15from pyrocko.progress import progress
16from pyrocko.has_paths import Path, HasPaths
17from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
18 StringChoice, IntChoice, Defer, load_all, clone
19from pyrocko.squirrel.dataset import Dataset
20from pyrocko.squirrel.client.local import LocalData
21from pyrocko.squirrel.error import ToolError
23tts = util.time_to_str
25guts_prefix = 'jackseis'
26logger = logging.getLogger('psq.cli.jackseis')
29def make_task(*args):
30 return progress.task(*args, logger=logger)
33class JackseisError(ToolError):
34 pass
37class Chain(object):
38 def __init__(self, node, parent=None):
39 self.node = node
40 self.parent = parent
42 def mcall(self, name, *args, **kwargs):
43 ret = []
44 if self.parent is not None:
45 ret.append(self.parent.mcall(name, *args, **kwargs))
47 ret.append(getattr(self.node, name)(*args, **kwargs))
48 return ret
50 def fcall(self, name, *args, **kwargs):
51 v = getattr(self.node, name)(*args, **kwargs)
52 if v is None and self.parent is not None:
53 return self.parent.fcall(name, *args, **kwargs)
54 else:
55 return v
57 def get(self, name):
58 v = getattr(self.node, name)
59 if v is None and self.parent is not None:
60 return self.parent.get(name)
61 else:
62 return v
64 def dget(self, name, k):
65 v = getattr(self.node, name).get(k, None)
66 if v is None and self.parent is not None:
67 return self.parent.dget(name, k)
68 else:
69 return v
72class OutputFormatChoice(StringChoice):
73 choices = io.allowed_formats('save')
76class OutputDataTypeChoice(StringChoice):
77 choices = ['int32', 'int64', 'float32', 'float64']
78 name_to_dtype = {
79 'int32': num.int32,
80 'int64': num.int64,
81 'float32': num.float32,
82 'float64': num.float64}
85class Converter(HasPaths):
87 in_dataset = Dataset.T(optional=True)
88 in_path = String.T(optional=True)
89 in_paths = List.T(String.T(optional=True))
91 codes = String.T(optional=True)
93 rename = Dict.T(
94 String.T(),
95 Choice.T([
96 String.T(),
97 Dict.T(String.T(), String.T())]))
98 tmin = Timestamp.T(optional=True)
99 tmax = Timestamp.T(optional=True)
100 tinc = Float.T(optional=True)
102 downsample = Float.T(optional=True)
104 out_path = Path.T(optional=True)
105 out_sds_path = Path.T(optional=True)
106 out_format = OutputFormatChoice.T(optional=True)
107 out_data_type = OutputDataTypeChoice.T(optional=True)
108 out_mseed_record_length = IntChoice.T(
109 optional=True,
110 choices=list(io.mseed.VALID_RECORD_LENGTHS))
111 out_mseed_steim = IntChoice.T(
112 optional=True,
113 choices=[1, 2])
115 parts = List.T(Defer('Converter.T'))
117 @classmethod
118 def add_arguments(cls, p):
119 p.add_squirrel_query_arguments(without=['time', 'codes'])
121 p.add_argument(
122 '--downsample',
123 dest='downsample',
124 type=float,
125 metavar='RATE',
126 help='Downsample to RATE [Hz].')
128 p.add_argument(
129 '--out-path',
130 dest='out_path',
131 metavar='TEMPLATE',
132 help='Set output path to TEMPLATE. Available placeholders '
133 'are %%n: network, %%s: station, %%l: location, %%c: '
134 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
135 'year. The following additional placeholders use the window '
136 'begin and end times rather than trace begin and end times '
137 '(to suppress producing many small files for gappy traces), '
138 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
139 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
140 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
141 'Example: --output=\'data/%%s/trace-%%s-%%c.mseed\'')
143 p.add_argument(
144 '--out-sds-path',
145 dest='out_sds_path',
146 metavar='PATH',
147 help='Set output path to create SDS (https://www.seiscomp.de'
148 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
149 'the given path. Implies --tinc=86400. '
150 'Example: --output-sds-path=data/sds')
152 p.add_argument(
153 '--out-format',
154 dest='out_format',
155 choices=io.allowed_formats('save'),
156 metavar='FORMAT',
157 help='Set output file format. Choices: %s' % io.allowed_formats(
158 'save', 'cli_help', 'mseed'))
160 p.add_argument(
161 '--out-data-type',
162 dest='out_data_type',
163 choices=OutputDataTypeChoice.choices,
164 metavar='DTYPE',
165 help='Set numerical data type. Choices: %s. The output file '
166 'format must support the given type. By default, the data '
167 'type is kept unchanged.' % ', '.join(
168 OutputDataTypeChoice.choices))
170 p.add_argument(
171 '--out-mseed-record-length',
172 dest='out_mseed_record_length',
173 type=int,
174 choices=io.mseed.VALID_RECORD_LENGTHS,
175 metavar='INT',
176 help='Set the mseed record length in bytes. Choices: %s. '
177 'Default is 4096 bytes, which is commonly used for archiving.'
178 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))
180 p.add_argument(
181 '--out-mseed-steim',
182 dest='out_mseed_steim',
183 type=int,
184 choices=(1, 2),
185 metavar='INT',
186 help='Set the mseed STEIM compression. Choices: 1 or 2. '
187 'Default is STEIM-2, which can compress full range int32. '
188 'Note: STEIM-2 is limited to 30 bit dynamic range.')
190 @classmethod
191 def from_arguments(cls, args):
192 kwargs = args.squirrel_query
194 obj = cls(
195 downsample=args.downsample,
196 out_format=args.out_format,
197 out_path=args.out_path,
198 out_sds_path=args.out_sds_path,
199 out_data_type=args.out_data_type,
200 out_mseed_record_length=args.out_mseed_record_length,
201 out_mseed_steim=args.out_mseed_steim,
202 **kwargs)
204 obj.validate()
205 return obj
207 def add_dataset(self, sq):
208 if self.in_dataset is not None:
209 sq.add_dataset(self.in_dataset)
211 if self.in_path is not None:
212 ds = Dataset(sources=[LocalData(paths=[self.in_path])])
213 ds.set_basepath_from(self)
214 sq.add_dataset(ds)
216 if self.in_paths:
217 ds = Dataset(sources=[LocalData(paths=self.in_paths)])
218 ds.set_basepath_from(self)
219 sq.add_dataset(ds)
221 def get_effective_rename_rules(self, chain):
222 d = {}
223 for k in ['network', 'station', 'location', 'channel']:
224 v = chain.dget('rename', k)
225 if isinstance(v, str):
226 m = re.match(r'/([^/]+)/([^/]*)/', v)
227 if m:
228 try:
229 v = (re.compile(m.group(1)), m.group(2))
230 except Exception:
231 raise JackseisError(
232 'Invalid replacement pattern: /%s/' % m.group(1))
234 d[k] = v
236 return d
238 def get_effective_out_path(self):
239 nset = sum(x is not None for x in (
240 self.out_path,
241 self.out_sds_path))
243 if nset > 1:
244 raise JackseisError(
245 'More than one out of [out_path, out_sds_path] set.')
247 is_sds = False
248 if self.out_path:
249 out_path = self.out_path
251 elif self.out_sds_path:
252 out_path = op.join(
253 self.out_sds_path,
254 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
255 '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
256 '.%(wmin_year)s.%(wmin_jday)s')
257 is_sds = True
258 else:
259 out_path = None
261 if out_path is not None:
262 return self.expand_path(out_path), is_sds
263 else:
264 return None
266 def do_rename(self, rules, tr):
267 rename = {}
268 for k in ['network', 'station', 'location', 'channel']:
269 v = rules.get(k, None)
270 if isinstance(v, str):
271 rename[k] = v
272 elif isinstance(v, dict):
273 try:
274 oldval = getattr(tr, k)
275 rename[k] = v[oldval]
276 except KeyError:
277 raise ToolError(
278 'No mapping defined for %s code "%s".' % (k, oldval))
280 elif isinstance(v, tuple):
281 pat, repl = v
282 oldval = getattr(tr, k)
283 newval, n = pat.subn(repl, oldval)
284 if n:
285 rename[k] = newval
287 tr.set_codes(**rename)
289 def convert(self, args, chain=None):
290 if chain is None:
291 defaults = clone(g_defaults)
292 defaults.set_basepath_from(self)
293 chain = Chain(defaults)
295 chain = Chain(self, chain)
297 if self.parts:
298 task = make_task('Jackseis parts')
299 for part in task(self.parts):
300 part.convert(args, chain)
302 del task
304 else:
305 sq = args.make_squirrel()
307 cli_overrides = Converter.from_arguments(args)
308 cli_overrides.set_basepath('.')
310 chain = Chain(cli_overrides, chain)
312 chain.mcall('add_dataset', sq)
314 tmin = chain.get('tmin')
315 tmax = chain.get('tmax')
316 tinc = chain.get('tinc')
317 codes = chain.get('codes')
318 downsample = chain.get('downsample')
319 out_path, is_sds = chain.fcall('get_effective_out_path') \
320 or (None, False)
322 if is_sds and tinc != 86400.:
323 logger.warning('Setting time window to 1 day to generate SDS.')
324 tinc = 86400.0
326 out_format = chain.get('out_format')
327 out_data_type = chain.get('out_data_type')
329 target_deltat = None
330 if downsample is not None:
331 target_deltat = 1.0 / float(downsample)
333 save_kwargs = {}
334 if out_format == 'mseed':
335 save_kwargs['record_length'] = chain.get(
336 'out_mseed_record_length')
337 save_kwargs['steim'] = chain.get(
338 'out_mseed_steim')
340 tpad = 0.0
341 if target_deltat is not None:
342 tpad += target_deltat * 50.
344 task = None
345 rename_rules = self.get_effective_rename_rules(chain)
346 for batch in sq.chopper_waveforms(
347 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
348 codes=codes,
349 snap_window=True):
351 if task is None:
352 task = make_task('Jackseis blocks', batch.n)
354 task.update(
355 batch.i,
356 '%s - %s' % (
357 util.time_to_str(batch.tmin),
358 util.time_to_str(batch.tmax)))
360 twmin = batch.tmin
361 twmax = batch.tmax
363 traces = batch.traces
365 if target_deltat is not None:
366 out_traces = []
367 for tr in traces:
368 try:
369 tr.downsample_to(
370 target_deltat, snap=True, demean=False)
372 out_traces.append(tr)
374 except (trace.TraceTooShort, trace.NoData):
375 pass
377 traces = out_traces
379 for tr in traces:
380 self.do_rename(rename_rules, tr)
382 if out_data_type:
383 for tr in traces:
384 tr.ydata = tr.ydata.astype(
385 OutputDataTypeChoice.name_to_dtype[out_data_type])
387 chopped_traces = []
388 for tr in traces:
389 try:
390 otr = tr.chop(twmin, twmax, inplace=False)
391 chopped_traces.append(otr)
392 except trace.NoData:
393 pass
395 traces = chopped_traces
397 if out_path is not None:
398 try:
399 io.save(
400 traces, out_path,
401 format=out_format,
402 overwrite=args.force,
403 additional=dict(
404 wmin_year=tts(twmin, format='%Y'),
405 wmin_month=tts(twmin, format='%m'),
406 wmin_day=tts(twmin, format='%d'),
407 wmin_jday=tts(twmin, format='%j'),
408 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
409 wmax_year=tts(twmax, format='%Y'),
410 wmax_month=tts(twmax, format='%m'),
411 wmax_day=tts(twmax, format='%d'),
412 wmax_jday=tts(twmax, format='%j'),
413 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
414 **save_kwargs)
416 except io.FileSaveError as e:
417 raise JackseisError(str(e))
419 else:
420 for tr in traces:
421 print(tr.summary)
423 if task:
424 task.done()
427g_defaults = Converter(
428 out_mseed_record_length=4096,
429 out_format='mseed',
430 out_mseed_steim=2)
433def make_subparser(subparsers):
434 return subparsers.add_parser(
435 'jackseis',
436 help='Convert waveform archive data.')
439def setup(parser):
440 parser.add_squirrel_selection_arguments()
442 parser.add_argument(
443 '--config',
444 dest='config_path',
445 metavar='NAME',
446 help='File containing `jackseis2.Converter` settings.')
448 parser.add_argument(
449 '--force',
450 dest='force',
451 action='store_true',
452 default=False,
453 help='Force overwriting of existing files.')
455 Converter.add_arguments(parser)
458def run(parser, args):
459 if args.config_path:
460 try:
461 converters = load_all(filename=args.config_path)
462 except Exception as e:
463 raise ToolError(str(e))
465 for converter in converters:
466 if not isinstance(converter, Converter):
467 raise ToolError(
468 'Config file should only contain '
469 '`jackseis.Converter` objects.')
471 converter.set_basepath(op.dirname(args.config_path))
473 else:
474 converter = Converter()
475 converter.set_basepath('.')
476 converters = [converter]
478 with progress.view():
479 task = make_task('Jackseis jobs')
480 for converter in task(converters):
481 converter.convert(args)