# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
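'''
Implementation of the ``jackseis`` subcommand: convert waveform archive data.
'''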

from __future__ import absolute_import, print_function

import re
import logging
import os.path as op

import numpy as num

from pyrocko import io, trace, util
from pyrocko.progress import progress
from pyrocko.has_paths import Path, HasPaths
from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
    StringChoice, IntChoice, Defer, load_all, clone
from pyrocko.squirrel.dataset import Dataset
from pyrocko.squirrel.client.local import LocalData
from pyrocko.squirrel.error import ToolError
from pyrocko.squirrel.model import CodesNSLCE
from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
    ChannelGrouping, SensorGrouping

tts = util.time_to_str

guts_prefix = 'jackseis'
logger = logging.getLogger('psq.cli.jackseis')


def make_task(*args):
    return progress.task(*args, logger=logger)


class JackseisError(ToolError):
    pass


class Chain(object):
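    '''
    Helper to look up settings along a chain of :py:class:`Converter` objects.

    ``get``, ``fcall`` and ``dget`` return the value from the nearest node in
    the chain where it is set (not ``None``), falling back to parent nodes
    otherwise. ``mcall`` calls the named method on every node in the chain and
    collects the results.
    '''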

    def __init__(self, node, parent=None):
        self.node = node
        self.parent = parent

    def mcall(self, name, *args, **kwargs):
        ret = []
        if self.parent is not None:
            ret.append(self.parent.mcall(name, *args, **kwargs))

        ret.append(getattr(self.node, name)(*args, **kwargs))
        return ret

    def fcall(self, name, *args, **kwargs):
        v = getattr(self.node, name)(*args, **kwargs)
        if v is None and self.parent is not None:
            return self.parent.fcall(name, *args, **kwargs)
        else:
            return v

    def get(self, name):
        v = getattr(self.node, name)
        if v is None and self.parent is not None:
            return self.parent.get(name)
        else:
            return v

    def dget(self, name, k):
        v = getattr(self.node, name).get(k, None)
        if v is None and self.parent is not None:
            return self.parent.dget(name, k)
        else:
            return v


class OutputFormatChoice(StringChoice):
    choices = io.allowed_formats('save')


class OutputDataTypeChoice(StringChoice):
    choices = ['int32', 'int64', 'float32', 'float64']
    name_to_dtype = {
        'int32': num.int32,
        'int64': num.int64,
        'float32': num.float32,
        'float64': num.float64}


class TraversalChoice(StringChoice):
    choices = ['network', 'station', 'channel', 'sensor']
    name_to_grouping = {
        'network': NetworkGrouping(),
        'station': StationGrouping(),
        'sensor': SensorGrouping(),
        'channel': ChannelGrouping()}


class Converter(HasPaths):
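    '''
    Waveform conversion job definition.

    Specifies input data, renaming rules, time span, downsampling and output
    options. Converters can be nested via ``parts``; settings not given in a
    part fall back to the enclosing converter and to built-in defaults, while
    command line options take precedence.
    '''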

    in_dataset = Dataset.T(optional=True)
    in_path = String.T(optional=True)
    in_paths = List.T(String.T(optional=True))

    codes = List.T(CodesNSLCE.T(), optional=True)

    rename = Dict.T(
        String.T(),
        Choice.T([
            String.T(),
            Dict.T(String.T(), String.T())]))
    tmin = Timestamp.T(optional=True)
    tmax = Timestamp.T(optional=True)
    tinc = Float.T(optional=True)

    downsample = Float.T(optional=True)

    out_path = Path.T(optional=True)
    out_sds_path = Path.T(optional=True)
    out_format = OutputFormatChoice.T(optional=True)
    out_data_type = OutputDataTypeChoice.T(optional=True)
    out_mseed_record_length = IntChoice.T(
        optional=True,
        choices=list(io.mseed.VALID_RECORD_LENGTHS))
    out_mseed_steim = IntChoice.T(
        optional=True,
        choices=[1, 2])
    out_meta_path = Path.T(optional=True)

    traversal = TraversalChoice.T(optional=True)

    parts = List.T(Defer('Converter.T'))

    @classmethod
    def add_arguments(cls, p):
        p.add_squirrel_query_arguments(without=['time'])

        p.add_argument(
            '--tinc',
            dest='tinc',
            type=float,
            metavar='SECONDS',
            help='Set time length of output files [s].')

        p.add_argument(
            '--downsample',
            dest='downsample',
            type=float,
            metavar='RATE',
            help='Downsample to RATE [Hz].')

        p.add_argument(
            '--out-path',
            dest='out_path',
            metavar='TEMPLATE',
            help='Set output path to TEMPLATE. Available placeholders '
                 'are %%n: network, %%s: station, %%l: location, %%c: '
                 'channel, %%b: begin time, %%e: end time, %%j: julian day of '
                 'year. The following additional placeholders use the window '
                 'begin and end times rather than trace begin and end times '
                 '(to suppress producing many small files for gappy traces), '
                 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, '
                 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, '
                 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. '
                 'Example: --out-path=\'data/%%s/trace-%%s-%%c.mseed\'')

        p.add_argument(
            '--out-sds-path',
            dest='out_sds_path',
            metavar='PATH',
            help='Set output path to create SDS (https://www.seiscomp.de'
                 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at '
                 'the given path. Implies --tinc=86400. '
                 'Example: --out-sds-path=data/sds')

        p.add_argument(
            '--out-format',
            dest='out_format',
            choices=io.allowed_formats('save'),
            metavar='FORMAT',
            help='Set output file format. Choices: %s' % io.allowed_formats(
                'save', 'cli_help', 'mseed'))

        p.add_argument(
            '--out-data-type',
            dest='out_data_type',
            choices=OutputDataTypeChoice.choices,
            metavar='DTYPE',
            help='Set numerical data type. Choices: %s. The output file '
                 'format must support the given type. By default, the data '
                 'type is kept unchanged.' % ', '.join(
                    OutputDataTypeChoice.choices))

        p.add_argument(
            '--out-mseed-record-length',
            dest='out_mseed_record_length',
            type=int,
            choices=io.mseed.VALID_RECORD_LENGTHS,
            metavar='INT',
            help='Set the mseed record length in bytes. Choices: %s. '
                 'Default is 4096 bytes, which is commonly used for archiving.'
                 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS))

        p.add_argument(
            '--out-mseed-steim',
            dest='out_mseed_steim',
            type=int,
            choices=(1, 2),
            metavar='INT',
            help='Set the mseed STEIM compression. Choices: 1 or 2. '
                 'Default is STEIM-2, which can compress full range int32. '
                 'Note: STEIM-2 is limited to 30 bit dynamic range.')

        p.add_argument(
            '--out-meta-path',
            dest='out_meta_path',
            metavar='PATH',
            help='Set output path for station metadata (StationXML) export.')

        p.add_argument(
            '--traversal',
            dest='traversal',
            metavar='GROUPING',
            choices=TraversalChoice.choices,
            help='By default the outermost processing loop is over time. '
                 'Add outer loop with given GROUPING. Choices: %s'
                 % ', '.join(TraversalChoice.choices))

    @classmethod
    def from_arguments(cls, args):
        kwargs = args.squirrel_query

        obj = cls(
            downsample=args.downsample,
            out_format=args.out_format,
            out_path=args.out_path,
            tinc=args.tinc,
            out_sds_path=args.out_sds_path,
            out_data_type=args.out_data_type,
            out_mseed_record_length=args.out_mseed_record_length,
            out_mseed_steim=args.out_mseed_steim,
            out_meta_path=args.out_meta_path,
            traversal=args.traversal,
            **kwargs)

        obj.validate()
        return obj

    def add_dataset(self, sq):
        if self.in_dataset is not None:
            sq.add_dataset(self.in_dataset)

        if self.in_path is not None:
            ds = Dataset(sources=[LocalData(paths=[self.in_path])])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

        if self.in_paths:
            ds = Dataset(sources=[LocalData(paths=self.in_paths)])
            ds.set_basepath_from(self)
            sq.add_dataset(ds)

    def get_effective_rename_rules(self, chain):
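        '''
        Collect rename rules for network, station, location and channel.

        A rule may be a plain replacement string, a mapping of old to new
        codes, or a ``/pattern/replacement/`` regular expression, which is
        compiled here.
        '''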

        d = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = chain.dget('rename', k)
            if isinstance(v, str):
                m = re.match(r'/([^/]+)/([^/]*)/', v)
                if m:
                    try:
                        v = (re.compile(m.group(1)), m.group(2))
                    except Exception:
                        raise JackseisError(
                            'Invalid replacement pattern: /%s/' % m.group(1))

            d[k] = v

        return d

    def get_effective_out_path(self):
        nset = sum(x is not None for x in (
            self.out_path,
            self.out_sds_path))

        if nset > 1:
            raise JackseisError(
                'More than one out of [out_path, out_sds_path] set.')

        is_sds = False
        if self.out_path:
            out_path = self.out_path

        elif self.out_sds_path:
            out_path = op.join(
                self.out_sds_path,
                '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D'
                '/%(network)s.%(station)s.%(location)s.%(channel)s.D'
                '.%(wmin_year)s.%(wmin_jday)s')
            is_sds = True
        else:
            out_path = None

        if out_path is not None:
            return self.expand_path(out_path), is_sds
        else:
            return None

    def get_effective_out_meta_path(self):
        if self.out_meta_path is not None:
            return self.expand_path(self.out_meta_path)
        else:
            return None

    def do_rename(self, rules, tr):
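        '''
        Apply rename rules to the codes of a single trace.

        String rules replace the code directly, dict rules map old to new
        codes and compiled ``(pattern, replacement)`` rules rewrite the code
        via regular expression substitution.
        '''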

        rename = {}
        for k in ['network', 'station', 'location', 'channel']:
            v = rules.get(k, None)
            if isinstance(v, str):
                rename[k] = v
            elif isinstance(v, dict):
                try:
                    oldval = getattr(tr, k)
                    rename[k] = v[oldval]
                except KeyError:
                    raise ToolError(
                        'No mapping defined for %s code "%s".' % (k, oldval))

            elif isinstance(v, tuple):
                pat, repl = v
                oldval = getattr(tr, k)
                newval, n = pat.subn(repl, oldval)
                if n:
                    rename[k] = newval

        tr.set_codes(**rename)

    def convert(self, args, chain=None):
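        '''
        Perform the conversion.

        Builds the settings chain from built-in defaults, this converter and,
        for leaf converters, the command line overrides. If ``parts`` are
        present, each part is converted recursively. Otherwise waveforms are
        processed in time windows, optionally grouped by network, station,
        channel or sensor, and are downsampled, renamed, converted and saved
        (or printed when no output path is set).
        '''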

        if chain is None:
            defaults = clone(g_defaults)
            defaults.set_basepath_from(self)
            chain = Chain(defaults)

        chain = Chain(self, chain)

        if self.parts:
            task = make_task('Jackseis parts')
            for part in task(self.parts):
                part.convert(args, chain)

            del task

        else:
            sq = args.make_squirrel()

            cli_overrides = Converter.from_arguments(args)
            cli_overrides.set_basepath('.')

            chain = Chain(cli_overrides, chain)

            chain.mcall('add_dataset', sq)

            tmin = chain.get('tmin')
            tmax = chain.get('tmax')
            tinc = chain.get('tinc')
            codes = chain.get('codes')
            downsample = chain.get('downsample')
            out_path, is_sds = chain.fcall('get_effective_out_path') \
                or (None, False)

            if is_sds and tinc != 86400.:
                logger.warning('Setting time window to 1 day to generate SDS.')
                tinc = 86400.0

            out_format = chain.get('out_format')
            out_data_type = chain.get('out_data_type')

            out_meta_path = chain.fcall('get_effective_out_meta_path')

            if out_meta_path is not None:
                sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax)
                util.ensuredirs(out_meta_path)
                sx.dump_xml(filename=out_meta_path)
                if out_path is None:
                    return

            target_deltat = None
            if downsample is not None:
                target_deltat = 1.0 / float(downsample)

            save_kwargs = {}
            if out_format == 'mseed':
                save_kwargs['record_length'] = chain.get(
                    'out_mseed_record_length')
                save_kwargs['steim'] = chain.get(
                    'out_mseed_steim')

            traversal = chain.get('traversal')
            if traversal is not None:
                grouping = TraversalChoice.name_to_grouping[traversal]
            else:
                grouping = None

            tpad = 0.0
            if target_deltat is not None:
                tpad += target_deltat * 50.

            task = None
            rename_rules = self.get_effective_rename_rules(chain)
            for batch in sq.chopper_waveforms(
                    tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc,
                    codes=codes,
                    snap_window=True,
                    grouping=grouping):

                if task is None:
                    task = make_task(
                        'Jackseis blocks', batch.n * batch.ngroups)

                tlabel = '%s%s - %s' % (
                    'groups %i / %i: ' % (batch.igroup, batch.ngroups)
                    if batch.ngroups > 1 else '',
                    util.time_to_str(batch.tmin),
                    util.time_to_str(batch.tmax))

                task.update(batch.i + batch.igroup * batch.n, tlabel)

                twmin = batch.tmin
                twmax = batch.tmax

                traces = batch.traces

                if target_deltat is not None:
                    out_traces = []
                    for tr in traces:
                        try:
                            tr.downsample_to(
                                target_deltat, snap=True, demean=False)

                            out_traces.append(tr)

                        except (trace.TraceTooShort, trace.NoData):
                            pass

                    traces = out_traces

                for tr in traces:
                    self.do_rename(rename_rules, tr)

                if out_data_type:
                    for tr in traces:
                        tr.ydata = tr.ydata.astype(
                            OutputDataTypeChoice.name_to_dtype[out_data_type])

                chopped_traces = []
                for tr in traces:
                    try:
                        otr = tr.chop(twmin, twmax, inplace=False)
                        chopped_traces.append(otr)
                    except trace.NoData:
                        pass

                traces = chopped_traces

                if out_path is not None:
                    try:
                        io.save(
                            traces, out_path,
                            format=out_format,
                            overwrite=args.force,
                            additional=dict(
                                wmin_year=tts(twmin, format='%Y'),
                                wmin_month=tts(twmin, format='%m'),
                                wmin_day=tts(twmin, format='%d'),
                                wmin_jday=tts(twmin, format='%j'),
                                wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'),
                                wmax_year=tts(twmax, format='%Y'),
                                wmax_month=tts(twmax, format='%m'),
                                wmax_day=tts(twmax, format='%d'),
                                wmax_jday=tts(twmax, format='%j'),
                                wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')),
                            **save_kwargs)

                    except io.FileSaveError as e:
                        raise JackseisError(str(e))

                else:
                    for tr in traces:
                        print(tr.summary)

            if task:
                task.done()


g_defaults = Converter(
    out_mseed_record_length=4096,
    out_format='mseed',
    out_mseed_steim=2)


headline = 'Convert waveform archive data.'


def make_subparser(subparsers):
    return subparsers.add_parser(
        'jackseis',
        help=headline,
        description=headline)


def setup(parser):
    parser.add_squirrel_selection_arguments()

    parser.add_argument(
        '--config',
        dest='config_path',
        metavar='NAME',
        help='File containing `jackseis.Converter` settings.')

    parser.add_argument(
        '--force',
        dest='force',
        action='store_true',
        default=False,
        help='Force overwriting of existing files.')

    Converter.add_arguments(parser)


def run(parser, args):
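    '''
    Execute the ``jackseis`` command.

    Converters are loaded from the ``--config`` file if given, otherwise a
    single default converter is used. Command line options are merged in
    during conversion. Each converter is run with progress display.
    '''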

    if args.config_path:
        try:
            converters = load_all(filename=args.config_path)
        except Exception as e:
            raise ToolError(str(e))

        for converter in converters:
            if not isinstance(converter, Converter):
                raise ToolError(
                    'Config file should only contain '
                    '`jackseis.Converter` objects.')

            converter.set_basepath(op.dirname(args.config_path))

    else:
        converter = Converter()
        converter.set_basepath('.')
        converters = [converter]

    with progress.view():
        task = make_task('Jackseis jobs')
        for converter in task(converters):
            converter.convert(args)