1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import re 

7import logging 

8import os.path as op 

9 

10import numpy as num 

11 

12from pyrocko import io, trace, util 

13from pyrocko.progress import progress 

14from pyrocko.has_paths import Path, HasPaths 

15from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \ 

16 StringChoice, IntChoice, Defer, load_all, clone 

17from pyrocko.squirrel.dataset import Dataset 

18from pyrocko.squirrel.client.local import LocalData 

19from pyrocko.squirrel.error import ToolError 

20from pyrocko.squirrel.model import CodesNSLCE 

21from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \ 

22 ChannelGrouping, SensorGrouping 

23 

24tts = util.time_to_str 

25 

26guts_prefix = 'jackseis' 

27logger = logging.getLogger('psq.cli.jackseis') 

28 

29 

30def make_task(*args): 

31 return progress.task(*args, logger=logger) 

32 

33 

34def parse_rename_rule_from_string(s): 

35 s = s.strip() 

36 if re.match(r'^([^:,]*:[^:,]*,?)+', s): 

37 return dict( 

38 x.split(':') for x in s.strip(',').split(',')) 

39 else: 

40 return s 

41 

42 

43class JackseisError(ToolError): 

44 pass 

45 

46 

47class Chain(object): 

48 def __init__(self, node, parent=None): 

49 self.node = node 

50 self.parent = parent 

51 

52 def mcall(self, name, *args, **kwargs): 

53 ret = [] 

54 if self.parent is not None: 

55 ret.append(self.parent.mcall(name, *args, **kwargs)) 

56 

57 ret.append(getattr(self.node, name)(*args, **kwargs)) 

58 return ret 

59 

60 def fcall(self, name, *args, **kwargs): 

61 v = getattr(self.node, name)(*args, **kwargs) 

62 if v is None and self.parent is not None: 

63 return self.parent.fcall(name, *args, **kwargs) 

64 else: 

65 return v 

66 

67 def get(self, name): 

68 v = getattr(self.node, name) 

69 if v is None and self.parent is not None: 

70 return self.parent.get(name) 

71 else: 

72 return v 

73 

74 def dget(self, name, k): 

75 v = getattr(self.node, name).get(k, None) 

76 if v is None and self.parent is not None: 

77 return self.parent.dget(name, k) 

78 else: 

79 return v 

80 

81 

82class OutputFormatChoice(StringChoice): 

83 choices = io.allowed_formats('save') 

84 

85 

86class OutputDataTypeChoice(StringChoice): 

87 choices = ['int32', 'int64', 'float32', 'float64'] 

88 name_to_dtype = { 

89 'int32': num.int32, 

90 'int64': num.int64, 

91 'float32': num.float32, 

92 'float64': num.float64} 

93 

94 

95class TraversalChoice(StringChoice): 

96 choices = ['network', 'station', 'channel', 'sensor'] 

97 name_to_grouping = { 

98 'network': NetworkGrouping(), 

99 'station': StationGrouping(), 

100 'sensor': SensorGrouping(), 

101 'channel': ChannelGrouping()} 

102 

103 

104class Converter(HasPaths): 

105 

106 in_dataset = Dataset.T(optional=True) 

107 in_path = String.T(optional=True) 

108 in_paths = List.T(String.T(optional=True)) 

109 

110 codes = List.T(CodesNSLCE.T(), optional=True) 

111 

112 rename = Dict.T( 

113 String.T(), 

114 Choice.T([ 

115 String.T(), 

116 Dict.T(String.T(), String.T())])) 

117 tmin = Timestamp.T(optional=True) 

118 tmax = Timestamp.T(optional=True) 

119 tinc = Float.T(optional=True) 

120 

121 downsample = Float.T(optional=True) 

122 

123 out_path = Path.T(optional=True) 

124 out_sds_path = Path.T(optional=True) 

125 out_format = OutputFormatChoice.T(optional=True) 

126 out_data_type = OutputDataTypeChoice.T(optional=True) 

