1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import re 

9import logging 

10import os.path as op 

11 

12import numpy as num 

13 

14from pyrocko import io, trace, util 

15from pyrocko.progress import progress 

16from pyrocko.has_paths import Path, HasPaths 

17from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \ 

18 StringChoice, IntChoice, Defer, load_all, clone 

19from pyrocko.squirrel.dataset import Dataset 

20from pyrocko.squirrel.client.local import LocalData 

21from pyrocko.squirrel.error import ToolError 

22 

23tts = util.time_to_str 

24 

25guts_prefix = 'jackseis' 

26logger = logging.getLogger('psq.cli.jackseis') 

27 

28 

29def make_task(*args): 

30 return progress.task(*args, logger=logger) 

31 

32 

33class JackseisError(ToolError): 

34 pass 

35 

36 

37class Chain(object): 

38 def __init__(self, node, parent=None): 

39 self.node = node 

40 self.parent = parent 

41 

42 def mcall(self, name, *args, **kwargs): 

43 ret = [] 

44 if self.parent is not None: 

45 ret.append(self.parent.mcall(name, *args, **kwargs)) 

46 

47 ret.append(getattr(self.node, name)(*args, **kwargs)) 

48 return ret 

49 

50 def fcall(self, name, *args, **kwargs): 

51 v = getattr(self.node, name)(*args, **kwargs) 

52 if v is None and self.parent is not None: 

53 return self.parent.fcall(name, *args, **kwargs) 

54 else: 

55 return v 

56 

57 def get(self, name): 

58 v = getattr(self.node, name) 

59 if v is None and self.parent is not None: 

60 return self.parent.get(name) 

61 else: 

62 return v 

63 

64 def dget(self, name, k): 

65 v = getattr(self.node, name).get(k, None) 

66 if v is None and self.parent is not None: 

67 return self.parent.dget(name, k) 

68 else: 

69 return v 

70 

71 

72class OutputFormatChoice(StringChoice): 

73 choices = io.allowed_formats('save') 

74 

75 

76class OutputDataTypeChoice(StringChoice): 

77 choices = ['int32', 'int64', 'float32', 'float64'] 

78 name_to_dtype = { 

79 'int32': num.int32, 

80 'int64': num.int64, 

81 'float32': num.float32, 

82 'float64': num.float64} 

83 

84 

85class Converter(HasPaths): 

86 

87 in_dataset = Dataset.T(optional=True) 

88 in_path = String.T(optional=True) 

89 in_paths = List.T(String.T(optional=True)) 

90 

91 codes = String.T(optional=True) 

92 

93 rename = Dict.T( 

94 String.T(), 

95 Choice.T([ 

96 String.T(), 

97 Dict.T(String.T(), String.T())])) 

98 tmin = Timestamp.T(optional=True) 

99 tmax = Timestamp.T(optional=True) 

100 tinc = Float.T(optional=True) 

101 

102 downsample = Float.T(optional=True) 

103 

104 out_path = Path.T(optional=True) 

105 out_sds_path = Path.T(optional=True) 

106 out_format = OutputFormatChoice.T(optional=True) 

107 out_data_type = OutputDataTypeChoice.T(optional=True) 

108 out_mseed_record_length = IntChoice.T( 

109 optional=True, 

110 choices=list(io.mseed.VALID_RECORD_LENGTHS)) 

111 out_mseed_steim = IntChoice.T( 

112 optional=True, 

113 choices=[1, 2]) 

114 

115 parts = List.T(Defer('Converter.T')) 

116 

117 @classmethod 

118 def add_arguments(cls, p): 

119 p.add_squirrel_query_arguments(without=['time', 'codes']) 

120 

121 p.add_argument( 

122 '--downsample', 

123 dest='downsample', 

124 type=float, 

125 metavar='RATE', 

126 help='Downsample to RATE [Hz].') 

127 

