1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import re 

7import logging 

8import os.path as op 

9 

10import numpy as num 

11 

12from pyrocko import io, trace, util 

13from pyrocko.progress import progress 

14from pyrocko.has_paths import Path, HasPaths 

15from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \ 

16 StringChoice, IntChoice, Defer, load_all, clone 

17from pyrocko.squirrel.dataset import Dataset 

18from pyrocko.squirrel.client.local import LocalData 

19from pyrocko.squirrel.error import ToolError 

20from pyrocko.squirrel.model import CodesNSLCE 

21from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \ 

22 ChannelGrouping, SensorGrouping 

23 

24tts = util.time_to_str 

25 

26guts_prefix = 'jackseis' 

27logger = logging.getLogger('psq.cli.jackseis') 

28 

29 

30def make_task(*args): 

31 return progress.task(*args, logger=logger) 

32 

33 

34class JackseisError(ToolError): 

35 pass 

36 

37 

38class Chain(object): 

39 def __init__(self, node, parent=None): 

40 self.node = node 

41 self.parent = parent 

42 

43 def mcall(self, name, *args, **kwargs): 

44 ret = [] 

45 if self.parent is not None: 

46 ret.append(self.parent.mcall(name, *args, **kwargs)) 

47 

48 ret.append(getattr(self.node, name)(*args, **kwargs)) 

49 return ret 

50 

51 def fcall(self, name, *args, **kwargs): 

52 v = getattr(self.node, name)(*args, **kwargs) 

53 if v is None and self.parent is not None: 

54 return self.parent.fcall(name, *args, **kwargs) 

55 else: 

56 return v 

57 

58 def get(self, name): 

59 v = getattr(self.node, name) 

60 if v is None and self.parent is not None: 

61 return self.parent.get(name) 

62 else: 

63 return v 

64 

65 def dget(self, name, k): 

66 v = getattr(self.node, name).get(k, None) 

67 if v is None and self.parent is not None: 

68 return self.parent.dget(name, k) 

69 else: 

70 return v 

71 

72 

73class OutputFormatChoice(StringChoice): 

74 choices = io.allowed_formats('save') 

75 

76 

77class OutputDataTypeChoice(StringChoice): 

78 choices = ['int32', 'int64', 'float32', 'float64'] 

79 name_to_dtype = { 

80 'int32': num.int32, 

81 'int64': num.int64, 

82 'float32': num.float32, 

83 'float64': num.float64} 

84 

85 

86class TraversalChoice(StringChoice): 

87 choices = ['network', 'station', 'channel', 'sensor'] 

88 name_to_grouping = { 

89 'network': NetworkGrouping(), 

90 'station': StationGrouping(), 

91 'sensor': SensorGrouping(), 

92 'channel': ChannelGrouping()} 

93 

94 

95class Converter(HasPaths): 

96 

97 in_dataset = Dataset.T(optional=True) 

98 in_path = String.T(optional=True) 

99 in_paths = List.T(String.T(optional=True)) 

100 

101 codes = List.T(CodesNSLCE.T(), optional=True) 

102 

103 rename = Dict.T( 

104 String.T(), 

105 Choice.T([ 

106 String.T(), 

107 Dict.T(String.T(), String.T())])) 

108 tmin = Timestamp.T(optional=True) 

109 tmax = Timestamp.T(optional=True) 

110 tinc = Float.T(optional=True) 

111 

112 downsample = Float.T(optional=True) 

113 

114 out_path = Path.T(optional=True) 

115 out_sds_path = Path.T(optional=True) 

116 out_format = OutputFormatChoice.T(optional=True) 

117 out_data_type = OutputDataTypeChoice.T(optional=True) 

118 out_mseed_record_length = IntChoice.T( 

119 optional=True, 

120 choices=list(io.mseed.VALID_RECORD_LENGTHS)) 

121 out_mseed_steim = IntChoice.T( 

122 optional=True, 

123 choices=[1, 2]) 

124 out_meta_path = Path.T(optional=True) 

125 

126 traversal = TraversalChoice.T(optional=True) 

127 

128 parts = List.T(Defer('Converter.T')) 

129 

130 @classmethod 

131 def add_arguments(cls, p): 

132 p.add_squirrel_query_arguments(without=['time']) 

133 

134 p.add_argument( 

135 '--tinc', 

136 dest='tinc', 

137 type=float, 

138 metavar='SECONDS', 

139 help='Set time length of output files [s].') 

140 