127 out_mseed_record_length = IntChoice.T( 

128 optional=True, 

129 choices=list(io.mseed.VALID_RECORD_LENGTHS)) 

130 out_mseed_steim = IntChoice.T( 

131 optional=True, 

132 choices=[1, 2]) 

133 out_meta_path = Path.T(optional=True) 

134 

135 traversal = TraversalChoice.T(optional=True) 

136 

137 parts = List.T(Defer('Converter.T')) 

138 

139 @classmethod 

140 def add_arguments(cls, p): 

141 p.add_squirrel_query_arguments(without=['time']) 

142 

143 p.add_argument( 

144 '--tinc', 

145 dest='tinc', 

146 type=float, 

147 metavar='SECONDS', 

148 help='Set time length of output files [s].') 

149 

150 p.add_argument( 

151 '--downsample', 

152 dest='downsample', 

153 type=float, 

154 metavar='RATE', 

155 help='Downsample to RATE [Hz].') 

156 

157 p.add_argument( 

158 '--out-path', 

159 dest='out_path', 

160 metavar='TEMPLATE', 

161 help='Set output path to TEMPLATE. Available placeholders ' 

162 'are %%n: network, %%s: station, %%l: location, %%c: ' 

163 'channel, %%b: begin time, %%e: end time, %%j: julian day of ' 

164 'year. The following additional placeholders use the window ' 

165 'begin and end times rather than trace begin and end times ' 

166 '(to suppress producing many small files for gappy traces), ' 

167 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, ' 

168 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, ' 

169 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. ' 

170 "Example: --out-path='data/%%s/trace-%%s-%%c.mseed'") 

171 

172 p.add_argument( 

173 '--out-sds-path', 

174 dest='out_sds_path', 

175 metavar='PATH', 

176 help='Set output path to create SDS (https://www.seiscomp.de' 

177 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at ' 

178 'the given path. Implies --tinc=86400. ' 

179 'Example: --out-sds-path=data/sds') 

180 

181 p.add_argument( 

182 '--out-format', 

183 dest='out_format', 

184 choices=io.allowed_formats('save'), 

185 metavar='FORMAT', 

186 help='Set output file format. Choices: %s' % io.allowed_formats( 

187 'save', 'cli_help', 'mseed')) 

188 

189 p.add_argument( 

190 '--out-data-type', 

191 dest='out_data_type', 

192 choices=OutputDataTypeChoice.choices, 

193 metavar='DTYPE', 

194 help='Set numerical data type. Choices: %s. The output file ' 

195 'format must support the given type. By default, the data ' 

196 'type is kept unchanged.' % ', '.join( 

197 OutputDataTypeChoice.choices)) 

198 

199 p.add_argument( 

200 '--out-mseed-record-length', 

201 dest='out_mseed_record_length', 

202 type=int, 

203 choices=io.mseed.VALID_RECORD_LENGTHS, 

204 metavar='INT', 

205 help='Set the mseed record length in bytes. Choices: %s. ' 

206 'Default is 4096 bytes, which is commonly used for archiving.' 

207 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS)) 

208 

209 p.add_argument( 

210 '--out-mseed-steim', 

211 dest='out_mseed_steim', 

212 type=int, 

213 choices=(1, 2), 

214 metavar='INT', 

215 help='Set the mseed STEIM compression. Choices: 1 or 2. ' 

216 'Default is STEIM-2, which can compress full range int32. ' 

217 'Note: STEIM-2 is limited to 30 bit dynamic range.') 

218 

219 p.add_argument( 

220 '--out-meta-path', 

221 dest='out_meta_path', 

222 metavar='PATH', 

223 help='Set output path for station metadata (StationXML) export.') 

224 

225 p.add_argument( 

226 '--traversal', 

227 dest='traversal', 

228 metavar='GROUPING', 

229 choices=TraversalChoice.choices, 

230 help='By default the outermost processing loop is over time. ' 

231 'Add outer loop with given GROUPING. Choices: %s' 

232 % ', '.join(TraversalChoice.choices)) 

233 