128 p.add_argument( 

129 '--out-path', 

130 dest='out_path', 

131 metavar='TEMPLATE', 

132 help='Set output path to TEMPLATE. Available placeholders ' 

133 'are %%n: network, %%s: station, %%l: location, %%c: ' 

134 'channel, %%b: begin time, %%e: end time, %%j: julian day of ' 

135 'year. The following additional placeholders use the window ' 

136 'begin and end times rather than trace begin and end times ' 

137 '(to suppress producing many small files for gappy traces), ' 

138 '%%(wmin_year)s, %%(wmin_month)s, %%(wmin_day)s, %%(wmin)s, ' 

139 '%%(wmin_jday)s, %%(wmax_year)s, %%(wmax_month)s, ' 

140 '%%(wmax_day)s, %%(wmax)s, %%(wmax_jday)s. ' 

141 'Example: --output=\'data/%%s/trace-%%s-%%c.mseed\'') 

142 

143 p.add_argument( 

144 '--out-sds-path', 

145 dest='out_sds_path', 

146 metavar='PATH', 

147 help='Set output path to create SDS (https://www.seiscomp.de' 

148 '/seiscomp3/doc/applications/slarchive/SDS.html), rooted at ' 

149 'the given path. Implies --tinc=86400. ' 

150 'Example: --output-sds-path=data/sds') 

151 

152 p.add_argument( 

153 '--out-format', 

154 dest='out_format', 

155 choices=io.allowed_formats('save'), 

156 metavar='FORMAT', 

157 help='Set output file format. Choices: %s' % io.allowed_formats( 

158 'save', 'cli_help', 'mseed')) 

159 

160 p.add_argument( 

161 '--out-data-type', 

162 dest='out_data_type', 

163 choices=OutputDataTypeChoice.choices, 

164 metavar='DTYPE', 

165 help='Set numerical data type. Choices: %s. The output file ' 

166 'format must support the given type. By default, the data ' 

167 'type is kept unchanged.' % ', '.join( 

168 OutputDataTypeChoice.choices)) 

169 

170 p.add_argument( 

171 '--out-mseed-record-length', 

172 dest='out_mseed_record_length', 

173 type=int, 

174 choices=io.mseed.VALID_RECORD_LENGTHS, 

175 metavar='INT', 

176 help='Set the mseed record length in bytes. Choices: %s. ' 

177 'Default is 4096 bytes, which is commonly used for archiving.' 

178 % ', '.join(str(b) for b in io.mseed.VALID_RECORD_LENGTHS)) 

179 

180 p.add_argument( 

181 '--out-mseed-steim', 

182 dest='out_mseed_steim', 

183 type=int, 

184 choices=(1, 2), 

185 metavar='INT', 

186 help='Set the mseed STEIM compression. Choices: 1 or 2. ' 

187 'Default is STEIM-2, which can compress full range int32. ' 

188 'Note: STEIM-2 is limited to 30 bit dynamic range.') 

189 

190 @classmethod 

191 def from_arguments(cls, args): 

192 kwargs = args.squirrel_query 

193 

194 obj = cls( 

195 downsample=args.downsample, 

196 out_format=args.out_format, 

197 out_path=args.out_path, 

198 out_sds_path=args.out_sds_path, 

199 out_data_type=args.out_data_type, 

200 out_mseed_record_length=args.out_mseed_record_length, 

201 out_mseed_steim=args.out_mseed_steim, 

202 **kwargs) 

203 

204 obj.validate() 

205 return obj 

206 

207 def add_dataset(self, sq): 

208 if self.in_dataset is not None: 

209 sq.add_dataset(self.in_dataset) 

210 

211 if self.in_path is not None: 

212 ds = Dataset(sources=[LocalData(paths=[self.in_path])]) 

213 ds.set_basepath_from(self) 

214 sq.add_dataset(ds) 

215 

216 if self.in_paths: 

217 ds = Dataset(sources=[LocalData(paths=self.in_paths)]) 

218 ds.set_basepath_from(self) 

219 sq.add_dataset(ds) 

220 

221 def get_effective_rename_rules(self, chain): 

222 d = {} 