141 p.add_argument( 

142 '--downsample', 

143 dest='downsample', 

144 type=float, 

145 metavar='RATE', 

146 help='Downsample to RATE [Hz].') 

147 

148 p.add_argument( 

149 '--out-path', 

150 dest='out_path', 

151 metavar='TEMPLATE', 

152 help='Set output path to TEMPLATE. Available placeholders ' 

153 'are %%n: network, %%s: station, %%l: location, %%c: ' 

154 'channel, %%b: begin time, %%e: end time, %%j: julian day of ' 

155 'year. The following additional placeholders use the window ' 

156 'begin and end times rather than trace begin and end times ' 

157 '(to suppress producing many small files for gappy traces), ' 

158 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, ' 

159 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, ' 

160 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. ' 

161 'Example: --out-path=\'data/%%s/trace-%%s-%%c.mseed\'') 

162 

163 p.add_argument( 

164 '--out-sds-path', 

165 dest='out_sds_path', 

166 metavar='PATH', 

167 help='Set output path to create SDS (https://www.seiscomp.de' 

168 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at ' 

169 'the given path. Implies --tinc=86400. ' 

170 'Example: --out-sds-path=data/sds') 

171 

172 p.add_argument( 

173 '--out-format', 

174 dest='out_format', 

175 choices=io.allowed_formats('save'), 

176 metavar='FORMAT', 

177 help='Set output file format. Choices: %s' % io.allowed_formats( 

178 'save', 'cli_help', 'mseed')) 

179 

180 p.add_argument( 

181 '--out-data-type', 

182 dest='out_data_type', 

183 choices=OutputDataTypeChoice.choices, 

184 metavar='DTYPE', 

185 help='Set numerical data type. Choices: %s. The output file ' 

186 'format must support the given type. By default, the data ' 

187 'type is kept unchanged.' % ', '.join( 

188 OutputDataTypeChoice.choices)) 

189 

190 p.add_argument( 

191 '--out-mseed-record-length', 

192 dest='out_mseed_record_length', 

193 type=int, 

194 choices=io.mseed.VALID_RECORD_LENGTHS, 

195 metavar='INT', 

196 help='Set the mseed record length in bytes. Choices: %s. ' 

197 'Default is 4096 bytes, which is commonly used for archiving.' 

198 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS)) 

199 

200 p.add_argument( 

201 '--out-mseed-steim', 

202 dest='out_mseed_steim', 

203 type=int, 

204 choices=(1, 2), 

205 metavar='INT', 

206 help='Set the mseed STEIM compression. Choices: 1 or 2. ' 

207 'Default is STEIM-2, which can compress full range int32. ' 

208 'Note: STEIM-2 is limited to 30 bit dynamic range.') 

209 

210 p.add_argument( 

211 '--out-meta-path', 

212 dest='out_meta_path', 

213 metavar='PATH', 

214 help='Set output path for station metadata (StationXML) export.') 

215 

216 p.add_argument( 

217 '--traversal', 

218 dest='traversal', 

219 metavar='GROUPING', 

220 choices=TraversalChoice.choices, 

221 help='By default the outermost processing loop is over time. ' 

222 'Add outer loop with given GROUPING. Choices: %s' 

223 % ', '.join(TraversalChoice.choices)) 

224 

225 @classmethod 

226 def from_arguments(cls, args): 

227 kwargs = args.squirrel_query 

228 

229 obj = cls( 

230 downsample=args.downsample, 

231 out_format=args.out_format, 

232 out_path=args.out_path, 

233 tinc=args.tinc, 

234 out_sds_path=args.out_sds_path, 

235 out_data_type=args.out_data_type, 

236 out_mseed_record_length=args.out_mseed_record_length, 

237 out_mseed_steim=args.out_mseed_steim, 

238 out_meta_path=args.out_meta_path, 

239 traversal=args.traversal, 

240 **kwargs) 

241 

242 obj.validate() 

243 return obj 

244 

245 def add_dataset(self, sq): 

246 if self.in_dataset is not None: 

247 sq.add_dataset(self.in_dataset) 

248 

249 if self.in_path is not None: 

250 ds = Dataset(sources=[LocalData(paths=[self.in_path])]) 

251 ds.set_basepath_from(self) 

252 sq.add_dataset(ds) 

253 

254 if self.in_paths: 

255 ds = Dataset(sources=[LocalData(paths=self.in_paths)]) 

256 ds.set_basepath_from(self) 

257 sq.add_dataset(ds) 

258 