234 p.add_argument( 

235 '--rename-network', 

236 dest='rename_network', 

237 metavar='REPLACEMENT', 

238 help=""" 

239Replace network code. REPLACEMENT can be a string for direct replacement, a 

240mapping for selective replacement, or a regular expression for tricky 

241replacements. Examples: Direct replacement: ```XX``` - set all network codes to 

242```XX```. Mapping: ```AA:XX,BB:YY``` - replace ```AA``` with ```XX``` and 

243```BB``` with ```YY```. Regular expression: ```/A(\\d)/X\\1/``` - replace 

244```A1``` with ```X1``` and ```A2``` with ```X2``` etc. 

245""".strip()) 

246 

247 p.add_argument( 

248 '--rename-station', 

249 dest='rename_station', 

250 metavar='REPLACEMENT', 

251 help='Replace station code. See ``--rename-network``.') 

252 

253 p.add_argument( 

254 '--rename-location', 

255 dest='rename_location', 

256 metavar='REPLACEMENT', 

257 help='Replace location code. See ``--rename-network``.') 

258 

259 p.add_argument( 

260 '--rename-channel', 

261 dest='rename_channel', 

262 metavar='REPLACEMENT', 

263 help='Replace channel code. See ``--rename-network``.') 

264 

265 p.add_argument( 

266 '--rename-extra', 

267 dest='rename_extra', 

268 metavar='REPLACEMENT', 

269 help='Replace extra code. See ``--rename-network``. Note: the ' 

270 '```extra``` code is not available in Mini-SEED.') 

271 

272 @classmethod 

273 def from_arguments(cls, args): 

274 kwargs = args.squirrel_query 

275 

276 rename = {} 

277 for (k, v) in [ 

278 ('network', args.rename_network), 

279 ('station', args.rename_station), 

280 ('location', args.rename_location), 

281 ('channel', args.rename_channel), 

282 ('extra', args.rename_extra)]: 

283 

284 if v is not None: 

285 rename[k] = parse_rename_rule_from_string(v) 

286 

287 obj = cls( 

288 downsample=args.downsample, 

289 out_format=args.out_format, 

290 out_path=args.out_path, 

291 tinc=args.tinc, 

292 out_sds_path=args.out_sds_path, 

293 out_data_type=args.out_data_type, 

294 out_mseed_record_length=args.out_mseed_record_length, 

295 out_mseed_steim=args.out_mseed_steim, 

296 out_meta_path=args.out_meta_path, 

297 traversal=args.traversal, 

298 rename=rename, 

299 **kwargs) 

300 

301 obj.validate() 

302 return obj 

303 

304 def add_dataset(self, sq): 

305 if self.in_dataset is not None: 

306 sq.add_dataset(self.in_dataset) 

307 

308 if self.in_path is not None: 

309 ds = Dataset(sources=[LocalData(paths=[self.in_path])]) 

310 ds.set_basepath_from(self) 

311 sq.add_dataset(ds) 

312 

313 if self.in_paths: 

314 ds = Dataset(sources=[LocalData(paths=self.in_paths)]) 

315 ds.set_basepath_from(self) 

316 sq.add_dataset(ds) 

317 

318 def get_effective_rename_rules(self, chain): 

319 d = {} 

320 for k in ['network', 'station', 'location', 'channel']: 

321 v = chain.dget('rename', k) 

322 if isinstance(v, str): 

323 m = re.match(r'/([^/]+)/([^/]*)/', v) 

324 if m: 

325 try: 

326 v = (re.compile(m.group(1)), m.group(2)) 

327 except Exception: 

328 raise JackseisError( 

329 'Invalid replacement pattern: /%s/' % m.group(1)) 

330 

331 d[k] = v 

332 

333 return d 

334 

335 def get_effective_out_path(self): 

336 nset = sum(x is not None for x in ( 

337 self.out_path, 

338 self.out_sds_path)) 

339 

340 if nset > 1: 

341 raise JackseisError( 

342 'More than one out of [out_path, out_sds_path] set.') 

343 

344 is_sds = False 

345 if self.out_path: 

346 out_path = self.out_path 