223 for k in ['network', 'station', 'location', 'channel']: 

224 v = chain.dget('rename', k) 

225 if isinstance(v, str): 

226 m = re.match(r'/([^/]+)/([^/]*)/', v) 

227 if m: 

228 try: 

229 v = (re.compile(m.group(1)), m.group(2)) 

230 except Exception: 

231 raise JackseisError( 

232 'Invalid replacement pattern: /%s/' % m.group(1)) 

233 

234 d[k] = v 

235 

236 return d 

237 

238 def get_effective_out_path(self): 

239 nset = sum(x is not None for x in ( 

240 self.out_path, 

241 self.out_sds_path)) 

242 

243 if nset > 1: 

244 raise JackseisError( 

245 'More than one out of [out_path, out_sds_path] set.') 

246 

247 is_sds = False 

248 if self.out_path: 

249 out_path = self.out_path 

250 

251 elif self.out_sds_path: 

252 out_path = op.join( 

253 self.out_sds_path, 

254 '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D' 

255 '/%(network)s.%(station)s.%(location)s.%(channel)s.D' 

256 '.%(wmin_year)s.%(wmin_jday)s') 

257 is_sds = True 

258 else: 

259 out_path = None 

260 

261 if out_path is not None: 

262 return self.expand_path(out_path), is_sds 

263 else: 

264 return None 

265 

266 def do_rename(self, rules, tr): 

267 rename = {} 

268 for k in ['network', 'station', 'location', 'channel']: 

269 v = rules.get(k, None) 

270 if isinstance(v, str): 

271 rename[k] = v 

272 elif isinstance(v, dict): 

273 try: 

274 oldval = getattr(tr, k) 

275 rename[k] = v[oldval] 

276 except KeyError: 

277 raise ToolError( 

278 'No mapping defined for %s code "%s".' % (k, oldval)) 

279 

280 elif isinstance(v, tuple): 

281 pat, repl = v 

282 oldval = getattr(tr, k) 

283 newval, n = pat.subn(repl, oldval) 

284 if n: 

285 rename[k] = newval 

286 

287 tr.set_codes(**rename) 

288 

289 def convert(self, args, chain=None): 

290 if chain is None: 

291 defaults = clone(g_defaults) 

292 defaults.set_basepath_from(self) 

293 chain = Chain(defaults) 

294 

295 chain = Chain(self, chain) 

296 

297 if self.parts: 

298 task = make_task('Jackseis parts') 

299 for part in task(self.parts): 

300 part.convert(args, chain) 

301 

302 del task 

303 

304 else: 

305 sq = args.make_squirrel() 

306 

307 cli_overrides = Converter.from_arguments(args) 

308 cli_overrides.set_basepath('.') 

309 

310 chain = Chain(cli_overrides, chain) 

311 

312 chain.mcall('add_dataset', sq) 

313 

314 tmin = chain.get('tmin') 

315 tmax = chain.get('tmax') 

316 tinc = chain.get('tinc') 

317 codes = chain.get('codes') 

318 downsample = chain.get('downsample') 

319 out_path, is_sds = chain.fcall('get_effective_out_path') \ 

320 or (None, False) 

321 

322 if is_sds and tinc != 86400.: 

323 logger.warning('Setting time window to 1 day to generate SDS.') 

324 tinc = 86400.0 

325 

326 out_format = chain.get('out_format') 

327 out_data_type = chain.get('out_data_type') 

328 

329 target_deltat = None 

330 if downsample is not None: 

331 target_deltat = 1.0 / float(downsample) 

332 

333 save_kwargs = {} 

334 if out_format == 'mseed': 

335 save_kwargs['record_length'] = chain.get( 

336 'out_mseed_record_length') 

337 save_kwargs['steim'] = chain.get( 

338 'out_mseed_steim') 

339 

340 tpad = 0.0 

341 if target_deltat is not None: 

342 tpad += target_deltat * 50. 

343 

344 task = None 

345 rename_rules = self.get_effective_rename_rules(chain) 

