1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import re 

9import logging 

10import os.path as op 

11 

12import numpy as num 

13 

14from pyrocko import io, trace, util 

15from pyrocko.progress import progress 

16from pyrocko.has_paths import Path, HasPaths 

17from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \ 

18 StringChoice, IntChoice, Defer, load_all, clone 

19from pyrocko.squirrel.dataset import Dataset 

20from pyrocko.squirrel.client.local import LocalData 

21from pyrocko.squirrel.error import ToolError 

22from pyrocko.squirrel.model import CodesNSLCE 

23 

24tts = util.time_to_str 

25 

26guts_prefix = 'jackseis' 

27logger = logging.getLogger('psq.cli.jackseis') 

28 

29 

30def make_task(*args): 

31 return progress.task(*args, logger=logger) 

32 

33 

34class JackseisError(ToolError): 

35 pass 

36 

37 

38class Chain(object): 

39 def __init__(self, node, parent=None): 

40 self.node = node 

41 self.parent = parent 

42 

43 def mcall(self, name, *args, **kwargs): 

44 ret = [] 

45 if self.parent is not None: 

46 ret.append(self.parent.mcall(name, *args, **kwargs)) 

47 

48 ret.append(getattr(self.node, name)(*args, **kwargs)) 

49 return ret 

50 

51 def fcall(self, name, *args, **kwargs): 

52 v = getattr(self.node, name)(*args, **kwargs) 

53 if v is None and self.parent is not None: 

54 return self.parent.fcall(name, *args, **kwargs) 

55 else: 

56 return v 

57 

58 def get(self, name): 

59 v = getattr(self.node, name) 

60 if v is None and self.parent is not None: 

61 return self.parent.get(name) 

62 else: 

63 return v 

64 

65 def dget(self, name, k): 

66 v = getattr(self.node, name).get(k, None) 

67 if v is None and self.parent is not None: 

68 return self.parent.dget(name, k) 

69 else: 

70 return v 

71 

72 

73class OutputFormatChoice(StringChoice): 

74 choices = io.allowed_formats('save') 

75 

76 

77class OutputDataTypeChoice(StringChoice): 

78 choices = ['int32', 'int64', 'float32', 'float64'] 

79 name_to_dtype = { 

80 'int32': num.int32, 

81 'int64': num.int64, 

82 'float32': num.float32, 

83 'float64': num.float64} 

84 

85 

86class Converter(HasPaths): 

87 

88 in_dataset = Dataset.T(optional=True) 

89 in_path = String.T(optional=True) 

90 in_paths = List.T(String.T(optional=True)) 

91 

92 codes = List.T(CodesNSLCE.T(), optional=True) 

93 

94 rename = Dict.T( 

95 String.T(), 

96 Choice.T([ 

97 String.T(), 

98 Dict.T(String.T(), String.T())])) 

99 tmin = Timestamp.T(optional=True) 

100 tmax = Timestamp.T(optional=True) 

101 tinc = Float.T(optional=True) 

102 

103 downsample = Float.T(optional=True) 

104 

105 out_path = Path.T(optional=True) 

106 out_sds_path = Path.T(optional=True) 

107 out_format = OutputFormatChoice.T(optional=True) 

108 out_data_type = OutputDataTypeChoice.T(optional=True) 

109 out_mseed_record_length = IntChoice.T( 

110 optional=True, 

111 choices=list(io.mseed.VALID_RECORD_LENGTHS)) 

112 out_mseed_steim = IntChoice.T( 

113 optional=True, 

114 choices=[1, 2]) 

115 out_meta_path = Path.T(optional=True) 

116 

117 parts = List.T(Defer('Converter.T')) 

118 

119 @classmethod 

120 def add_arguments(cls, p): 

121 p.add_squirrel_query_arguments(without=['time']) 

122 

123 p.add_argument( 

124 '--downsample', 

125 dest='downsample', 

126 type=float, 

127 metavar='RATE', 

128 help='Downsample to RATE [Hz].') 

129 