347 

348 elif self.out_sds_path: 

349 out_path = op.join( 

350 self.out_sds_path, 

351 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D' 

352 '/%(network)s.%(station)s.%(location)s.%(channel)s.D' 

353 '.%(wmin_year)s.%(wmin_jday)s') 

354 is_sds = True 

355 else: 

356 out_path = None 

357 

358 if out_path is not None: 

359 return self.expand_path(out_path), is_sds 

360 else: 

361 return None 

362 

363 def get_effective_out_meta_path(self): 

364 if self.out_meta_path is not None: 

365 return self.expand_path(self.out_meta_path) 

366 else: 

367 return None 

368 

369 def do_rename(self, rules, tr): 

370 rename = {} 

371 for k in ['network', 'station', 'location', 'channel']: 

372 v = rules.get(k, None) 

373 if isinstance(v, str): 

374 rename[k] = v 

375 elif isinstance(v, dict): 

376 try: 

377 oldval = getattr(tr, k) 

378 rename[k] = v[oldval] 

379 except KeyError: 

380 raise ToolError( 

381 'No mapping defined for %s code "%s".' % (k, oldval)) 

382 

383 elif isinstance(v, tuple): 

384 pat, repl = v 

385 oldval = getattr(tr, k) 

386 newval, n = pat.subn(repl, oldval) 

387 if n: 

388 rename[k] = newval 

389 

390 tr.set_codes(**rename) 

391 

392 def convert(self, args, chain=None): 

393 if chain is None: 

394 defaults = clone(g_defaults) 

395 defaults.set_basepath_from(self) 

396 chain = Chain(defaults) 

397 

398 chain = Chain(self, chain) 

399 

400 if self.parts: 

401 task = make_task('Jackseis parts') 

402 for part in task(self.parts): 

403 part.convert(args, chain) 

404 

405 del task 

406 

407 else: 

408 sq = args.make_squirrel() 

409 

410 cli_overrides = Converter.from_arguments(args) 

411 cli_overrides.set_basepath('.') 

412 

413 chain = Chain(cli_overrides, chain) 

414 

415 chain.mcall('add_dataset', sq) 

416 

417 tmin = chain.get('tmin') 

418 tmax = chain.get('tmax') 

419 tinc = chain.get('tinc') 

420 codes = chain.get('codes') 

421 downsample = chain.get('downsample') 

422 out_path, is_sds = chain.fcall('get_effective_out_path') \ 

423 or (None, False) 

424 

425 if is_sds and tinc != 86400.: 

426 logger.warning('Setting time window to 1 day to generate SDS.') 

427 tinc = 86400.0 

428 

429 out_format = chain.get('out_format') 

430 out_data_type = chain.get('out_data_type') 

431 

432 out_meta_path = chain.fcall('get_effective_out_meta_path') 

433 

434 if out_meta_path is not None: 

435 sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax) 

436 util.ensuredirs(out_meta_path) 

437 sx.dump_xml(filename=out_meta_path) 

438 if out_path is None: 

439 return 

440 

441 target_deltat = None 

442 if downsample is not None: 

443 target_deltat = 1.0 / float(downsample) 

444 

445 save_kwargs = {} 

446 if out_format == 'mseed': 

447 save_kwargs['record_length'] = chain.get( 

448 'out_mseed_record_length') 

449 save_kwargs['steim'] = chain.get( 

450 'out_mseed_steim') 

451 

452 traversal = chain.get('traversal') 

453 if traversal is not None: 

454 grouping = TraversalChoice.name_to_grouping[traversal] 

455 else: 

456 grouping = None 

457 

458 tpad = 0.0 

459 if target_deltat is not None: 

460 tpad += target_deltat * 50. 

461 

462 task = None 

463 rename_rules = self.get_effective_rename_rules(chain) 

464 for batch in sq.chopper_waveforms( 

465 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc, 

466 codes=codes, 

467 snap_window=True, 

468 grouping=grouping): 

469 

470 if task is None: 