346 for batch in sq.chopper_waveforms( 

347 tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc, 

348 codes=codes, 

349 snap_window=True): 

350 

351 if task is None: 

352 task = make_task('Jackseis blocks', batch.n) 

353 

354 task.update( 

355 batch.i, 

356 '%s - %s' % ( 

357 util.time_to_str(batch.tmin), 

358 util.time_to_str(batch.tmax))) 

359 

360 twmin = batch.tmin 

361 twmax = batch.tmax 

362 

363 traces = batch.traces 

364 

365 if target_deltat is not None: 

366 out_traces = [] 

367 for tr in traces: 

368 try: 

369 tr.downsample_to( 

370 target_deltat, snap=True, demean=False) 

371 

372 out_traces.append(tr) 

373 

374 except (trace.TraceTooShort, trace.NoData): 

375 pass 

376 

377 traces = out_traces 

378 

379 for tr in traces: 

380 self.do_rename(rename_rules, tr) 

381 

382 if out_data_type: 

383 for tr in traces: 

384 tr.ydata = tr.ydata.astype( 

385 OutputDataTypeChoice.name_to_dtype[out_data_type]) 

386 

387 chopped_traces = [] 

388 for tr in traces: 

389 try: 

390 otr = tr.chop(twmin, twmax, inplace=False) 

391 chopped_traces.append(otr) 

392 except trace.NoData: 

393 pass 

394 

395 traces = chopped_traces 

396 

397 if out_path is not None: 

398 try: 

399 io.save( 

400 traces, out_path, 

401 format=out_format, 

402 overwrite=args.force, 

403 additional=dict( 

404 wmin_year=tts(twmin, format='%Y'), 

405 wmin_month=tts(twmin, format='%m'), 

406 wmin_day=tts(twmin, format='%d'), 

407 wmin_jday=tts(twmin, format='%j'), 

408 wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'), 

409 wmax_year=tts(twmax, format='%Y'), 

410 wmax_month=tts(twmax, format='%m'), 

411 wmax_day=tts(twmax, format='%d'), 

412 wmax_jday=tts(twmax, format='%j'), 

413 wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')), 

414 **save_kwargs) 

415 

416 except io.FileSaveError as e: 

417 raise JackseisError(str(e)) 

418 

419 else: 

420 for tr in traces: 

421 print(tr.summary) 

422 

423 if task: 

424 task.done() 

425 

426 

427g_defaults = Converter( 

428 out_mseed_record_length=4096, 

429 out_format='mseed', 

430 out_mseed_steim=2) 

431 

432 

433headline = 'Convert waveform archive data.' 

434 

435 

436def make_subparser(subparsers): 

437 return subparsers.add_parser( 

438 'jackseis', 

439 help=headline, 

440 description=headline) 

441 

442 

443def setup(parser): 

444 parser.add_squirrel_selection_arguments() 

445 

446 parser.add_argument( 

447 '--config', 

448 dest='config_path', 

449 metavar='NAME', 

450 help='File containing `jackseis2.Converter` settings.') 

451 

452 parser.add_argument( 

453 '--force', 

454 dest='force', 

455 action='store_true', 

456 default=False, 

457 help='Force overwriting of existing files.') 

458 

459 Converter.add_arguments(parser) 

460 

461 

462def run(parser, args): 

463 if args.config_path: 

464 try: 

465 converters = load_all(filename=args.config_path) 

466 except Exception as e: 

467 raise ToolError(str(e)) 

468 

469 for converter in converters: 

470 if not isinstance(converter, Converter): 

471 raise ToolError( 

472 'Config file should only contain ' 

473 '`jackseis.Converter` objects.') 

474 

475 converter.set_basepath(op.dirname(args.config_path)) 

476 

477 else: 

478 converter = Converter() 

479 converter.set_basepath('.') 

480 converters = [converter] 

481 

482 with progress.view(): 

483 task = make_task('Jackseis jobs') 

484 for converter in task(converters): 

485 converter.convert(args)