130 p.add_argument( 

131 '--out-path', 

132 dest='out_path', 

133 metavar='TEMPLATE', 

134 help='Set output path to TEMPLATE. Available placeholders ' 

135 'are %%n: network, %%s: station, %%l: location, %%c: ' 

136 'channel, %%b: begin time, %%e: end time, %%j: julian day of ' 

137 'year. The following additional placeholders use the window ' 

138 'begin and end times rather than trace begin and end times ' 

139 '(to suppress producing many small files for gappy traces), ' 

140 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, ' 

141 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, ' 

142 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. ' 

143 'Example: --output=\'data/%%s/trace-%%s-%%c.mseed\'') 

144 

145 p.add_argument( 

146 '--out-sds-path', 

147 dest='out_sds_path', 

148 metavar='PATH', 

149 help='Set output path to create SDS (https://www.seiscomp.de' 

150 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at ' 

151 'the given path. Implies --tinc=86400. ' 

152 'Example: --output-sds-path=data/sds') 

153 

154 p.add_argument( 

155 '--out-format', 

156 dest='out_format', 

157 choices=io.allowed_formats('save'), 

158 metavar='FORMAT', 

159 help='Set output file format. Choices: %s' % io.allowed_formats( 

160 'save', 'cli_help', 'mseed')) 

161 

162 p.add_argument( 

163 '--out-data-type', 

164 dest='out_data_type', 

165 choices=OutputDataTypeChoice.choices, 

166 metavar='DTYPE', 

167 help='Set numerical data type. Choices: %s. The output file ' 

168 'format must support the given type. By default, the data ' 

169 'type is kept unchanged.' % ', '.join( 

170 OutputDataTypeChoice.choices)) 

171 

172 p.add_argument( 

173 '--out-mseed-record-length', 

174 dest='out_mseed_record_length', 

175 type=int, 

176 choices=io.mseed.VALID_RECORD_LENGTHS, 

177 metavar='INT', 

178 help='Set the mseed record length in bytes. Choices: %s. ' 

179 'Default is 4096 bytes, which is commonly used for archiving.' 

180 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS)) 

181 

182 p.add_argument( 

183 '--out-mseed-steim', 

184 dest='out_mseed_steim', 

185 type=int, 

186 choices=(1, 2), 

187 metavar='INT', 

188 help='Set the mseed STEIM compression. Choices: 1 or 2. ' 

189 'Default is STEIM-2, which can compress full range int32. ' 

190 'Note: STEIM-2 is limited to 30 bit dynamic range.') 

191 

192 p.add_argument( 

193 '--out-meta-path', 

194 dest='out_meta_path', 

195 metavar='PATH', 

196 help='Set output path for station metadata (StationXML) export.') 

197 

198 @classmethod 

199 def from_arguments(cls, args): 

200 kwargs = args.squirrel_query 

201 

202 obj = cls( 

203 downsample=args.downsample, 

204 out_format=args.out_format, 

205 out_path=args.out_path, 

206 out_sds_path=args.out_sds_path, 

207 out_data_type=args.out_data_type, 

208 out_mseed_record_length=args.out_mseed_record_length, 

209 out_mseed_steim=args.out_mseed_steim, 

210 out_meta_path=args.out_meta_path, 

211 **kwargs) 

212 

213 obj.validate() 

214 return obj 

215 

216 def add_dataset(self, sq): 

217 if self.in_dataset is not None: 

218 sq.add_dataset(self.in_dataset) 

219 

220 if self.in_path is not None: 

221 ds = Dataset(sources=[LocalData(paths=[self.in_path])]) 

222 ds.set_basepath_from(self) 

223 sq.add_dataset(ds) 

224 

225 if self.in_paths: 

226 ds = Dataset(sources=[LocalData(paths=self.in_paths)]) 

227 ds.set_basepath_from(self) 

228 sq.add_dataset(ds) 

229 

230 def get_effective_rename_rules(self, chain): 

231 d = {} 

232 for k in ['network', 'station', 'location', 'channel']: 

233 v = chain.dget('rename', k) 

234 if isinstance(v, str): 

235 m = re.match(r'/([^/]+)/([^/]*)/', v) 

236 if m: 

237 try: 