471 task = make_task( 

472 'Jackseis blocks', batch.n * batch.ngroups) 

473 

474 tlabel = '%s%s - %s' % ( 

475 'groups %i / %i: ' % (batch.igroup, batch.ngroups) 

476 if batch.ngroups > 1 else '', 

477 util.time_to_str(batch.tmin), 

478 util.time_to_str(batch.tmax)) 

479 

480 task.update(batch.i + batch.igroup * batch.n, tlabel) 

481 

482 twmin = batch.tmin 

483 twmax = batch.tmax 

484 

485 traces = batch.traces 

486 

487 if target_deltat is not None: 

488 out_traces = [] 

489 for tr in traces: 

490 try: 

491 tr.downsample_to( 

492 target_deltat, snap=True, demean=False, 

493 allow_upsample_max=4) 

494 

495 out_traces.append(tr) 

496 

497 except (trace.TraceTooShort, trace.NoData): 

498 pass 

499 

500 traces = out_traces 

501 

502 for tr in traces: 

503 self.do_rename(rename_rules, tr) 

504 

505 if out_data_type: 

506 for tr in traces: 

507 tr.ydata = tr.ydata.astype( 

508 OutputDataTypeChoice.name_to_dtype[out_data_type]) 

509 

510 chopped_traces = [] 

511 for tr in traces: 

512 try: 

513 otr = tr.chop(twmin, twmax, inplace=False) 

514 chopped_traces.append(otr) 

515 except trace.NoData: 

516 pass 

517 

518 traces = chopped_traces 

519 

520 if out_path is not None: 

521 try: 

522 io.save( 

523 traces, out_path, 

524 format=out_format, 

525 overwrite=args.force, 

526 additional=dict( 

527 wmin_year=tts(twmin, format='%Y'), 

528 wmin_month=tts(twmin, format='%m'), 

529 wmin_day=tts(twmin, format='%d'), 

530 wmin_jday=tts(twmin, format='%j'), 

531 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'), 

532 wmax_year=tts(twmax, format='%Y'), 

533 wmax_month=tts(twmax, format='%m'), 

534 wmax_day=tts(twmax, format='%d'), 

535 wmax_jday=tts(twmax, format='%j'), 

536 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')), 

537 **save_kwargs) 

538 

539 except io.FileSaveError as e: 

540 raise JackseisError(str(e)) 

541 

542 else: 

543 for tr in traces: 

544 print(tr.summary) 

545 

546 if task: 

547 task.done() 

548 

549 

550g_defaults = Converter( 

551 out_mseed_record_length=4096, 

552 out_format='mseed', 

553 out_mseed_steim=2) 

554 

555 

556headline = 'Convert waveform archive data.' 

557 

558 

559def make_subparser(subparsers): 

560 return subparsers.add_parser( 

561 'jackseis', 

562 help=headline, 

563 description=headline) 

564 

565 

566def setup(parser): 

567 parser.add_squirrel_selection_arguments() 

568 

569 parser.add_argument( 

570 '--config', 

571 dest='config_path', 

572 metavar='NAME', 

573 help='File containing `jackseis.Converter` settings.') 

574 

575 parser.add_argument( 

576 '--force', 

577 dest='force', 

578 action='store_true', 

579 default=False, 

580 help='Force overwriting of existing files.') 

581 

582 Converter.add_arguments(parser) 

583 

584 

585def run(parser, args): 

586 if args.config_path: 

587 try: 

588 converters = load_all(filename=args.config_path) 

589 except Exception as e: 

590 raise ToolError(str(e)) 

591 

592 for converter in converters: 

593 if not isinstance(converter, Converter): 

594 raise ToolError( 

595 'Config file should only contain ' 

596 '`jackseis.Converter` objects.') 

597 

598 converter.set_basepath(op.dirname(args.config_path)) 

599 

600 else: 

601 converter = Converter() 

602 converter.set_basepath('.') 

603 converters = [converter] 

604 

605 with progress.view(): 

606 task = make_task('Jackseis jobs') 

607 for converter in task(converters): 

608 converter.convert(args)