259 def get_effective_rename_rules(self, chain): 

260 d = {} 

261 for k in ['network', 'station', 'location', 'channel']: 

262 v = chain.dget('rename', k) 

263 if isinstance(v, str): 

264 m = re.match(r'/([^/]+)/([^/]*)/', v) 

265 if m: 

266 try: 

267 v = (re.compile(m.group(1)), m.group(2)) 

268 except Exception: 

269 raise JackseisError( 

270 'Invalid replacement pattern: /%s/' % m.group(1)) 

271 

272 d[k] = v 

273 

274 return d 

275 

276 def get_effective_out_path(self): 

277 nset = sum(x is not None for x in ( 

278 self.out_path, 

279 self.out_sds_path)) 

280 

281 if nset > 1: 

282 raise JackseisError( 

283 'More than one out of [out_path, out_sds_path] set.') 

284 

285 is_sds = False 

286 if self.out_path: 

287 out_path = self.out_path 

288 

289 elif self.out_sds_path: 

290 out_path = op.join( 

291 self.out_sds_path, 

292 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D' 

293 '/%(network)s.%(station)s.%(location)s.%(channel)s.D' 

294 '.%(wmin_year)s.%(wmin_jday)s') 

295 is_sds = True 

296 else: 

297 out_path = None 

298 

299 if out_path is not None: 

300 return self.expand_path(out_path), is_sds 

301 else: 

302 return None 

303 

304 def get_effective_out_meta_path(self): 

305 if self.out_meta_path is not None: 

306 return self.expand_path(self.out_meta_path) 

307 else: 

308 return None 

309 

310 def do_rename(self, rules, tr): 

311 rename = {} 

312 for k in ['network', 'station', 'location', 'channel']: 

313 v = rules.get(k, None) 

314 if isinstance(v, str): 

315 rename[k] = v 

316 elif isinstance(v, dict): 

317 try: 

318 oldval = getattr(tr, k) 

319 rename[k] = v[oldval] 

320 except KeyError: 

321 raise ToolError( 

322 'No mapping defined for %s code "%s".' % (k, oldval)) 

323 

324 elif isinstance(v, tuple): 

325 pat, repl = v 

326 oldval = getattr(tr, k) 

327 newval, n = pat.subn(repl, oldval) 

328 if n: 

329 rename[k] = newval 

330 

331 tr.set_codes(**rename) 

332 

333 def convert(self, args, chain=None): 

334 if chain is None: 

335 defaults = clone(g_defaults) 

336 defaults.set_basepath_from(self) 

337 chain = Chain(defaults) 

338 

339 chain = Chain(self, chain) 

340 

341 if self.parts: 

342 task = make_task('Jackseis parts') 

343 for part in task(self.parts): 

344 part.convert(args, chain) 

345 

346 del task 

347 

348 else: 

349 sq = args.make_squirrel() 

350 

351 cli_overrides = Converter.from_arguments(args) 

352 cli_overrides.set_basepath('.') 

353 

354 chain = Chain(cli_overrides, chain) 

355 

356 chain.mcall('add_dataset', sq) 

357 

358 tmin = chain.get('tmin') 

359 tmax = chain.get('tmax') 

360 tinc = chain.get('tinc') 

361 codes = chain.get('codes') 

362 downsample = chain.get('downsample') 

363 out_path, is_sds = chain.fcall('get_effective_out_path') \ 

364 or (None, False) 

365 

366 if is_sds and tinc != 86400.: 

367 logger.warning('Setting time window to 1 day to generate SDS.') 

368 tinc = 86400.0 

369 

370 out_format = chain.get('out_format') 

371 out_data_type = chain.get('out_data_type') 

372 

373 out_meta_path = chain.fcall('get_effective_out_meta_path') 

374 

375 if out_meta_path is not None: 

376 sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax) 

377 util.ensuredirs(out_meta_path) 

378 sx.dump_xml(filename=out_meta_path) 

379 if out_path is None: 

380 return 

381 

382 target_deltat = None 

383 if downsample is not None: 

384 target_deltat = 1.0 / float(downsample) 

385 

386 save_kwargs = {} 

387 if out_format == 'mseed': 

388 save_kwargs['record_length'] = chain.get( 

389 'out_mseed_record_length') 

390 save_kwargs['steim'] = chain.get( 

391 'out_mseed_steim') 

392 

393 traversal = chain.get('traversal') 

394 if traversal is not None: 

395 grouping = TraversalChoice.name_to_grouping[traversal] 