238 v = (re.compile(m.group(1)), m.group(2)) 

239 except Exception: 

240 raise JackseisError( 

241 'Invalid replacement pattern: /%s/' % m.group(1)) 

242 

243 d[k] = v 

244 

245 return d 

246 

247 def get_effective_out_path(self): 

248 nset = sum(x is not None for x in ( 

249 self.out_path, 

250 self.out_sds_path)) 

251 

252 if nset > 1: 

253 raise JackseisError( 

254 'More than one out of [out_path, out_sds_path] set.') 

255 

256 is_sds = False 

257 if self.out_path: 

258 out_path = self.out_path 

259 

260 elif self.out_sds_path: 

261 out_path = op.join( 

262 self.out_sds_path, 

263 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D' 

264 '/%(network)s.%(station)s.%(location)s.%(channel)s.D' 

265 '.%(wmin_year)s.%(wmin_jday)s') 

266 is_sds = True 

267 else: 

268 out_path = None 

269 

270 if out_path is not None: 

271 return self.expand_path(out_path), is_sds 

272 else: 

273 return None 

274 

275 def get_effective_out_meta_path(self): 

276 if self.out_meta_path is not None: 

277 return self.expand_path(self.out_meta_path) 

278 else: 

279 return None 

280 

281 def do_rename(self, rules, tr): 

282 rename = {} 

283 for k in ['network', 'station', 'location', 'channel']: 

284 v = rules.get(k, None) 

285 if isinstance(v, str): 

286 rename[k] = v 

287 elif isinstance(v, dict): 

288 try: 

289 oldval = getattr(tr, k) 

290 rename[k] = v[oldval] 

291 except KeyError: 

292 raise ToolError( 

293 'No mapping defined for %s code "%s".' % (k, oldval)) 

294 

295 elif isinstance(v, tuple): 

296 pat, repl = v 

297 oldval = getattr(tr, k) 

298 newval, n = pat.subn(repl, oldval) 

299 if n: 

300 rename[k] = newval 

301 

302 tr.set_codes(**rename) 

303 

304 def convert(self, args, chain=None): 

305 if chain is None: 

306 defaults = clone(g_defaults) 

307 defaults.set_basepath_from(self) 

308 chain = Chain(defaults) 

309 

310 chain = Chain(self, chain) 

311 

312 if self.parts: 

313 task = make_task('Jackseis parts') 

314 for part in task(self.parts): 

315 part.convert(args, chain) 

316 

317 del task 

318 

319 else: 

320 sq = args.make_squirrel() 

321 

322 cli_overrides = Converter.from_arguments(args) 

323 cli_overrides.set_basepath('.') 

324 

325 chain = Chain(cli_overrides, chain) 

326 

327 chain.mcall('add_dataset', sq) 

328 

329 tmin = chain.get('tmin') 

330 tmax = chain.get('tmax') 

331 tinc = chain.get('tinc') 

332 codes = chain.get('codes') 

333 downsample = chain.get('downsample') 

334 out_path, is_sds = chain.fcall('get_effective_out_path') \ 

335 or (None, False) 

336 

337 if is_sds and tinc != 86400.: 

338 logger.warning('Setting time window to 1 day to generate SDS.') 

339 tinc = 86400.0 

340 

341 out_format = chain.get('out_format') 

342 out_data_type = chain.get('out_data_type') 

343 

344 out_meta_path = chain.fcall('get_effective_out_meta_path') 

345 

346 if out_meta_path is not None: 

347 sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax) 

348 util.ensuredirs(out_meta_path) 

349 sx.dump_xml(filename=out_meta_path) 

350 if out_path is None: 

351 return 

352 

353 target_deltat = None 

354 if downsample is not None: 

355 target_deltat = 1.0 / float(downsample) 

356 

357 save_kwargs = {} 

358 if out_format == 'mseed': 

359 save_kwargs['record_length'] = chain.get( 

360 'out_mseed_record_length') 

361 save_kwargs['steim'] = chain.get( 

362 'out_mseed_steim') 

363 

364 tpad = 0.0 

365 if target_deltat is not None: 

