1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import re
9import logging
10import os.path as op
12import numpy as num
14from pyrocko import io, trace, util
15from pyrocko.progress import progress
16from pyrocko.has_paths import Path, HasPaths
17from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
18 StringChoice, IntChoice, Defer, load_all, clone
19from pyrocko.squirrel.dataset import Dataset
20from pyrocko.squirrel.client.local import LocalData
21from pyrocko.squirrel.tool import common
22from pyrocko.squirrel.error import ToolError
24tts = util.time_to_str
26guts_prefix = 'jackseis'
27logger = logging.getLogger('psq.cli.jackseis')
30def make_task(*args):
31 return progress.task(*args, logger=logger)
34class JackseisError(ToolError):
35 pass
38class Chain(object):
39 def __init__(self, node, parent=None):
40 self.node = node
41 self.parent = parent
43 def mcall(self, name, *args, **kwargs):
44 ret = []
45 if self.parent is not None:
46 ret.append(self.parent.mcall(name, *args, **kwargs))
48 ret.append(getattr(self.node, name)(*args, **kwargs))
49 return ret
51 def fcall(self, name, *args, **kwargs):
52 v = getattr(self.node, name)(*args, **kwargs)
53 if v is None and self.parent is not None:
54 return self.parent.fcall(name, *args, **kwargs)
55 else:
56 return v
58 def get(self, name):
59 v = getattr(self.node, name)
60 if v is None and self.parent is not None:
61 return self.parent.get(name)
62 else:
63 return v
65 def dget(self, name, k):
66 v = getattr(self.node, name).get(k, None)
67 if v is None and self.parent is not None:
68 return self.parent.dget(name, k)
69 else:
70 return v
73class OutputFormatChoice(StringChoice):
74 choices = io.allowed_formats('save')
77class OutputDataTypeChoice(StringChoice):
78 choices = ['int32', 'int64', 'float32', 'float64']
79 name_to_dtype = {
80 'int32': num.int32,
81 'int64': num.int64,
82 'float32': num.float32,
83 'float64': num.float64}
86class Converter(HasPaths):
88 in_dataset = Dataset.T(optional=True)
89 in_path = String.T(optional=True)
90 in_paths = List.T(String.T(optional=True))
92 rename = Dict.T(
93 String.T(),
94 Choice.T([
95 String.T(),
96 Dict.T(String.T(), String.T())]))
97 tmin = Timestamp.T(optional=True)
98 tmax = Timestamp.T(optional=True)
99 tinc = Float.T(optional=True)
101 downsample = Float.T(optional=True)
103 out_path = Path.T(optional=True)
104 out_sds_path = Path.T(optional=True)
105 out_format = OutputFormatChoice.T(optional=True)
106 out_data_type = OutputDataTypeChoice.T(optional=True)
107 out_mseed_record_length = IntChoice.T(
108 optional=True,
109 choices=list(io.mseed.VALID_RECORD_LENGTHS))
110 out_mseed_steim = IntChoice.T(
111 optional=True,
112 choices=[1, 2])
114 parts = List.T(Defer('Converter.T'))
116 @classmethod
117 def add_arguments(cls, p):
118 common.add_query_arguments(p, without=['time', 'codes'])
120 p.add_argument(
121 '--downsample',
122 dest='downsample',
123 type=float,
124 metavar='RATE',
125 help='Downsample to RATE [Hz].')
127 p.add_argument(
128 '--out-path',
129 dest='out_path',
130 metavar='TEMPLATE',
131 help='Set output path to TEMPLATE. Available placeholders '
132 'are %%n: network, %%s: station, %%l: location, %%c: '
133 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
134 'year. The following additional placeholders use the window '
135 'begin and end times rather than trace begin and end times '
136 '(to suppress producing many small files for gappy traces), '
137 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
138 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
139 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
140 'Example: --output=\'data/%%s/trace-%%s-%%c.mseed\'')
142 p.add_argument(
143 '--out-sds-path',
144 dest='out_sds_path',
145 metavar='PATH',
146 help='Set output path to create SDS (https://www.seiscomp.de'
147 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
148 'the given path. Implies --tinc=86400. '
149 'Example: --output-sds-path=data/sds')
151 p.add_argument(
152 '--out-format',
153 dest='out_format',
154 choices=io.allowed_formats('save'),
155 metavar='FORMAT',
156 help='Set output file format. Choices: %s' % io.allowed_formats(
157 'save', 'cli_help', 'mseed'))
159 p.add_argument(
160 '--out-data-type',
161 dest='out_data_type',
162 choices=OutputDataTypeChoice.choices,
163 metavar='DTYPE',
164 help='Set numerical data type. Choices: %s. The output file '
165 'format must support the given type. By default, the data '
166 'type is kept unchanged.' % ', '.join(
167 OutputDataTypeChoice.choices))
169 p.add_argument(
170 '--out-mseed-record-length',
171 dest='out_mseed_record_length',
172 type=int,
173 choices=io.mseed.VALID_RECORD_LENGTHS,
174 metavar='INT',
175 help='Set the mseed record length in bytes. Choices: %s. '
176 'Default is 4096 bytes, which is commonly used for archiving.'
177 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))
179 p.add_argument(
180 '--out-mseed-steim',
181 dest='out_mseed_steim',
182 type=int,
183 choices=(1, 2),
184 metavar='INT',
185 help='Set the mseed STEIM compression. Choices: 1 or 2. '
186 'Default is STEIM-2, which can compress full range int32. '
187 'Note: STEIM-2 is limited to 30 bit dynamic range.')
189 @classmethod
190 def from_arguments(cls, args):
191 kwargs = common.squirrel_query_from_arguments(args)
193 obj = cls(
194 downsample=args.downsample,
195 out_format=args.out_format,
196 out_path=args.out_path,
197 out_sds_path=args.out_sds_path,
198 out_data_type=args.out_data_type,
199 out_mseed_record_length=args.out_mseed_record_length,
200 out_mseed_steim=args.out_mseed_steim,
201 **kwargs)
203 obj.validate()
204 return obj
206 def add_dataset(self, sq):
207 if self.in_dataset is not None:
208 sq.add_dataset(self.in_dataset)
210 if self.in_path is not None:
211 ds = Dataset(sources=[LocalData(paths=[self.in_path])])
212 ds.set_basepath_from(self)
213 sq.add_dataset(ds)
215 if self.in_paths:
216 ds = Dataset(sources=[LocalData(paths=self.in_paths)])
217 ds.set_basepath_from(self)
218 sq.add_dataset(ds)
220 def get_effective_rename_rules(self, chain):
221 d = {}
222 for k in ['network', 'station', 'location', 'channel']:
223 v = chain.dget('rename', k)
224 if isinstance(v, str):
225 m = re.match(r'/([^/]+)/([^/]*)/', v)
226 if m:
227 try:
228 v = (re.compile(m.group(1)), m.group(2))
229 except Exception:
230 raise JackseisError(
231 'Invalid replacement pattern: /%s/' % m.group(1))
233 d[k] = v
235 return d
237 def get_effective_out_path(self):
238 nset = sum(x is not None for x in (
239 self.out_path,
240 self.out_sds_path))
242 if nset > 1:
243 raise JackseisError(
244 'More than one out of [out_path, out_sds_path] set.')
246 is_sds = False
247 if self.out_path:
248 out_path = self.out_path
250 elif self.out_sds_path:
251 out_path = op.join(
252 self.out_sds_path,
253 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
254 '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
255 '.%(wmin_year)s.%(wmin_jday)s')
256 is_sds = True
257 else:
258 out_path = None
260 if out_path is not None:
261 return self.expand_path(out_path), is_sds
262 else:
263 return None
265 def do_rename(self, rules, tr):
266 rename = {}
267 for k in ['network', 'station', 'location', 'channel']:
268 v = rules.get(k, None)
269 if isinstance(v, str):
270 rename[k] = v
271 elif isinstance(v, dict):
272 try:
273 oldval = getattr(tr, k)
274 rename[k] = v[oldval]
275 except KeyError:
276 raise ToolError(
277 'No mapping defined for %s code "%s".' % (k, oldval))
279 elif isinstance(v, tuple):
280 pat, repl = v
281 oldval = getattr(tr, k)
282 newval, n = pat.subn(repl, oldval)
283 if n:
284 rename[k] = newval
286 tr.set_codes(**rename)
288 def convert(self, args, chain=None):
289 if chain is None:
290 defaults = clone(g_defaults)
291 defaults.set_basepath_from(self)
292 chain = Chain(defaults)
294 chain = Chain(self, chain)
296 if self.parts:
297 task = make_task('Jackseis parts')
298 for part in task(self.parts):
299 part.convert(args, chain)
301 del task
303 else:
304 sq = common.squirrel_from_selection_arguments(args)
306 cli_overrides = Converter.from_arguments(args)
307 cli_overrides.set_basepath('.')
309 chain = Chain(cli_overrides, chain)
311 chain.mcall('add_dataset', sq)
313 tmin = chain.get('tmin')
314 tmax = chain.get('tmax')
315 tinc = chain.get('tinc')
316 downsample = chain.get('downsample')
317 out_path, is_sds = chain.fcall('get_effective_out_path') \
318 or (None, False)
320 if is_sds and tinc != 86400.:
321 logger.warning('Setting time window to 1 day to generate SDS.')
322 tinc = 86400.0
324 out_format = chain.get('out_format')
325 out_data_type = chain.get('out_data_type')
327 target_deltat = None
328 if downsample is not None:
329 target_deltat = 1.0 / float(downsample)
331 save_kwargs = {}
332 if out_format == 'mseed':
333 save_kwargs['record_length'] = chain.get(
334 'out_mseed_record_length')
335 save_kwargs['steim'] = chain.get(
336 'out_mseed_steim')
338 tpad = 0.0
339 if target_deltat is not None:
340 tpad += target_deltat * 50.
342 task = None
343 rename_rules = self.get_effective_rename_rules(chain)
344 for batch in sq.chopper_waveforms(
345 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
346 snap_window=True):
348 if task is None:
349 task = make_task('Jackseis blocks', batch.n)
351 task.update(
352 batch.i,
353 '%s - %s' % (
354 util.time_to_str(batch.tmin),
355 util.time_to_str(batch.tmax)))
357 twmin = batch.tmin
358 twmax = batch.tmax
360 traces = batch.traces
362 if target_deltat is not None:
363 out_traces = []
364 for tr in traces:
365 try:
366 tr.downsample_to(
367 target_deltat, snap=True, demean=False)
369 out_traces.append(tr)
371 except (trace.TraceTooShort, trace.NoData):
372 pass
374 traces = out_traces
376 for tr in traces:
377 self.do_rename(rename_rules, tr)
379 if out_data_type:
380 for tr in traces:
381 tr.ydata = tr.ydata.astype(
382 OutputDataTypeChoice.name_to_dtype[out_data_type])
384 chopped_traces = []
385 for tr in traces:
386 try:
387 otr = tr.chop(twmin, twmax, inplace=False)
388 chopped_traces.append(otr)
389 except trace.NoData:
390 pass
392 traces = chopped_traces
394 if out_path is not None:
395 try:
396 io.save(
397 traces, out_path,
398 format=out_format,
399 overwrite=args.force,
400 additional=dict(
401 wmin_year=tts(twmin, format='%Y'),
402 wmin_month=tts(twmin, format='%m'),
403 wmin_day=tts(twmin, format='%d'),
404 wmin_jday=tts(twmin, format='%j'),
405 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
406 wmax_year=tts(twmax, format='%Y'),
407 wmax_month=tts(twmax, format='%m'),
408 wmax_day=tts(twmax, format='%d'),
409 wmax_jday=tts(twmax, format='%j'),
410 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
411 **save_kwargs)
413 except io.FileSaveError as e:
414 raise JackseisError(str(e))
416 else:
417 for tr in traces:
418 print(tr.summary)
420 if task:
421 task.done()
424g_defaults = Converter(
425 out_mseed_record_length=4096,
426 out_format='mseed',
427 out_mseed_steim=2)
430def setup(subparsers):
431 p = common.add_parser(
432 subparsers, 'jackseis',
433 help='Convert waveform archive data.')
435 common.add_selection_arguments(p)
437 p.add_argument(
438 '--config',
439 dest='config_path',
440 metavar='NAME',
441 help='File containing `jackseis2.Converter` settings.')
443 p.add_argument(
444 '--force',
445 dest='force',
446 action='store_true',
447 default=False,
448 help='Force overwriting of existing files.')
450 Converter.add_arguments(p)
452 return p
455def call(parser, args):
456 if args.config_path:
457 try:
458 converters = load_all(filename=args.config_path)
459 except Exception as e:
460 raise ToolError(str(e))
462 for converter in converters:
463 if not isinstance(converter, Converter):
464 raise ToolError(
465 'Config file should only contain '
466 '`jackseis.Converter` objects.')
468 converter.set_basepath(op.dirname(args.config_path))
470 else:
471 converter = Converter()
472 converter.set_basepath('.')
473 converters = [converter]
475 with progress.view():
476 task = make_task('Jackseis jobs')
477 for converter in task(converters):
478 converter.convert(args)