396 else: 

397 grouping = None 

398 

399 tpad = 0.0 

400 if target_deltat is not None: 

401 tpad += target_deltat * 50. 

402 

403 task = None 

404 rename_rules = self.get_effective_rename_rules(chain) 

405 for batch in sq.chopper_waveforms( 

406 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc, 

407 codes=codes, 

408 snap_window=True, 

409 grouping=grouping): 

410 

411 if task is None: 

412 task = make_task( 

413 'Jackseis blocks', batch.n * batch.ngroups) 

414 

415 tlabel = '%s%s - %s' % ( 

416 'groups %i / %i: ' % (batch.igroup, batch.ngroups) 

417 if batch.ngroups > 1 else '', 

418 util.time_to_str(batch.tmin), 

419 util.time_to_str(batch.tmax)) 

420 

421 task.update(batch.i + batch.igroup * batch.n, tlabel) 

422 

423 twmin = batch.tmin 

424 twmax = batch.tmax 

425 

426 traces = batch.traces 

427 

428 if target_deltat is not None: 

429 out_traces = [] 

430 for tr in traces: 

431 try: 

432 tr.downsample_to( 

433 target_deltat, snap=True, demean=False) 

434 

435 out_traces.append(tr) 

436 

437 except (trace.TraceTooShort, trace.NoData): 

438 pass 

439 

440 traces = out_traces 

441 

442 for tr in traces: 

443 self.do_rename(rename_rules, tr) 

444 

445 if out_data_type: 

446 for tr in traces: 

447 tr.ydata = tr.ydata.astype( 

448 OutputDataTypeChoice.name_to_dtype[out_data_type]) 

449 

450 chopped_traces = [] 

451 for tr in traces: 

452 try: 

453 otr = tr.chop(twmin, twmax, inplace=False) 

454 chopped_traces.append(otr) 

455 except trace.NoData: 

456 pass 

457 

458 traces = chopped_traces 

459 

460 if out_path is not None: 

461 try: 

462 io.save( 

463 traces, out_path, 

464 format=out_format, 

465 overwrite=args.force, 

466 additional=dict( 

467 wmin_year=tts(twmin, format='%Y'), 

468 wmin_month=tts(twmin, format='%m'), 

469 wmin_day=tts(twmin, format='%d'), 

470 wmin_jday=tts(twmin, format='%j'), 

471 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'), 

472 wmax_year=tts(twmax, format='%Y'), 

473 wmax_month=tts(twmax, format='%m'), 

474 wmax_day=tts(twmax, format='%d'), 

475 wmax_jday=tts(twmax, format='%j'), 

476 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')), 

477 **save_kwargs) 

478 

479 except io.FileSaveError as e: 

480 raise JackseisError(str(e)) 

481 

482 else: 

483 for tr in traces: 

484 print(tr.summary) 

485 

486 if task: 

487 task.done() 

488 

489 

490g_defaults = Converter( 

491 out_mseed_record_length=4096, 

492 out_format='mseed', 

493 out_mseed_steim=2) 

494 

495 

496headline = 'Convert waveform archive data.' 

497 

498 

499def make_subparser(subparsers): 

500 return subparsers.add_parser( 

501 'jackseis', 

502 help=headline, 

503 description=headline) 

504 

505 

506def setup(parser): 

507 parser.add_squirrel_selection_arguments() 

508 

509 parser.add_argument( 

510 '--config', 

511 dest='config_path', 

512 metavar='NAME', 

513 help='File containing `jackseis.Converter` settings.') 

514 

515 parser.add_argument( 

516 '--force', 

517 dest='force', 

518 action='store_true', 

519 default=False, 

520 help='Force overwriting of existing files.') 

521 

522 Converter.add_arguments(parser) 

523 

524 

525def run(parser, args): 

526 if args.config_path: 

527 try: 

528 converters = load_all(filename=args.config_path) 

529 except Exception as e: 

530 raise ToolError(str(e)) 

531 

532 for converter in converters: 

533 if not isinstance(converter, Converter): 

534 raise ToolError( 

535 'Config file should only contain ' 

536 '`jackseis.Converter` objects.') 

537 

538 converter.set_basepath(op.dirname(args.config_path)) 

539 

540 else: 

541 converter = Converter() 

542 converter.set_basepath('.') 

543 converters = [converter] 

544 

545 with progress.view(): 

546 task = make_task('Jackseis jobs') 

547 for converter in task(converters): 

548 converter.convert(args)