366 tpad += target_deltat * 50. 

367 

368 task = None 

369 rename_rules = self.get_effective_rename_rules(chain) 

370 for batch in sq.chopper_waveforms( 

371 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc, 

372 codes=codes, 

373 snap_window=True): 

374 

375 if task is None: 

376 task = make_task('Jackseis blocks', batch.n) 

377 

378 task.update( 

379 batch.i, 

380 '%s - %s' % ( 

381 util.time_to_str(batch.tmin), 

382 util.time_to_str(batch.tmax))) 

383 

384 twmin = batch.tmin 

385 twmax = batch.tmax 

386 

387 traces = batch.traces 

388 

389 if target_deltat is not None: 

390 out_traces = [] 

391 for tr in traces: 

392 try: 

393 tr.downsample_to( 

394 target_deltat, snap=True, demean=False) 

395 

396 out_traces.append(tr) 

397 

398 except (trace.TraceTooShort, trace.NoData): 

399 pass 

400 

401 traces = out_traces 

402 

403 for tr in traces: 

404 self.do_rename(rename_rules, tr) 

405 

406 if out_data_type: 

407 for tr in traces: 

408 tr.ydata = tr.ydata.astype( 

409 OutputDataTypeChoice.name_to_dtype[out_data_type]) 

410 

411 chopped_traces = [] 

412 for tr in traces: 

413 try: 

414 otr = tr.chop(twmin, twmax, inplace=False) 

415 chopped_traces.append(otr) 

416 except trace.NoData: 

417 pass 

418 

419 traces = chopped_traces 

420 

421 if out_path is not None: 

422 try: 

423 io.save( 

424 traces, out_path, 

425 format=out_format, 

426 overwrite=args.force, 

427 additional=dict( 

428 wmin_year=tts(twmin, format='%Y'), 

429 wmin_month=tts(twmin, format='%m'), 

430 wmin_day=tts(twmin, format='%d'), 

431 wmin_jday=tts(twmin, format='%j'), 

432 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'), 

433 wmax_year=tts(twmax, format='%Y'), 

434 wmax_month=tts(twmax, format='%m'), 

435 wmax_day=tts(twmax, format='%d'), 

436 wmax_jday=tts(twmax, format='%j'), 

437 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')), 

438 **save_kwargs) 

439 

440 except io.FileSaveError as e: 

441 raise JackseisError(str(e)) 

442 

443 else: 

444 for tr in traces: 

445 print(tr.summary) 

446 

447 if task: 

448 task.done() 

449 

450 

451g_defaults = Converter( 

452 out_mseed_record_length=4096, 

453 out_format='mseed', 

454 out_mseed_steim=2) 

455 

456 

457headline = 'Convert waveform archive data.' 

458 

459 

460def make_subparser(subparsers): 

461 return subparsers.add_parser( 

462 'jackseis', 

463 help=headline, 

464 description=headline) 

465 

466 

467def setup(parser): 

468 parser.add_squirrel_selection_arguments() 

469 

470 parser.add_argument( 

471 '--config', 

472 dest='config_path', 

473 metavar='NAME', 

474 help='File containing `jackseis.Converter` settings.') 

475 

476 parser.add_argument( 

477 '--force', 

478 dest='force', 

479 action='store_true', 

480 default=False, 

481 help='Force overwriting of existing files.') 

482 

483 Converter.add_arguments(parser) 

484 

485 

486def run(parser, args): 

487 if args.config_path: 

488 try: 

489 converters = load_all(filename=args.config_path) 

490 except Exception as e: 

491 raise ToolError(str(e)) 

492 

493 for converter in converters: 

494 if not isinstance(converter, Converter): 

495 raise ToolError( 

496 'Config file should only contain ' 

497 '`jackseis.Converter` objects.') 

498 

499 converter.set_basepath(op.dirname(args.config_path)) 

500 

501 else: 

502 converter = Converter() 

503 converter.set_basepath('.') 

504 converters = [converter] 

505 

506 with progress.view(): 

507 task = make_task('Jackseis jobs') 

508 for converter in task(converters): 

509 converter.convert(args)