Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/base.py: 84%

877 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-07-04 09:37 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel main classes. 

8''' 

9 

10import re 

11import sys 

12import os 

13import time 

14import math 

15import logging 

16import threading 

17import queue 

18from collections import defaultdict 

19 

20from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict 

21from pyrocko import util, trace 

22from pyrocko import progress 

23from pyrocko.plot import nice_time_tick_inc_approx_secs 

24 

25from . import model, io, cache, dataset 

26 

27from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \ 

28 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM, codes_patterns_list, \ 

29 codes_patterns_for_kind 

30from .client import fdsn, catalog 

31from .selection import Selection, filldocs 

32from .database import abspath 

33from .operators.base import Operator, CodesPatternFiltering 

34from . import client, environment, error 

35 

36logger = logging.getLogger('psq.base') 

37 

38guts_prefix = 'squirrel' 

39 

40 

41def nonef(f, xs): 

42 xs_ = [x for x in xs if x is not None] 

43 if xs_: 

44 return f(xs_) 

45 else: 

46 return None 

47 

48 

49def make_task(*args): 

50 return progress.task(*args, logger=logger) 

51 

52 

53def lpick(condition, seq): 

54 ft = [], [] 

55 for ele in seq: 

56 ft[int(bool(condition(ele)))].append(ele) 

57 

58 return ft 

59 

60 

61def len_plural(obj): 

62 return len(obj), '' if len(obj) == 1 else 's' 

63 

64 

65def blocks(tmin, tmax, deltat, nsamples_block=100000): 

66 tblock = nice_time_tick_inc_approx_secs( 

67 util.to_time_float(deltat * nsamples_block)) 

68 iblock_min = int(math.floor(tmin / tblock)) 

69 iblock_max = int(math.ceil(tmax / tblock)) 

70 for iblock in range(iblock_min, iblock_max): 

71 yield iblock * tblock, (iblock+1) * tblock 

72 

73 

74def gaps(avail, tmin, tmax): 

75 assert tmin < tmax 

76 

77 data = [(tmax, 1), (tmin, -1)] 

78 for (tmin_a, tmax_a) in avail: 

79 assert tmin_a < tmax_a 

80 data.append((tmin_a, 1)) 

81 data.append((tmax_a, -1)) 

82 

83 data.sort() 

84 s = 1 

85 gaps = [] 

86 tmin_g = None 

87 for t, x in data: 

88 if s == 1 and x == -1: 

89 tmin_g = t 

90 elif s == 0 and x == 1 and tmin_g is not None: 

91 tmax_g = t 

92 if tmin_g != tmax_g: 

93 gaps.append((tmin_g, tmax_g)) 

94 

95 s += x 

96 

97 return gaps 

98 

99 

100def order_key(order): 

101 return (order.codes, order.tmin, order.tmax) 

102 

103 

104def prefix_tree(tups): 

105 if not tups: 

106 return [] 

107 

108 if len(tups[0]) == 1: 

109 return sorted((tup[0], []) for tup in tups) 

110 

111 d = defaultdict(list) 

112 for tup in tups: 

113 d[tup[0]].append(tup[1:]) 

114 

115 sub = [] 

116 for k in sorted(d.keys()): 

117 sub.append((k, prefix_tree(d[k]))) 

118 

119 return sub 

120 

121 

122def match_time_span(tmin, tmax, obj): 

123 return (obj.tmin is None or tmax is None or obj.tmin <= tmax) \ 

124 and (tmin is None or obj.tmax is None or tmin < obj.tmax) 

125 

126 

127class Batch(object): 

128 ''' 

129 Batch of waveforms from window-wise data extraction. 

130 

131 Encapsulates state and results yielded for each window in window-wise 

132 waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method. 

133 

134 *Attributes:* 

135 

136 .. py:attribute:: tmin 

137 

138 Start of this time window. 

139 

140 .. py:attribute:: tmax 

141 

142 End of this time window. 

143 

144 .. py:attribute:: i 

145 

146 Index of this time window in sequence. 

147 

148 .. py:attribute:: n 

149 

150 Total number of time windows in sequence. 

151 

152 .. py:attribute:: igroup 

153 

154 Index of this time window's sequence group. 

155 

156 .. py:attribute:: ngroups 

157 

158 Total number of sequence groups. 

159 

160 .. py:attribute:: traces 

161 

162 Extracted waveforms for this time window. 

163 ''' 

164 

165 def __init__(self, tmin, tmax, i, n, igroup, ngroups, traces): 

166 self.tmin = tmin 

167 self.tmax = tmax 

168 self.i = i 

169 self.n = n 

170 self.igroup = igroup 

171 self.ngroups = ngroups 

172 self.traces = traces 

173 

174 

175class Squirrel(Selection): 

176 ''' 

177 Prompt, lazy, indexing, caching, dynamic seismological dataset access. 

178 

179 :param env: 

180 Squirrel environment instance or directory path to use as starting 

181 point for its detection. By default, the current directory is used as 

182 starting point. When searching for a usable environment the directory 

183 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point) 

184 directory is used if it exists, otherwise the parent directories are 

185 search upwards for the existence of such a directory. If no such 

186 directory is found, the user's global Squirrel environment 

187 ``'$HOME/.pyrocko/squirrel'`` is used. 

188 :type env: 

189 :py:class:`~pyrocko.squirrel.environment.Environment` or 

190 :py:class:`str` 

191 

192 :param database: 

193 Database instance or path to database. By default the 

194 database found in the detected Squirrel environment is used. 

195 :type database: 

196 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str` 

197 

198 :param cache_path: 

199 Directory path to use for data caching. By default, the ``'cache'`` 

200 directory in the detected Squirrel environment is used. 

201 :type cache_path: 

202 :py:class:`str` 

203 

204 :param persistent: 

205 If given a name, create a persistent selection. 

206 :type persistent: 

207 :py:class:`str` 

208 

209 This is the central class of the Squirrel framework. It provides a unified 

210 interface to query and access seismic waveforms, station meta-data and 

211 event information from local file collections and remote data sources. For 

212 prompt responses, a profound database setup is used under the hood. To 

213 speed up assemblage of ad-hoc data selections, files are indexed on first 

214 use and the extracted meta-data is remembered in the database for 

215 subsequent accesses. Bulk data is lazily loaded from disk and remote 

216 sources, just when requested. Once loaded, data is cached in memory to 

217 expedite typical access patterns. Files and data sources can be dynamically 

218 added to and removed from the Squirrel selection at runtime. 

219 

220 Queries are restricted to the contents of the files currently added to the 

221 Squirrel selection (usually a subset of the file meta-information 

222 collection in the database). This list of files is referred to here as the 

223 "selection". By default, temporary tables are created in the attached 

224 database to hold the names of the files in the selection as well as various 

225 indices and counters. These tables are only visible inside the application 

226 which created them and are deleted when the database connection is closed 

227 or the application exits. To create a selection which is not deleted at 

228 exit, supply a name to the ``persistent`` argument of the Squirrel 

229 constructor. Persistent selections are shared among applications using the 

230 same database. 

231 

232 **Method summary** 

233 

234 Some of the methods are implemented in :py:class:`Squirrel`'s base class 

235 :py:class:`~pyrocko.squirrel.selection.Selection`. 

236 

237 .. autosummary:: 

238 

239 ~Squirrel.add 

240 ~Squirrel.add_source 

241 ~Squirrel.add_fdsn 

242 ~Squirrel.add_catalog 

243 ~Squirrel.add_dataset 

244 ~Squirrel.add_virtual 

245 ~Squirrel.update 

246 ~Squirrel.update_waveform_promises 

247 ~Squirrel.advance_accessor 

248 ~Squirrel.clear_accessor 

249 ~Squirrel.reload 

250 ~pyrocko.squirrel.selection.Selection.iter_paths 

251 ~Squirrel.iter_nuts 

252 ~Squirrel.iter_kinds 

253 ~Squirrel.iter_deltats 

254 ~Squirrel.iter_codes 

255 ~pyrocko.squirrel.selection.Selection.get_paths 

256 ~Squirrel.get_nuts 

257 ~Squirrel.get_kinds 

258 ~Squirrel.get_deltats 

259 ~Squirrel.get_codes 

260 ~Squirrel.get_counts 

261 ~Squirrel.get_time_span 

262 ~Squirrel.get_deltat_span 

263 ~Squirrel.get_nfiles 

264 ~Squirrel.get_nnuts 

265 ~Squirrel.get_total_size 

266 ~Squirrel.get_stats 

267 ~Squirrel.get_content 

268 ~Squirrel.get_stations 

269 ~Squirrel.get_channels 

270 ~Squirrel.get_responses 

271 ~Squirrel.get_events 

272 ~Squirrel.get_waveform_nuts 

273 ~Squirrel.get_waveforms 

274 ~Squirrel.chopper_waveforms 

275 ~Squirrel.get_coverage 

276 ~Squirrel.pile 

277 ~Squirrel.snuffle 

278 ~Squirrel.glob_codes 

279 ~pyrocko.squirrel.selection.Selection.get_database 

280 ~Squirrel.print_tables 

281 ''' 

282 

283 def __init__( 

284 self, env=None, database=None, cache_path=None, persistent=None): 

285 

286 if not isinstance(env, environment.Environment): 

287 env = environment.get_environment(env) 

288 

289 if database is None: 

290 database = env.expand_path(env.database_path) 

291 

292 if cache_path is None: 

293 cache_path = env.expand_path(env.cache_path) 

294 

295 if persistent is None: 

296 persistent = env.persistent 

297 

298 Selection.__init__( 

299 self, database=database, persistent=persistent) 

300 

301 self.get_database().set_basepath(os.path.dirname(env.get_basepath())) 

302 

303 self._content_caches = { 

304 'waveform': cache.ContentCache(), 

305 'default': cache.ContentCache()} 

306 

307 self._cache_path = cache_path 

308 

309 self._sources = [] 

310 self._operators = [] 

311 self._operator_registry = {} 

312 

313 self._pending_orders = [] 

314 

315 self._pile = None 

316 self._n_choppers_active = 0 

317 

318 self.downloads_enabled = True 

319 

320 self._names.update({ 

321 'nuts': self.name + '_nuts', 

322 'kind_codes_count': self.name + '_kind_codes_count', 

323 'coverage': self.name + '_coverage'}) 

324 

325 with self.transaction('create tables') as cursor: 

326 self._create_tables_squirrel(cursor) 

327 

328 def _create_tables_squirrel(self, cursor): 

329 

330 cursor.execute(self._register_table(self._sql( 

331 ''' 

332 CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s ( 

333 nut_id integer PRIMARY KEY, 

334 file_id integer, 

335 file_segment integer, 

336 file_element integer, 

337 kind_id integer, 

338 kind_codes_id integer, 

339 tmin_seconds integer, 

340 tmin_offset integer, 

341 tmax_seconds integer, 

342 tmax_offset integer, 

343 kscale integer) 

344 '''))) 

345 

346 cursor.execute(self._register_table(self._sql( 

347 ''' 

348 CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s ( 

349 kind_codes_id integer PRIMARY KEY, 

350 count integer) 

351 '''))) 

352 

353 cursor.execute(self._sql( 

354 ''' 

355 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element 

356 ON %(nuts)s (file_id, file_segment, file_element) 

357 ''')) 

358 

359 cursor.execute(self._sql( 

360 ''' 

361 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id 

362 ON %(nuts)s (file_id) 

363 ''')) 

364 

365 cursor.execute(self._sql( 

366 ''' 

367 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds 

368 ON %(nuts)s (kind_id, tmin_seconds) 

369 ''')) 

370 

371 cursor.execute(self._sql( 

372 ''' 

373 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds 

374 ON %(nuts)s (kind_id, tmax_seconds) 

375 ''')) 

376 

377 cursor.execute(self._sql( 

378 ''' 

379 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale 

380 ON %(nuts)s (kind_id, kscale, tmin_seconds) 

381 ''')) 

382 

383 cursor.execute(self._sql( 

384 ''' 

385 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts 

386 BEFORE DELETE ON main.files FOR EACH ROW 

387 BEGIN 

388 DELETE FROM %(nuts)s WHERE file_id == old.file_id; 

389 END 

390 ''')) 

391 

392 # trigger only on size to make silent update of mtime possible 

393 cursor.execute(self._sql( 

394 ''' 

395 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2 

396 BEFORE UPDATE OF size ON main.files FOR EACH ROW 

397 BEGIN 

398 DELETE FROM %(nuts)s WHERE file_id == old.file_id; 

399 END 

400 ''')) 

401 

402 cursor.execute(self._sql( 

403 ''' 

404 CREATE TRIGGER IF NOT EXISTS 

405 %(db)s.%(file_states)s_delete_files 

406 BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW 

407 BEGIN 

408 DELETE FROM %(nuts)s WHERE file_id == old.file_id; 

409 END 

410 ''')) 

411 

412 cursor.execute(self._sql( 

413 ''' 

414 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes 

415 BEFORE INSERT ON %(nuts)s FOR EACH ROW 

416 BEGIN 

417 INSERT OR IGNORE INTO %(kind_codes_count)s VALUES 

418 (new.kind_codes_id, 0); 

419 UPDATE %(kind_codes_count)s 

420 SET count = count + 1 

421 WHERE new.kind_codes_id 

422 == %(kind_codes_count)s.kind_codes_id; 

423 END 

424 ''')) 

425 

426 cursor.execute(self._sql( 

427 ''' 

428 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes 

429 BEFORE DELETE ON %(nuts)s FOR EACH ROW 

430 BEGIN 

431 UPDATE %(kind_codes_count)s 

432 SET count = count - 1 

433 WHERE old.kind_codes_id 

434 == %(kind_codes_count)s.kind_codes_id; 

435 END 

436 ''')) 

437 

438 cursor.execute(self._register_table(self._sql( 

439 ''' 

440 CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s ( 

441 kind_codes_id integer, 

442 time_seconds integer, 

443 time_offset integer, 

444 step integer) 

445 '''))) 

446 

447 cursor.execute(self._sql( 

448 ''' 

449 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time 

450 ON %(coverage)s (kind_codes_id, time_seconds, time_offset) 

451 ''')) 

452 

453 cursor.execute(self._sql( 

454 ''' 

455 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage 

456 AFTER INSERT ON %(nuts)s FOR EACH ROW 

457 BEGIN 

458 INSERT OR IGNORE INTO %(coverage)s VALUES 

459 (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0) 

460 ; 

461 UPDATE %(coverage)s 

462 SET step = step + 1 

463 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id 

464 AND new.tmin_seconds == %(coverage)s.time_seconds 

465 AND new.tmin_offset == %(coverage)s.time_offset 

466 ; 

467 INSERT OR IGNORE INTO %(coverage)s VALUES 

468 (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0) 

469 ; 

470 UPDATE %(coverage)s 

471 SET step = step - 1 

472 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id 

473 AND new.tmax_seconds == %(coverage)s.time_seconds 

474 AND new.tmax_offset == %(coverage)s.time_offset 

475 ; 

476 DELETE FROM %(coverage)s 

477 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id 

478 AND new.tmin_seconds == %(coverage)s.time_seconds 

479 AND new.tmin_offset == %(coverage)s.time_offset 

480 AND step == 0 

481 ; 

482 DELETE FROM %(coverage)s 

483 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id 

484 AND new.tmax_seconds == %(coverage)s.time_seconds 

485 AND new.tmax_offset == %(coverage)s.time_offset 

486 AND step == 0 

487 ; 

488 END 

489 ''')) 

490 

491 cursor.execute(self._sql( 

492 ''' 

493 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage 

494 BEFORE DELETE ON %(nuts)s FOR EACH ROW 

495 BEGIN 

496 INSERT OR IGNORE INTO %(coverage)s VALUES 

497 (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0) 

498 ; 

499 UPDATE %(coverage)s 

500 SET step = step - 1 

501 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id 

502 AND old.tmin_seconds == %(coverage)s.time_seconds 

503 AND old.tmin_offset == %(coverage)s.time_offset 

504 ; 

505 INSERT OR IGNORE INTO %(coverage)s VALUES 

506 (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0) 

507 ; 

508 UPDATE %(coverage)s 

509 SET step = step + 1 

510 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id 

511 AND old.tmax_seconds == %(coverage)s.time_seconds 

512 AND old.tmax_offset == %(coverage)s.time_offset 

513 ; 

514 DELETE FROM %(coverage)s 

515 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id 

516 AND old.tmin_seconds == %(coverage)s.time_seconds 

517 AND old.tmin_offset == %(coverage)s.time_offset 

518 AND step == 0 

519 ; 

520 DELETE FROM %(coverage)s 

521 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id 

522 AND old.tmax_seconds == %(coverage)s.time_seconds 

523 AND old.tmax_offset == %(coverage)s.time_offset 

524 AND step == 0 

525 ; 

526 END 

527 ''')) 

528 

529 def _delete(self): 

530 '''Delete database tables associated with this Squirrel.''' 

531 

532 with self.transaction('delete tables') as cursor: 

533 for s in ''' 

534 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts; 

535 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2; 

536 DROP TRIGGER %(db)s.%(file_states)s_delete_files; 

537 DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes; 

538 DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes; 

539 DROP TABLE %(db)s.%(nuts)s; 

540 DROP TABLE %(db)s.%(kind_codes_count)s; 

541 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage; 

542 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage; 

543 DROP TABLE IF EXISTS %(db)s.%(coverage)s; 

544 '''.strip().splitlines(): 

545 

546 cursor.execute(self._sql(s)) 

547 

548 Selection._delete(self) 

549 

550 @filldocs 

551 def add(self, 

552 paths, 

553 kinds=None, 

554 format='detect', 

555 include=None, 

556 exclude=None, 

557 check=True): 

558 

559 ''' 

560 Add files to the selection. 

561 

562 :param paths: 

563 Iterator yielding paths to files or directories to be added to the 

564 selection. Recurses into directories. If given a ``str``, it 

565 is treated as a single path to be added. 

566 :type paths: 

567 :py:class:`list` of :py:class:`str` 

568 

569 :param kinds: 

570 Content types to be made available through the Squirrel selection. 

571 By default, all known content types are accepted. 

572 :type kinds: 

573 :py:class:`list` of :py:class:`str` 

574 

575 :param format: 

576 File format identifier or ``'detect'`` to enable auto-detection 

577 (available: %(file_formats)s). 

578 :type format: 

579 str 

580 

581 :param include: 

582 If not ``None``, files are only included if their paths match the 

583 given regular expression pattern. 

584 :type format: 

585 str 

586 

587 :param exclude: 

588 If not ``None``, files are only included if their paths do not 

589 match the given regular expression pattern. 

590 :type format: 

591 str 

592 

593 :param check: 

594 If ``True``, all file modification times are checked to see if 

595 cached information has to be updated (slow). If ``False``, only 

596 previously unknown files are indexed and cached information is used 

597 for known files, regardless of file state (fast, corrresponds to 

598 Squirrel's ``--optimistic`` mode). File deletions will go 

599 undetected in the latter case. 

600 :type check: 

601 bool 

602 

603 :Complexity: 

604 O(log N) 

605 ''' 

606 

607 if isinstance(kinds, str): 

608 kinds = (kinds,) 

609 

610 if isinstance(paths, str): 

611 paths = [paths] 

612 

613 kind_mask = model.to_kind_mask(kinds) 

614 

615 Selection.add( 

616 self, util.iter_select_files( 

617 paths, 

618 show_progress=False, 

619 include=include, 

620 exclude=exclude, 

621 pass_through=lambda path: path.startswith('virtual:') 

622 ), kind_mask, format) 

623 

624 self._load(check) 

625 self._update_nuts() 

626 

627 def reload(self): 

628 ''' 

629 Check for modifications and reindex modified files. 

630 

631 Based on file modification times. 

632 ''' 

633 

634 self._set_file_states_force_check() 

635 self._load(check=True) 

636 self._update_nuts() 

637 

638 def add_virtual(self, nuts, virtual_paths=None): 

639 ''' 

640 Add content which is not backed by files. 

641 

642 :param nuts: 

643 Content pieces to be added. 

644 :type nuts: 

645 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects 

646 

647 :param virtual_paths: 

648 List of virtual paths to prevent creating a temporary list of the 

649 nuts while aggregating the file paths for the selection. 

650 :type virtual_paths: 

651 :py:class:`list` of :py:class:`str` 

652 

653 Stores to the main database and the selection. 

654 ''' 

655 

656 if isinstance(virtual_paths, str): 

657 virtual_paths = [virtual_paths] 

658 

659 if virtual_paths is None: 

660 if not isinstance(nuts, list): 

661 nuts = list(nuts) 

662 virtual_paths = set(nut.file_path for nut in nuts) 

663 

664 Selection.add(self, virtual_paths) 

665 self.get_database().dig(nuts) 

666 self._update_nuts() 

667 

668 def add_volatile(self, nuts): 

669 if not isinstance(nuts, list): 

670 nuts = list(nuts) 

671 

672 paths = list(set(nut.file_path for nut in nuts)) 

673 io.backends.virtual.add_nuts(nuts) 

674 self.add_virtual(nuts, paths) 

675 self._volatile_paths.extend(paths) 

676 

677 def add_volatile_waveforms(self, traces): 

678 ''' 

679 Add in-memory waveforms which will be removed when the app closes. 

680 ''' 

681 

682 name = model.random_name() 

683 

684 path = 'virtual:volatile:%s' % name 

685 

686 nuts = [] 

687 for itr, tr in enumerate(traces): 

688 assert tr.tmin <= tr.tmax 

689 tmin_seconds, tmin_offset = model.tsplit(tr.tmin) 

690 tmax_seconds, tmax_offset = model.tsplit( 

691 tr.tmin + tr.data_len()*tr.deltat) 

692 

693 nuts.append(model.Nut( 

694 file_path=path, 

695 file_format='virtual', 

696 file_segment=itr, 

697 file_element=0, 

698 file_mtime=0, 

699 codes=tr.codes, 

700 tmin_seconds=tmin_seconds, 

701 tmin_offset=tmin_offset, 

702 tmax_seconds=tmax_seconds, 

703 tmax_offset=tmax_offset, 

704 deltat=tr.deltat, 

705 kind_id=to_kind_id('waveform'), 

706 content=tr)) 

707 

708 self.add_volatile(nuts) 

709 return path 

710 

711 def _load(self, check): 

712 for _ in io.iload( 

713 self, 

714 content=[], 

715 skip_unchanged=True, 

716 check=check): 

717 pass 

718 

719 def _update_nuts(self, transaction=None): 

720 transaction = transaction or self.transaction('update nuts') 

721 with make_task('Aggregating selection') as task, \ 

722 transaction as cursor: 

723 

724 self._conn.set_progress_handler(task.update, 100000) 

725 nrows = cursor.execute(self._sql( 

726 ''' 

727 INSERT INTO %(db)s.%(nuts)s 

728 SELECT NULL, 

729 nuts.file_id, nuts.file_segment, nuts.file_element, 

730 nuts.kind_id, nuts.kind_codes_id, 

731 nuts.tmin_seconds, nuts.tmin_offset, 

732 nuts.tmax_seconds, nuts.tmax_offset, 

733 nuts.kscale 

734 FROM %(db)s.%(file_states)s 

735 INNER JOIN nuts 

736 ON %(db)s.%(file_states)s.file_id == nuts.file_id 

737 INNER JOIN kind_codes 

738 ON nuts.kind_codes_id == 

739 kind_codes.kind_codes_id 

740 WHERE %(db)s.%(file_states)s.file_state != 2 

741 AND (((1 << kind_codes.kind_id) 

742 & %(db)s.%(file_states)s.kind_mask) != 0) 

743 ''')).rowcount 

744 

745 task.update(nrows) 

746 self._set_file_states_known(transaction) 

747 self._conn.set_progress_handler(None, 0) 

748 

749 def add_source(self, source, check=True): 

750 ''' 

751 Add remote resource. 

752 

753 :param source: 

754 Remote data access client instance. 

755 :type source: 

756 subclass of :py:class:`~pyrocko.squirrel.client.base.Source` 

757 ''' 

758 

759 self._sources.append(source) 

760 source.setup(self, check=check) 

761 

762 def add_fdsn(self, *args, **kwargs): 

763 ''' 

764 Add FDSN site for transparent remote data access. 

765 

766 Arguments are passed to 

767 :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`. 

768 ''' 

769 

770 self.add_source(fdsn.FDSNSource(*args, **kwargs)) 

771 

772 def add_catalog(self, *args, **kwargs): 

773 ''' 

774 Add online catalog for transparent event data access. 

775 

776 Arguments are passed to 

777 :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`. 

778 ''' 

779 

780 self.add_source(catalog.CatalogSource(*args, **kwargs)) 

781 

782 def add_dataset(self, ds, check=True): 

783 ''' 

784 Read dataset description from file and add its contents. 

785 

786 :param ds: 

787 Path to dataset description file, dataset description object 

788 or name of a built-in dataset. See 

789 :py:mod:`~pyrocko.squirrel.dataset`. 

790 :type ds: 

791 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset` 

792 

793 :param check: 

794 If ``True``, all file modification times are checked to see if 

795 cached information has to be updated (slow). If ``False``, only 

796 previously unknown files are indexed and cached information is used 

797 for known files, regardless of file state (fast, corrresponds to 

798 Squirrel's ``--optimistic`` mode). File deletions will go 

799 undetected in the latter case. 

800 :type check: 

801 bool 

802 ''' 

803 if isinstance(ds, str): 

804 ds = dataset.read_dataset(ds) 

805 

806 ds.setup(self, check=check) 

807 

808 def _get_selection_args( 

809 self, kind_id, 

810 obj=None, tmin=None, tmax=None, time=None, codes=None): 

811 

812 if codes is not None: 

813 codes = codes_patterns_for_kind(kind_id, codes) 

814 

815 if time is not None: 

816 tmin = time 

817 tmax = time 

818 

819 if obj is not None: 

820 tmin = tmin if tmin is not None else obj.tmin 

821 tmax = tmax if tmax is not None else obj.tmax 

822 codes = codes if codes is not None else codes_patterns_for_kind( 

823 kind_id, obj.codes) 

824 

825 return tmin, tmax, codes 

826 

827 def _get_selection_args_str(self, *args, **kwargs): 

828 

829 tmin, tmax, codes = self._get_selection_args(*args, **kwargs) 

830 return 'tmin: %s, tmax: %s, codes: %s' % ( 

831 util.time_to_str(tmin) if tmin is not None else 'none', 

832 util.time_to_str(tmax) if tmax is not None else 'none', 

833 ','.join(str(entry) for entry in codes)) 

834 

835 def _selection_args_to_kwargs( 

836 self, obj=None, tmin=None, tmax=None, time=None, codes=None): 

837 

838 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes) 

839 

840 def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv): 

841 

842 tmin_seconds, tmin_offset = model.tsplit(tmin) 

843 tmax_seconds, tmax_offset = model.tsplit(tmax) 

844 if naiv: 

845 cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?') 

846 args.append(tmax_seconds) 

847 else: 

848 tscale_edges = model.tscale_edges 

849 tmin_cond = [] 

850 for kscale in range(tscale_edges.size + 1): 

851 if kscale != tscale_edges.size: 

852 tscale = int(tscale_edges[kscale]) 

853 tmin_cond.append(''' 

854 (%(db)s.%(nuts)s.kind_id = ? 

855 AND %(db)s.%(nuts)s.kscale == ? 

856 AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?) 

857 ''') 

858 args.extend( 

859 (to_kind_id(kind), kscale, 

860 tmin_seconds - tscale - 1, tmax_seconds + 1)) 

861 

862 else: 

863 tmin_cond.append(''' 

864 (%(db)s.%(nuts)s.kind_id == ? 

865 AND %(db)s.%(nuts)s.kscale == ? 

866 AND %(db)s.%(nuts)s.tmin_seconds <= ?) 

867 ''') 

868 

869 args.extend( 

870 (to_kind_id(kind), kscale, tmax_seconds + 1)) 

871 if tmin_cond: 

872 cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ') 

873 

874 cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?') 

875 args.append(tmin_seconds) 

876 

877 def _codes_match_sql(self, positive, kind_id, codes, cond, args): 

878 pats = codes_patterns_for_kind(kind_id, codes) 

879 if pats is None: 

880 return 

881 

882 pats_exact, pats_nonexact = model.classify_patterns(pats) 

883 

884 codes_cond = [] 

885 if pats_exact: 

886 codes_cond.append(' ( kind_codes.codes IN ( %s ) ) ' % ', '.join( 

887 '?'*len(pats_exact))) 

888 

889 args.extend(pats_exact) 

890 

891 if pats_nonexact: 

892 codes_cond.append(' ( %s ) ' % ' OR '.join( 

893 ('kind_codes.codes GLOB ?',) * len(pats_nonexact))) 

894 

895 args.extend(pats_nonexact) 

896 

897 if codes_cond: 

898 cond.append('%s ( %s )' % ( 

899 'NOT' if not positive else '', 

900 ' OR '.join(codes_cond))) 

901 

902 def iter_nuts( 

903 self, kind=None, tmin=None, tmax=None, codes=None, 

904 codes_exclude=None, sample_rate_min=None, sample_rate_max=None, 

905 naiv=False, kind_codes_ids=None, path=None, limit=None): 

906 

907 ''' 

908 Iterate over content entities matching given constraints. 

909 

910 :param kind: 

911 Content kind (or kinds) to extract. 

912 :type kind: 

913 :py:class:`str`, :py:class:`list` of :py:class:`str` 

914 

915 :param tmin: 

916 Start time of query interval. 

917 :type tmin: 

918 :py:func:`~pyrocko.util.get_time_float` 

919 

920 :param tmax: 

921 End time of query interval. 

922 :type tmax: 

923 :py:func:`~pyrocko.util.get_time_float` 

924 

925 :param codes: 

926 List of code patterns to query. 

927 :type codes: 

928 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes` 

929 objects appropriate for the queried content type, or anything which 

930 can be converted to such objects. 

931 

932 :param naiv: 

933 Bypass time span lookup through indices (slow, for testing). 

934 :type naiv: 

935 :py:class:`bool` 

936 

937 :param kind_codes_ids: 

938 Kind-codes IDs of contents to be retrieved (internal use). 

939 :type kind_codes_ids: 

940 :py:class:`list` of :py:class:`int` 

941 

942 :yields: 

943 :py:class:`~pyrocko.squirrel.model.Nut` objects representing the 

944 intersecting content. 

945 

946 :complexity: 

947 O(log N) for the time selection part due to heavy use of database 

948 indices. 

949 

950 Query time span is treated as a half-open interval ``[tmin, tmax)``. 

951 However, if ``tmin`` equals ``tmax``, the edge logics are modified to 

952 closed-interval so that content intersecting with the time instant ``t 

953 = tmin = tmax`` is returned (otherwise nothing would be returned as 

954 ``[t, t)`` never matches anything). 

955 

956 Time spans of content entities to be matched are also treated as half 

957 open intervals, e.g. content span ``[0, 1)`` is matched by query span 

958 ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are 

959 modified to closed-interval when the content time span is an empty 

960 interval, i.e. to indicate a time instant. E.g. time instant 0 is 

961 matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. 

962 ''' 

963 

964 if not isinstance(kind, str): 

965 if kind is None: 

966 kind = model.g_content_kinds 

967 for kind_ in kind: 

968 for nut in self.iter_nuts(kind_, tmin, tmax, codes): 

969 yield nut 

970 

971 return 

972 

973 kind_id = to_kind_id(kind) 

974 

975 cond = [] 

976 args = [] 

977 if tmin is not None or tmax is not None: 

978 assert kind is not None 

979 if tmin is None: 

980 tmin = self.get_time_span()[0] 

981 if tmax is None: 

982 tmax = self.get_time_span()[1] + 1.0 

983 

984 self._timerange_sql(tmin, tmax, kind, cond, args, naiv) 

985 

986 cond.append('kind_codes.kind_id == ?') 

987 args.append(kind_id) 

988 

989 if codes is not None: 

990 self._codes_match_sql(True, kind_id, codes, cond, args) 

991 

992 if codes_exclude is not None: 

993 self._codes_match_sql(False, kind_id, codes_exclude, cond, args) 

994 

995 if sample_rate_min is not None: 

996 cond.append('kind_codes.deltat <= ?') 

997 args.append(1.0/sample_rate_min) 

998 

999 if sample_rate_max is not None: 

1000 cond.append('? <= kind_codes.deltat') 

1001 args.append(1.0/sample_rate_max) 

1002 

1003 if kind_codes_ids is not None: 

1004 cond.append( 

1005 ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join( 

1006 '?'*len(kind_codes_ids))) 

1007 

1008 args.extend(kind_codes_ids) 

1009 

1010 db = self.get_database() 

1011 if path is not None: 

1012 cond.append('files.path == ?') 

1013 args.append(db.relpath(abspath(path))) 

1014 

1015 sql = (''' 

1016 SELECT 

1017 files.path, 

1018 files.format, 

1019 files.mtime, 

1020 files.size, 

1021 %(db)s.%(nuts)s.file_segment, 

1022 %(db)s.%(nuts)s.file_element, 

1023 kind_codes.kind_id, 

1024 kind_codes.codes, 

1025 %(db)s.%(nuts)s.tmin_seconds, 

1026 %(db)s.%(nuts)s.tmin_offset, 

1027 %(db)s.%(nuts)s.tmax_seconds, 

1028 %(db)s.%(nuts)s.tmax_offset, 

1029 kind_codes.deltat 

1030 FROM files 

1031 INNER JOIN %(db)s.%(nuts)s 

1032 ON files.file_id == %(db)s.%(nuts)s.file_id 

1033 INNER JOIN kind_codes 

1034 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id 

1035 ''') 

1036 

1037 if cond: 

1038 sql += ''' WHERE ''' + ' AND '.join(cond) 

1039 

1040 if limit is not None: 

1041 sql += ''' LIMIT %i''' % limit 

1042 

1043 sql = self._sql(sql) 

1044 if tmin is None and tmax is None: 

1045 for row in self._conn.execute(sql, args): 

1046 row = (db.abspath(row[0]),) + row[1:] 

1047 nut = model.Nut(values_nocheck=row) 

1048 yield nut 

1049 else: 

1050 assert tmin is not None and tmax is not None 

1051 if tmin == tmax: 

1052 for row in self._conn.execute(sql, args): 

1053 row = (db.abspath(row[0]),) + row[1:] 

1054 nut = model.Nut(values_nocheck=row) 

1055 if (nut.tmin <= tmin < nut.tmax) \ 

1056 or (nut.tmin == nut.tmax and tmin == nut.tmin): 

1057 

1058 yield nut 

1059 else: 

1060 for row in self._conn.execute(sql, args): 

1061 row = (db.abspath(row[0]),) + row[1:] 

1062 nut = model.Nut(values_nocheck=row) 

1063 if (tmin < nut.tmax and nut.tmin < tmax) \ 

1064 or (nut.tmin == nut.tmax 

1065 and tmin <= nut.tmin < tmax): 

1066 

1067 yield nut 

1068 

1069 def get_nuts(self, *args, **kwargs): 

1070 ''' 

1071 Get content entities matching given constraints. 

1072 

1073 Like :py:meth:`iter_nuts` but returns results as a list. 

1074 ''' 

1075 

1076 return list(self.iter_nuts(*args, **kwargs)) 

1077 

1078 def _split_nuts( 

1079 self, kind, tmin=None, tmax=None, codes=None, path=None): 

1080 

1081 kind_id = to_kind_id(kind) 

1082 tmin_seconds, tmin_offset = model.tsplit(tmin) 

1083 tmax_seconds, tmax_offset = model.tsplit(tmax) 

1084 

1085 names_main_nuts = dict(self._names) 

1086 names_main_nuts.update(db='main', nuts='nuts') 

1087 

1088 db = self.get_database() 

1089 

1090 def main_nuts(s): 

1091 return s % names_main_nuts 

1092 

1093 with self.transaction('split nuts') as cursor: 

1094 # modify selection and main 

1095 for sql_subst in [ 

1096 self._sql, main_nuts]: 

1097 

1098 cond = [] 

1099 args = [] 

1100 

1101 self._timerange_sql(tmin, tmax, kind, cond, args, False) 

1102 

1103 if codes is not None: 

1104 self._codes_match_sql(True, kind_id, codes, cond, args) 

1105 

1106 if path is not None: 

1107 cond.append('files.path == ?') 

1108 args.append(db.relpath(abspath(path))) 

1109 

1110 sql = sql_subst(''' 

1111 SELECT 

1112 %(db)s.%(nuts)s.nut_id, 

1113 %(db)s.%(nuts)s.tmin_seconds, 

1114 %(db)s.%(nuts)s.tmin_offset, 

1115 %(db)s.%(nuts)s.tmax_seconds, 

1116 %(db)s.%(nuts)s.tmax_offset, 

1117 kind_codes.deltat 

1118 FROM files 

1119 INNER JOIN %(db)s.%(nuts)s 

1120 ON files.file_id == %(db)s.%(nuts)s.file_id 

1121 INNER JOIN kind_codes 

1122 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id 

1123 WHERE ''' + ' AND '.join(cond)) # noqa 

1124 

1125 insert = [] 

1126 delete = [] 

1127 for row in cursor.execute(sql, args): 

1128 nut_id, nut_tmin_seconds, nut_tmin_offset, \ 

1129 nut_tmax_seconds, nut_tmax_offset, nut_deltat = row 

1130 

1131 nut_tmin = model.tjoin( 

1132 nut_tmin_seconds, nut_tmin_offset) 

1133 nut_tmax = model.tjoin( 

1134 nut_tmax_seconds, nut_tmax_offset) 

1135 

1136 if nut_tmin < tmax and tmin < nut_tmax: 

1137 if nut_tmin < tmin: 

1138 insert.append(( 

1139 nut_tmin_seconds, nut_tmin_offset, 

1140 tmin_seconds, tmin_offset, 

1141 model.tscale_to_kscale( 

1142 tmin_seconds - nut_tmin_seconds), 

1143 nut_id)) 

1144 

1145 if tmax < nut_tmax: 

1146 insert.append(( 

1147 tmax_seconds, tmax_offset, 

1148 nut_tmax_seconds, nut_tmax_offset, 

1149 model.tscale_to_kscale( 

1150 nut_tmax_seconds - tmax_seconds), 

1151 nut_id)) 

1152 

1153 delete.append((nut_id,)) 

1154 

1155 sql_add = ''' 

1156 INSERT INTO %(db)s.%(nuts)s ( 

1157 file_id, file_segment, file_element, kind_id, 

1158 kind_codes_id, tmin_seconds, tmin_offset, 

1159 tmax_seconds, tmax_offset, kscale ) 

1160 SELECT 

1161 file_id, file_segment, file_element, 

1162 kind_id, kind_codes_id, ?, ?, ?, ?, ? 

1163 FROM %(db)s.%(nuts)s 

1164 WHERE nut_id == ? 

1165 ''' 

1166 cursor.executemany(sql_subst(sql_add), insert) 

1167 

1168 sql_delete = ''' 

1169 DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ? 

1170 ''' 

1171 cursor.executemany(sql_subst(sql_delete), delete) 

1172 

1173 def get_time_span(self, kinds=None, tight=True, dummy_limits=True): 

1174 ''' 

1175 Get time interval over all content in selection. 

1176 

1177 :param kinds: 

1178 If not ``None``, restrict query to given content kinds. 

1179 :type kind: 

1180 list of str 

1181 

1182 :complexity: 

1183 O(1), independent of the number of nuts. 

1184 

1185 :returns: 

1186 ``(tmin, tmax)``, combined time interval of queried content kinds. 

1187 ''' 

1188 

1189 sql_min = self._sql(''' 

1190 SELECT MIN(tmin_seconds), MIN(tmin_offset) 

1191 FROM %(db)s.%(nuts)s 

1192 WHERE kind_id == ? 

1193 AND tmin_seconds == ( 

1194 SELECT MIN(tmin_seconds) 

1195 FROM %(db)s.%(nuts)s 

1196 WHERE kind_id == ?) 

1197 ''') 

1198 

1199 sql_max = self._sql(''' 

1200 SELECT MAX(tmax_seconds), MAX(tmax_offset) 

1201 FROM %(db)s.%(nuts)s 

1202 WHERE kind_id == ? 

1203 AND tmax_seconds == ( 

1204 SELECT MAX(tmax_seconds) 

1205 FROM %(db)s.%(nuts)s 

1206 WHERE kind_id == ?) 

1207 ''') 

1208 

1209 gtmin = None 

1210 gtmax = None 

1211 

1212 if isinstance(kinds, str): 

1213 kinds = [kinds] 

1214 

1215 if kinds is None: 

1216 kind_ids = model.g_content_kind_ids 

1217 else: 

1218 kind_ids = model.to_kind_ids(kinds) 

1219 

1220 tmins = [] 

1221 tmaxs = [] 

1222 for kind_id in kind_ids: 

1223 for tmin_seconds, tmin_offset in self._conn.execute( 

1224 sql_min, (kind_id, kind_id)): 

1225 tmins.append(model.tjoin(tmin_seconds, tmin_offset)) 

1226 

1227 for (tmax_seconds, tmax_offset) in self._conn.execute( 

1228 sql_max, (kind_id, kind_id)): 

1229 tmaxs.append(model.tjoin(tmax_seconds, tmax_offset)) 

1230 

1231 tmins = [tmin if tmin != model.g_tmin else None for tmin in tmins] 

1232 tmaxs = [tmax if tmax != model.g_tmax else None for tmax in tmaxs] 

1233 

1234 if tight: 

1235 gtmin = nonef(min, tmins) 

1236 gtmax = nonef(max, tmaxs) 

1237 else: 

1238 gtmin = None if None in tmins else nonef(min, tmins) 

1239 gtmax = None if None in tmaxs else nonef(max, tmaxs) 

1240 

1241 if dummy_limits: 

1242 if gtmin is None: 

1243 gtmin = model.g_tmin 

1244 if gtmax is None: 

1245 gtmax = model.g_tmax 

1246 

1247 return gtmin, gtmax 

1248 

1249 def has(self, kinds): 

1250 ''' 

1251 Check availability of given content kinds. 

1252 

1253 :param kinds: 

1254 Content kinds to query. 

1255 :type kind: 

1256 list of str 

1257 

1258 :returns: 

1259 ``True`` if any of the queried content kinds is available 

1260 in the selection. 

1261 ''' 

1262 self_tmin, self_tmax = self.get_time_span( 

1263 kinds, dummy_limits=False) 

1264 

1265 return None not in (self_tmin, self_tmax) 

1266 

1267 def get_deltat_span(self, kind): 

1268 ''' 

1269 Get min and max sampling interval of all content of given kind. 

1270 

1271 :param kind: 

1272 Content kind 

1273 :type kind: 

1274 str 

1275 

1276 :returns: ``(deltat_min, deltat_max)`` 

1277 ''' 

1278 

1279 deltats = [ 

1280 deltat for deltat in self.get_deltats(kind) 

1281 if deltat is not None] 

1282 

1283 if deltats: 

1284 return min(deltats), max(deltats) 

1285 else: 

1286 return None, None 

1287 

1288 def iter_kinds(self, codes=None): 

1289 ''' 

1290 Iterate over content types available in selection. 

1291 

1292 :param codes: 

1293 If given, get kinds only for selected codes identifier. 

1294 Only a single identifier may be given here and no pattern matching 

1295 is done, currently. 

1296 :type codes: 

1297 :py:class:`~pyrocko.squirrel.model.Codes` 

1298 

1299 :yields: 

1300 Available content kinds as :py:class:`str`. 

1301 

1302 :complexity: 

1303 O(1), independent of number of nuts. 

1304 ''' 

1305 

1306 return self._database._iter_kinds( 

1307 codes=codes, 

1308 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names) 

1309 

1310 def iter_deltats(self, kind=None): 

1311 ''' 

1312 Iterate over sampling intervals available in selection. 

1313 

1314 :param kind: 

1315 If given, get sampling intervals only for a given content type. 

1316 :type kind: 

1317 str 

1318 

1319 :yields: 

1320 :py:class:`float` values. 

1321 

1322 :complexity: 

1323 O(1), independent of number of nuts. 

1324 ''' 

1325 return self._database._iter_deltats( 

1326 kind=kind, 

1327 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names) 

1328 

1329 def iter_codes(self, kind=None): 

1330 ''' 

1331 Iterate over content identifier code sequences available in selection. 

1332 

1333 :param kind: 

1334 If given, get codes only for a given content type. 

1335 :type kind: 

1336 str 

1337 

1338 :yields: 

1339 :py:class:`tuple` of :py:class:`str` 

1340 

1341 :complexity: 

1342 O(1), independent of number of nuts. 

1343 ''' 

1344 return self._database._iter_codes( 

1345 kind=kind, 

1346 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names) 

1347 

1348 def _iter_codes_info(self, kind=None, codes=None): 

1349 ''' 

1350 Iterate over number of occurrences of any (kind, codes) combination. 

1351 

1352 :param kind: 

1353 If given, get counts only for selected content type. 

1354 :type kind: 

1355 str 

1356 

1357 :yields: 

1358 Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``. 

1359 

1360 :complexity: 

1361 O(1), independent of number of nuts. 

1362 ''' 

1363 return self._database._iter_codes_info( 

1364 kind=kind, 

1365 codes=codes, 

1366 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names) 

1367 

1368 def get_kinds(self, codes=None): 

1369 ''' 

1370 Get content types available in selection. 

1371 

1372 :param codes: 

1373 If given, get kinds only for selected codes identifier. 

1374 Only a single identifier may be given here and no pattern matching 

1375 is done, currently. 

1376 :type codes: 

1377 :py:class:`~pyrocko.squirrel.model.Codes` 

1378 

1379 :returns: 

1380 Sorted list of available content types. 

1381 :rtype: 

1382 py:class:`list` of :py:class:`str` 

1383 

1384 :complexity: 

1385 O(1), independent of number of nuts. 

1386 

1387 ''' 

1388 return sorted(list(self.iter_kinds(codes=codes))) 

1389 

1390 def get_deltats(self, kind=None): 

1391 ''' 

1392 Get sampling intervals available in selection. 

1393 

1394 :param kind: 

1395 If given, get sampling intervals only for selected content type. 

1396 :type kind: 

1397 str 

1398 

1399 :complexity: 

1400 O(1), independent of number of nuts. 

1401 

1402 :returns: Sorted list of available sampling intervals. 

1403 ''' 

1404 return sorted(list(self.iter_deltats(kind=kind))) 

1405 

1406 def get_codes(self, kind=None): 

1407 ''' 

1408 Get identifier code sequences available in selection. 

1409 

1410 :param kind: 

1411 If given, get codes only for selected content type. 

1412 :type kind: 

1413 str 

1414 

1415 :complexity: 

1416 O(1), independent of number of nuts. 

1417 

1418 :returns: Sorted list of available codes as tuples of strings. 

1419 ''' 

1420 return sorted(list(self.iter_codes(kind=kind))) 

1421 

1422 def get_counts(self, kind=None): 

1423 ''' 

1424 Get number of occurrences of any (kind, codes) combination. 

1425 

1426 :param kind: 

1427 If given, get codes only for selected content type. 

1428 :type kind: 

1429 str 

1430 

1431 :complexity: 

1432 O(1), independent of number of nuts. 

1433 

1434 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]`` 

1435 if kind is not ``None`` 

1436 ''' 

1437 d = {} 

1438 for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind): 

1439 if kind_id not in d: 

1440 v = d[kind_id] = {} 

1441 else: 

1442 v = d[kind_id] 

1443 

1444 if codes not in v: 

1445 v[codes] = 0 

1446 

1447 v[codes] += count 

1448 

1449 if kind is not None: 

1450 return d[to_kind_id(kind)] 

1451 else: 

1452 return dict((to_kind(kind_id), v) for (kind_id, v) in d.items()) 

1453 

1454 def glob_codes(self, kind, codes): 

1455 ''' 

1456 Find codes matching given patterns. 

1457 

1458 :param kind: 

1459 Content kind to be queried. 

1460 :type kind: 

1461 str 

1462 

1463 :param codes: 

1464 List of code patterns to query. 

1465 :type codes: 

1466 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes` 

1467 objects appropriate for the queried content type, or anything which 

1468 can be converted to such objects. 

1469 

1470 :returns: 

1471 List of matches of the form ``[kind_codes_id, codes, deltat]``. 

1472 ''' 

1473 

1474 kind_id = to_kind_id(kind) 

1475 args = [kind_id] 

1476 pats = codes_patterns_for_kind(kind_id, codes) 

1477 

1478 if pats: 

1479 codes_cond = 'AND ( %s ) ' % ' OR '.join( 

1480 ('kind_codes.codes GLOB ?',) * len(pats)) 

1481 

1482 args.extend(pat.safe_str for pat in pats) 

1483 else: 

1484 codes_cond = '' 

1485 

1486 sql = self._sql(''' 

1487 SELECT kind_codes_id, codes, deltat FROM kind_codes 

1488 WHERE 

1489 kind_id == ? ''' + codes_cond) 

1490 

1491 return list(map(list, self._conn.execute(sql, args))) 

1492 

1493 def update(self, constraint=None, **kwargs): 

1494 ''' 

1495 Update or partially update channel and event inventories. 

1496 

1497 :param constraint: 

1498 Selection of times or areas to be brought up to date. 

1499 :type constraint: 

1500 :py:class:`~pyrocko.squirrel.client.base.Constraint` 

1501 

1502 :param \\*\\*kwargs: 

1503 Shortcut for setting ``constraint=Constraint(**kwargs)``. 

1504 

1505 This function triggers all attached remote sources, to check for 

1506 updates in the meta-data. The sources will only submit queries when 

1507 their expiration date has passed, or if the selection spans into 

1508 previously unseen times or areas. 

1509 ''' 

1510 

1511 if constraint is None: 

1512 constraint = client.Constraint(**kwargs) 

1513 

1514 task = make_task('Updating sources') 

1515 for source in task(self._sources): 

1516 source.update_channel_inventory(self, constraint) 

1517 source.update_event_inventory(self, constraint) 

1518 

1519 def update_waveform_promises(self, constraint=None, **kwargs): 

1520 ''' 

1521 Permit downloading of remote waveforms. 

1522 

1523 :param constraint: 

1524 Remote waveforms compatible with the given constraint are enabled 

1525 for download. 

1526 :type constraint: 

1527 :py:class:`~pyrocko.squirrel.client.base.Constraint` 

1528 

1529 :param \\*\\*kwargs: 

1530 Shortcut for setting ``constraint=Constraint(**kwargs)``. 

1531 

1532 Calling this method permits Squirrel to download waveforms from remote 

1533 sources when processing subsequent waveform requests. This works by 

1534 inserting so called waveform promises into the database. It will look 

1535 into the available channels for each remote source and create a promise 

1536 for each channel compatible with the given constraint. If the promise 

1537 then matches in a waveform request, Squirrel tries to download the 

1538 waveform. If the download is successful, the downloaded waveform is 

1539 added to the Squirrel and the promise is deleted. If the download 

1540 fails, the promise is kept if the reason of failure looks like being 

1541 temporary, e.g. because of a network failure. If the cause of failure 

1542 however seems to be permanent, the promise is deleted so that no 

1543 further attempts are made to download a waveform which might not be 

1544 available from that server at all. To force re-scheduling after a 

1545 permanent failure, call :py:meth:`update_waveform_promises` 

1546 yet another time. 

1547 ''' 

1548 

1549 if constraint is None: 

1550 constraint = client.Constraint(**kwargs) 

1551 

1552 for source in self._sources: 

1553 source.update_waveform_promises(self, constraint) 

1554 

1555 def remove_waveform_promises(self, from_database='selection'): 

1556 ''' 

1557 Remove waveform promises from live selection or global database. 

1558 

1559 Calling this function removes all waveform promises provided by the 

1560 attached sources. 

1561 

1562 :param from_database: 

1563 Remove from live selection ``'selection'`` or global database 

1564 ``'global'``. 

1565 ''' 

1566 for source in self._sources: 

1567 source.remove_waveform_promises(self, from_database=from_database) 

1568 

1569 def update_responses(self, constraint=None, **kwargs): 

1570 if constraint is None: 

1571 constraint = client.Constraint(**kwargs) 

1572 

1573 for source in self._sources: 

1574 source.update_response_inventory(self, constraint) 

1575 

1576 def get_nfiles(self): 

1577 ''' 

1578 Get number of files in selection. 

1579 ''' 

1580 

1581 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''') 

1582 for row in self._conn.execute(sql): 

1583 return row[0] 

1584 

1585 def get_nnuts(self): 

1586 ''' 

1587 Get number of nuts in selection. 

1588 ''' 

1589 

1590 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''') 

1591 for row in self._conn.execute(sql): 

1592 return row[0] 

1593 

1594 def get_total_size(self): 

1595 ''' 

1596 Get aggregated file size available in selection. 

1597 ''' 

1598 

1599 sql = self._sql(''' 

1600 SELECT SUM(files.size) FROM %(db)s.%(file_states)s 

1601 INNER JOIN files 

1602 ON %(db)s.%(file_states)s.file_id = files.file_id 

1603 ''') 

1604 

1605 for row in self._conn.execute(sql): 

1606 return row[0] or 0 

1607 

1608 def get_stats(self): 

1609 ''' 

1610 Get statistics on contents available through this selection. 

1611 ''' 

1612 

1613 kinds = self.get_kinds() 

1614 time_spans = {} 

1615 for kind in kinds: 

1616 time_spans[kind] = self.get_time_span([kind]) 

1617 

1618 return SquirrelStats( 

1619 nfiles=self.get_nfiles(), 

1620 nnuts=self.get_nnuts(), 

1621 kinds=kinds, 

1622 codes=self.get_codes(), 

1623 total_size=self.get_total_size(), 

1624 counts=self.get_counts(), 

1625 time_spans=time_spans, 

1626 sources=[s.describe() for s in self._sources], 

1627 operators=[op.describe() for op in self._operators]) 

1628 

1629 @filldocs 

1630 def check( 

1631 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

1632 ignore=[]): 

1633 ''' 

1634 Check for common data/metadata problems. 

1635 

1636 %(query_args)s 

1637 

1638 :param ignore: 

1639 Problem types to be ignored. 

1640 :type ignore: 

1641 :class:`list` of :class:`str` 

1642 (:py:class:`~pyrocko.squirrel.check.SquirrelCheckProblemType`) 

1643 

1644 :returns: 

1645 :py:class:`~pyrocko.squirrel.check.SquirrelCheck` object 

1646 containing the results of the check. 

1647 

1648 See :py:func:`~pyrocko.squirrel.check.do_check`. 

1649 ''' 

1650 

1651 from .check import do_check 

1652 tmin, tmax, codes = self._get_selection_args( 

1653 CHANNEL, obj, tmin, tmax, time, codes) 

1654 

1655 return do_check(self, tmin=tmin, tmax=tmax, codes=codes, ignore=ignore) 

1656 

1657 def get_content( 

1658 self, 

1659 nut, 

1660 cache_id='default', 

1661 accessor_id='default', 

1662 show_progress=False, 

1663 model='squirrel'): 

1664 

1665 ''' 

1666 Get and possibly load full content for a given index entry from file. 

1667 

1668 Loads the actual content objects (channel, station, waveform, ...) from 

1669 file. For efficiency, sibling content (all stuff in the same file 

1670 segment) will also be loaded as a side effect. The loaded contents are 

1671 cached in the Squirrel object. 

1672 ''' 

1673 

1674 content_cache = self._content_caches[cache_id] 

1675 if not content_cache.has(nut): 

1676 

1677 for nut_loaded in io.iload( 

1678 nut.file_path, 

1679 segment=nut.file_segment, 

1680 format=nut.file_format, 

1681 database=self._database, 

1682 update_selection=self, 

1683 show_progress=show_progress): 

1684 

1685 content_cache.put(nut_loaded) 

1686 

1687 try: 

1688 return content_cache.get(nut, accessor_id, model) 

1689 

1690 except KeyError: 

1691 raise error.NotAvailable( 

1692 'Unable to retrieve content: %s, %s, %s, %s' % nut.key) 

1693 

1694 def advance_accessor(self, accessor_id='default', cache_id=None): 

1695 ''' 

1696 Notify memory caches about consumer moving to a new data batch. 

1697 

1698 :param accessor_id: 

1699 Name of accessing consumer to be advanced. 

1700 :type accessor_id: 

1701 str 

1702 

1703 :param cache_id: 

1704 Name of cache to for which the accessor should be advanced. By 

1705 default the named accessor is advanced in all registered caches. 

1706 By default, two caches named ``'default'`` and ``'waveform'`` are 

1707 available. 

1708 :type cache_id: 

1709 str 

1710 

1711 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how 

1712 Squirrel's memory caching works and can be tuned. Default behaviour is 

1713 to release data when it has not been used in the latest data 

1714 window/batch. If the accessor is never advanced, data is cached 

1715 indefinitely - which is often desired e.g. for station meta-data. 

1716 Methods for consecutive data traversal, like 

1717 :py:meth:`chopper_waveforms` automatically advance and clear 

1718 their accessor. 

1719 ''' 

1720 for cache_ in ( 

1721 self._content_caches.keys() 

1722 if cache_id is None 

1723 else [cache_id]): 

1724 

1725 self._content_caches[cache_].advance_accessor(accessor_id) 

1726 

1727 def clear_accessor(self, accessor_id, cache_id=None): 

1728 ''' 

1729 Notify memory caches about a consumer having finished. 

1730 

1731 :param accessor_id: 

1732 Name of accessor to be cleared. 

1733 :type accessor_id: 

1734 str 

1735 

1736 :param cache_id: 

1737 Name of cache for which the accessor should be cleared. By default 

1738 the named accessor is cleared from all registered caches. By 

1739 default, two caches named ``'default'`` and ``'waveform'`` are 

1740 available. 

1741 :type cache_id: 

1742 str 

1743 

1744 Calling this method clears all references to cache entries held by the 

1745 named accessor. Cache entries are then freed if not referenced by any 

1746 other accessor. 

1747 ''' 

1748 

1749 for cache_ in ( 

1750 self._content_caches.keys() 

1751 if cache_id is None 

1752 else [cache_id]): 

1753 

1754 self._content_caches[cache_].clear_accessor(accessor_id) 

1755 

1756 def get_cache_stats(self, cache_id): 

1757 return self._content_caches[cache_id].get_stats() 

1758 

1759 @filldocs 

1760 def get_stations( 

1761 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

1762 model='squirrel', on_error='raise'): 

1763 

1764 ''' 

1765 Get stations matching given constraints. 

1766 

1767 %(query_args)s 

1768 

1769 :param model: 

1770 Select object model for returned values: ``'squirrel'`` to get 

1771 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station 

1772 objects with channel information attached. 

1773 :type model: 

1774 str 

1775 

1776 :returns: 

1777 List of :py:class:`pyrocko.squirrel.Station 

1778 <pyrocko.squirrel.model.Station>` objects by default or list of 

1779 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>` 

1780 objects if ``model='pyrocko'`` is requested. 

1781 

1782 See :py:meth:`iter_nuts` for details on time span matching. 

1783 ''' 

1784 

1785 if model == 'pyrocko': 

1786 return self._get_pyrocko_stations( 

1787 obj, tmin, tmax, time, codes, on_error=on_error) 

1788 elif model in ('squirrel', 'stationxml', 'stationxml+'): 

1789 args = self._get_selection_args( 

1790 STATION, obj, tmin, tmax, time, codes) 

1791 

1792 nuts = sorted( 

1793 self.iter_nuts('station', *args), key=lambda nut: nut.dkey) 

1794 

1795 return [self.get_content(nut, model=model) for nut in nuts] 

1796 else: 

1797 raise ValueError('Invalid station model: %s' % model) 

1798 

1799 @filldocs 

1800 def get_channels( 

1801 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

1802 model='squirrel'): 

1803 

1804 ''' 

1805 Get channels matching given constraints. 

1806 

1807 %(query_args)s 

1808 

1809 :returns: 

1810 List of :py:class:`~pyrocko.squirrel.model.Channel` objects. 

1811 

1812 See :py:meth:`iter_nuts` for details on time span matching. 

1813 ''' 

1814 

1815 args = self._get_selection_args( 

1816 CHANNEL, obj, tmin, tmax, time, codes) 

1817 

1818 nuts = sorted( 

1819 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey) 

1820 

1821 return [self.get_content(nut, model=model) for nut in nuts] 

1822 

1823 @filldocs 

1824 def get_sensors( 

1825 self, obj=None, tmin=None, tmax=None, time=None, codes=None): 

1826 

1827 ''' 

1828 Get sensors matching given constraints. 

1829 

1830 %(query_args)s 

1831 

1832 :returns: 

1833 List of :py:class:`~pyrocko.squirrel.model.Sensor` objects. 

1834 

1835 See :py:meth:`iter_nuts` for details on time span matching. 

1836 ''' 

1837 

1838 tmin, tmax, codes = self._get_selection_args( 

1839 CHANNEL, obj, tmin, tmax, time, codes) 

1840 

1841 if codes is not None: 

1842 codes = codes_patterns_list( 

1843 (entry.replace(channel=entry.channel[:-1] + '?') 

1844 if entry.channel != '*' else entry) 

1845 for entry in codes) 

1846 

1847 nuts = sorted( 

1848 self.iter_nuts( 

1849 'channel', tmin, tmax, codes), key=lambda nut: nut.dkey) 

1850 

1851 return [ 

1852 sensor for sensor in model.Sensor.from_channels( 

1853 self.get_content(nut) for nut in nuts) 

1854 if match_time_span(tmin, tmax, sensor)] 

1855 

1856 @filldocs 

1857 def get_responses( 

1858 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

1859 model='squirrel'): 

1860 

1861 ''' 

1862 Get instrument responses matching given constraints. 

1863 

1864 %(query_args)s 

1865 

1866 :param model: 

1867 Select data model for returned objects. Choices: ``'squirrel'``, 

1868 ``'stationxml'``, ``'stationxml+'``. See return value description. 

1869 :type model: 

1870 str 

1871 

1872 :returns: 

1873 List of :py:class:`~pyrocko.squirrel.model.Response` if ``model == 

1874 'squirrel'`` or list of 

1875 :py:class:`~pyrocko.io.stationxml.FDSNStationXML` 

1876 if ``model == 'stationxml'`` or list of 

1877 (:py:class:`~pyrocko.squirrel.model.Response`, 

1878 :py:class:`~pyrocko.io.stationxml.FDSNStationXML`) if ``model == 

1879 'stationxml+'``. 

1880 

1881 See :py:meth:`iter_nuts` for details on time span matching. 

1882 ''' 

1883 

1884 args = self._get_selection_args( 

1885 RESPONSE, obj, tmin, tmax, time, codes) 

1886 

1887 nuts = sorted( 

1888 self.iter_nuts('response', *args), key=lambda nut: nut.dkey) 

1889 

1890 return [self.get_content(nut, model=model) for nut in nuts] 

1891 

1892 @filldocs 

1893 def get_response( 

1894 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

1895 model='squirrel', on_duplicate='raise'): 

1896 

1897 ''' 

1898 Get instrument response matching given constraints. 

1899 

1900 %(query_args)s 

1901 

1902 :param model: 

1903 Select data model for returned object. Choices: ``'squirrel'``, 

1904 ``'stationxml'``, ``'stationxml+'``. See return value description. 

1905 :type model: 

1906 str 

1907 

1908 :param on_duplicate: 

1909 Determines how duplicates/multiple matching responses are handled. 

1910 Choices: ``'raise'`` - raise 

1911 :py:exc:`~pyrocko.squirrel.error.Duplicate`, ``'warn'`` - emit a 

1912 warning and return first match, ``'ignore'`` - silently return 

1913 first match. 

1914 :type on_duplicate: 

1915 str 

1916 

1917 :returns: 

1918 :py:class:`~pyrocko.squirrel.model.Response` if 

1919 ``model == 'squirrel'`` or 

1920 :py:class:`~pyrocko.io.stationxml.FDSNStationXML` if ``model == 

1921 'stationxml'`` or 

1922 (:py:class:`~pyrocko.squirrel.model.Response`, 

1923 :py:class:`~pyrocko.io.stationxml.FDSNStationXML`) if ``model == 

1924 'stationxml+'``. 

1925 

1926 Same as :py:meth:`get_responses` but returning exactly one response. 

1927 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if none is 

1928 available. Duplicates are handled according to the ``on_duplicate`` 

1929 argument. 

1930 

1931 See :py:meth:`iter_nuts` for details on time span matching. 

1932 ''' 

1933 

1934 if model == 'stationxml': 

1935 model_ = 'stationxml+' 

1936 else: 

1937 model_ = model 

1938 

1939 responses = self.get_responses( 

1940 obj, tmin, tmax, time, codes, model=model_) 

1941 if len(responses) == 0: 

1942 raise error.NotAvailable( 

1943 'No instrument response available (%s).' 

1944 % self._get_selection_args_str( 

1945 RESPONSE, obj, tmin, tmax, time, codes)) 

1946 

1947 elif len(responses) > 1: 

1948 

1949 if on_duplicate in ('raise', 'warn'): 

1950 if model_ == 'squirrel': 

1951 resps_sq = responses 

1952 elif model_ == 'stationxml+': 

1953 resps_sq = [resp[0] for resp in responses] 

1954 else: 

1955 raise ValueError('Invalid response model: %s' % model) 

1956 

1957 rinfo = ':\n' + '\n'.join( 

1958 ' ' + resp.summary for resp in resps_sq) 

1959 

1960 message = \ 

1961 'Multiple instrument responses matching given ' \ 

1962 'constraints (%s)%s%s' % ( 

1963 self._get_selection_args_str( 

1964 RESPONSE, obj, tmin, tmax, time, codes), 

1965 ' -> using first' if on_duplicate == 'warn' else '', 

1966 rinfo) 

1967 

1968 if on_duplicate == 'raise': 

1969 raise error.Duplicate(message) 

1970 

1971 elif on_duplicate == 'warn': 

1972 logger.warning(message) 

1973 

1974 elif on_duplicate == 'ignore': 

1975 pass 

1976 

1977 else: 

1978 ValueError( 

1979 'Invalid argument for on_duplicate: %s' % on_duplicate) 

1980 

1981 if model == 'stationxml': 

1982 return responses[0][1] 

1983 else: 

1984 return responses[0] 

1985 

1986 @filldocs 

1987 def get_events( 

1988 self, obj=None, tmin=None, tmax=None, time=None, codes=None): 

1989 

1990 ''' 

1991 Get events matching given constraints. 

1992 

1993 %(query_args)s 

1994 

1995 :returns: 

1996 List of :py:class:`~pyrocko.model.event.Event` objects. 

1997 

1998 See :py:meth:`iter_nuts` for details on time span matching. 

1999 ''' 

2000 

2001 args = self._get_selection_args(EVENT, obj, tmin, tmax, time, codes) 

2002 nuts = sorted( 

2003 self.iter_nuts('event', *args), key=lambda nut: nut.dkey) 

2004 

2005 return [self.get_content(nut) for nut in nuts] 

2006 

2007 def _redeem_promises(self, *args, order_only=False): 

2008 

2009 def split_promise(order, tmax=None): 

2010 self._split_nuts( 

2011 'waveform_promise', 

2012 order.tmin, tmax if tmax is not None else order.tmax, 

2013 codes=order.codes, 

2014 path=order.source_id) 

2015 

2016 tmin, tmax = args[:2] 

2017 

2018 waveforms = list(self.iter_nuts('waveform', *args)) 

2019 promises = list(self.iter_nuts('waveform_promise', *args)) 

2020 

2021 codes_to_avail = defaultdict(list) 

2022 for nut in waveforms: 

2023 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax)) 

2024 

2025 def tts(x): 

2026 if isinstance(x, tuple): 

2027 return tuple(tts(e) for e in x) 

2028 elif isinstance(x, list): 

2029 return list(tts(e) for e in x) 

2030 else: 

2031 return util.time_to_str(x) 

2032 

2033 now = time.time() 

2034 orders = [] 

2035 for promise in promises: 

2036 waveforms_avail = codes_to_avail[promise.codes] 

2037 for block_tmin, block_tmax in blocks( 

2038 max(tmin, promise.tmin), 

2039 min(tmax, promise.tmax), 

2040 promise.deltat): 

2041 

2042 if block_tmin > now: 

2043 continue 

2044 

2045 orders.append( 

2046 WaveformOrder( 

2047 source_id=promise.file_path, 

2048 codes=promise.codes, 

2049 tmin=block_tmin, 

2050 tmax=block_tmax, 

2051 deltat=promise.deltat, 

2052 gaps=gaps(waveforms_avail, block_tmin, block_tmax), 

2053 time_created=now)) 

2054 

2055 orders_noop, orders = lpick(lambda order: order.gaps, orders) 

2056 

2057 order_keys_noop = set(order_key(order) for order in orders_noop) 

2058 if len(order_keys_noop) != 0 or len(orders_noop) != 0: 

2059 logger.info( 

2060 'Waveform orders already satisified with cached/local data: ' 

2061 '%i (%i)' % (len(order_keys_noop), len(orders_noop))) 

2062 

2063 for order in orders_noop: 

2064 split_promise(order) 

2065 

2066 if order_only: 

2067 if orders: 

2068 self._pending_orders.extend(orders) 

2069 logger.info( 

2070 'Enqueuing %i waveform order%s.' 

2071 % len_plural(orders)) 

2072 return 

2073 else: 

2074 if self._pending_orders: 

2075 orders.extend(self._pending_orders) 

2076 logger.info( 

2077 'Adding %i previously enqueued order%s.' 

2078 % len_plural(self._pending_orders)) 

2079 

2080 self._pending_orders = [] 

2081 

2082 source_ids = [] 

2083 sources = {} 

2084 for source in self._sources: 

2085 if isinstance(source, fdsn.FDSNSource): 

2086 source_ids.append(source._source_id) 

2087 sources[source._source_id] = source 

2088 

2089 source_priority = dict( 

2090 (source_id, i) for (i, source_id) in enumerate(source_ids)) 

2091 

2092 order_groups = defaultdict(list) 

2093 for order in orders: 

2094 order_groups[order_key(order)].append(order) 

2095 

2096 for k, order_group in order_groups.items(): 

2097 order_group.sort( 

2098 key=lambda order: source_priority[order.source_id]) 

2099 

2100 n_order_groups = len(order_groups) 

2101 

2102 if len(order_groups) != 0 or len(orders) != 0: 

2103 logger.info( 

2104 'Waveform orders standing for download: %i (%i)' 

2105 % (len(order_groups), len(orders))) 

2106 

2107 task = make_task('Waveform orders processed', n_order_groups) 

2108 else: 

2109 task = None 

2110 

2111 def release_order_group(order): 

2112 okey = order_key(order) 

2113 for followup in order_groups[okey]: 

2114 if followup is not order: 

2115 split_promise(followup) 

2116 

2117 del order_groups[okey] 

2118 

2119 if task: 

2120 task.update(n_order_groups - len(order_groups)) 

2121 

2122 def noop(order): 

2123 pass 

2124 

2125 def success(order, trs): 

2126 release_order_group(order) 

2127 if order.is_near_real_time(): 

2128 if not trs: 

2129 return # keep promise when no data received at real time 

2130 else: 

2131 tmax = max(tr.tmax+tr.deltat for tr in trs) 

2132 tmax = order.tmin \ 

2133 + round((tmax - order.tmin) / order.deltat) \ 

2134 * order.deltat 

2135 split_promise(order, tmax) 

2136 else: 

2137 split_promise(order) 

2138 

2139 def batch_add(paths): 

2140 self.add(paths) 

2141 

2142 calls = queue.Queue() 

2143 

2144 def enqueue(f): 

2145 def wrapper(*args): 

2146 calls.put((f, args)) 

2147 

2148 return wrapper 

2149 

2150 while order_groups: 

2151 

2152 orders_now = [] 

2153 empty = [] 

2154 for k, order_group in order_groups.items(): 

2155 try: 

2156 orders_now.append(order_group.pop(0)) 

2157 except IndexError: 

2158 empty.append(k) 

2159 

2160 for k in empty: 

2161 del order_groups[k] 

2162 

2163 by_source_id = defaultdict(list) 

2164 for order in orders_now: 

2165 by_source_id[order.source_id].append(order) 

2166 

2167 threads = [] 

2168 for source_id in by_source_id: 

2169 def download(): 

2170 try: 

2171 sources[source_id].download_waveforms( 

2172 by_source_id[source_id], 

2173 success=enqueue(success), 

2174 error_permanent=enqueue(split_promise), 

2175 error_temporary=noop, 

2176 batch_add=enqueue(batch_add)) 

2177 

2178 finally: 

2179 calls.put(None) 

2180 

2181 if len(by_source_id) > 1: 

2182 thread = threading.Thread(target=download) 

2183 thread.start() 

2184 threads.append(thread) 

2185 else: 

2186 download() 

2187 calls.put(None) 

2188 

2189 ndone = 0 

2190 while ndone < len(by_source_id): 

2191 ret = calls.get() 

2192 if ret is None: 

2193 ndone += 1 

2194 else: 

2195 ret[0](*ret[1]) 

2196 

2197 for thread in threads: 

2198 thread.join() 

2199 

2200 if task: 

2201 task.update(n_order_groups - len(order_groups)) 

2202 

2203 if task: 

2204 task.done() 

2205 

2206 @filldocs 

2207 def get_waveform_nuts( 

2208 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

2209 codes_exclude=None, sample_rate_min=None, sample_rate_max=None, 

2210 order_only=False): 

2211 

2212 ''' 

2213 Get waveform content entities matching given constraints. 

2214 

2215 %(query_args)s 

2216 

2217 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally 

2218 resolves matching waveform promises (downloads waveforms from remote 

2219 sources). 

2220 

2221 See :py:meth:`iter_nuts` for details on time span matching. 

2222 ''' 

2223 

2224 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes) 

2225 

2226 if self.downloads_enabled: 

2227 self._redeem_promises( 

2228 *args, 

2229 codes_exclude, 

2230 sample_rate_min, 

2231 sample_rate_max, 

2232 order_only=order_only) 

2233 

2234 nuts = sorted( 

2235 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey) 

2236 

2237 return nuts 

2238 

2239 @filldocs 

2240 def have_waveforms( 

2241 self, obj=None, tmin=None, tmax=None, time=None, codes=None): 

2242 

2243 ''' 

2244 Check if any waveforms or waveform promises are available for given 

2245 constraints. 

2246 

2247 %(query_args)s 

2248 ''' 

2249 

2250 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes) 

2251 return bool(list( 

2252 self.iter_nuts('waveform', *args, limit=1))) \ 

2253 or (self.downloads_enabled and bool(list( 

2254 self.iter_nuts('waveform_promise', *args, limit=1)))) 

2255 

2256 @filldocs 

2257 def get_waveforms( 

2258 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

2259 codes_exclude=None, sample_rate_min=None, sample_rate_max=None, 

2260 uncut=False, want_incomplete=True, degap=True, 

2261 maxgap=5, maxlap=None, snap=None, include_last=False, 

2262 load_data=True, accessor_id='default', operator_params=None, 

2263 order_only=False, channel_priorities=None): 

2264 

2265 ''' 

2266 Get waveforms matching given constraints. 

2267 

2268 %(query_args)s 

2269 

2270 :param sample_rate_min: 

2271 Consider only waveforms with a sampling rate equal to or greater 

2272 than the given value [Hz]. 

2273 :type sample_rate_min: 

2274 float 

2275 

2276 :param sample_rate_max: 

2277 Consider only waveforms with a sampling rate equal to or less than 

2278 the given value [Hz]. 

2279 :type sample_rate_max: 

2280 float 

2281 

2282 :param uncut: 

2283 Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``] 

2284 and to disable degapping/deoverlapping. Returns untouched traces as 

2285 they are read from file segment. File segments are always read in 

2286 their entirety. 

2287 :type uncut: 

2288 bool 

2289 

2290 :param want_incomplete: 

2291 If ``True``, gappy/incomplete traces are included in the result. 

2292 :type want_incomplete: 

2293 bool 

2294 

2295 :param degap: 

2296 If ``True``, connect traces and remove gaps and overlaps. 

2297 :type degap: 

2298 bool 

2299 

2300 :param maxgap: 

2301 Maximum gap size in samples which is filled with interpolated 

2302 samples when ``degap`` is ``True``. 

2303 :type maxgap: 

2304 int 

2305 

2306 :param maxlap: 

2307 Maximum overlap size in samples which is removed when ``degap`` is 

2308 ``True``. 

2309 :type maxlap: 

2310 int 

2311 

2312 :param snap: 

2313 Rounding functions used when computing sample index from time 

2314 instance, for trace start and trace end, respectively. By default, 

2315 ``(round, round)`` is used. 

2316 :type snap: 

2317 :py:class:`tuple` of 2 callables 

2318 

2319 :param include_last: 

2320 If ``True``, add one more sample to the returned traces (the sample 

2321 which would be the first sample of a query with ``tmin`` set to the 

2322 current value of ``tmax``). 

2323 :type include_last: 

2324 bool 

2325 

2326 :param load_data: 

2327 If ``True``, waveform data samples are read from files (or cache). 

2328 If ``False``, meta-information-only traces are returned (dummy 

2329 traces with no data samples). 

2330 :type load_data: 

2331 bool 

2332 

2333 :param accessor_id: 

2334 Name of consumer on who's behalf data is accessed. Used in cache 

2335 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key 

2336 to distinguish different points of extraction for the decision of 

2337 when to release cached waveform data. Should be used when data is 

2338 alternately extracted from more than one region / selection. 

2339 :type accessor_id: 

2340 str 

2341 

2342 :param channel_priorities: 

2343 List of band/instrument code combinations to try. For example, 

2344 giving ``['HH', 'BH']`` would first try to get ``HH?`` channels and 

2345 then fallback to ``BH?`` if these are not available. The first 

2346 matching waveforms are returned. Use in combination with 

2347 ``sample_rate_min`` and ``sample_rate_max`` to constrain the sample 

2348 rate. 

2349 :type channel_priorities: 

2350 :py:class:`list` of :py:class:`str` 

2351 

2352 See :py:meth:`iter_nuts` for details on time span matching. 

2353 

2354 Loaded data is kept in memory (at least) until 

2355 :py:meth:`clear_accessor` has been called or 

2356 :py:meth:`advance_accessor` has been called two consecutive times 

2357 without data being accessed between the two calls (by this accessor). 

2358 Data may still be further kept in the memory cache if held alive by 

2359 consumers with a different ``accessor_id``. 

2360 ''' 

2361 

2362 tmin, tmax, codes = self._get_selection_args( 

2363 WAVEFORM, obj, tmin, tmax, time, codes) 

2364 

2365 if channel_priorities is not None: 

2366 return self._get_waveforms_prioritized( 

2367 tmin=tmin, tmax=tmax, codes=codes, codes_exclude=codes_exclude, 

2368 sample_rate_min=sample_rate_min, 

2369 sample_rate_max=sample_rate_max, 

2370 uncut=uncut, want_incomplete=want_incomplete, degap=degap, 

2371 maxgap=maxgap, maxlap=maxlap, snap=snap, 

2372 include_last=include_last, load_data=load_data, 

2373 accessor_id=accessor_id, operator_params=operator_params, 

2374 order_only=order_only, channel_priorities=channel_priorities) 

2375 

2376 kinds = ['waveform'] 

2377 if self.downloads_enabled: 

2378 kinds.append('waveform_promise') 

2379 

2380 self_tmin, self_tmax = self.get_time_span(kinds) 

2381 

2382 if None in (self_tmin, self_tmax): 

2383 logger.warning( 

2384 'No waveforms available.') 

2385 return [] 

2386 

2387 tmin = tmin if tmin is not None else self_tmin 

2388 tmax = tmax if tmax is not None else self_tmax 

2389 

2390 if codes is not None and len(codes) == 1: 

2391 # TODO: fix for multiple / mixed codes 

2392 operator = self.get_operator(codes[0]) 

2393 if operator is not None: 

2394 return operator.get_waveforms( 

2395 self, codes[0], 

2396 tmin=tmin, tmax=tmax, 

2397 uncut=uncut, want_incomplete=want_incomplete, degap=degap, 

2398 maxgap=maxgap, maxlap=maxlap, snap=snap, 

2399 include_last=include_last, load_data=load_data, 

2400 accessor_id=accessor_id, params=operator_params) 

2401 

2402 nuts = self.get_waveform_nuts( 

2403 obj, tmin, tmax, time, codes, codes_exclude, sample_rate_min, 

2404 sample_rate_max, order_only=order_only) 

2405 

2406 if order_only: 

2407 return [] 

2408 

2409 if load_data: 

2410 traces = [ 

2411 self.get_content(nut, 'waveform', accessor_id) for nut in nuts] 

2412 

2413 else: 

2414 traces = [ 

2415 trace.Trace(**nut.trace_kwargs) for nut in nuts] 

2416 

2417 if uncut: 

2418 return traces 

2419 

2420 if snap is None: 

2421 snap = (round, round) 

2422 

2423 chopped = [] 

2424 for tr in traces: 

2425 if not load_data and tr.ydata is not None: 

2426 tr = tr.copy(data=False) 

2427 tr.ydata = None 

2428 

2429 try: 

2430 chopped.append(tr.chop( 

2431 tmin, tmax, 

2432 inplace=False, 

2433 snap=snap, 

2434 include_last=include_last)) 

2435 

2436 except trace.NoData: 

2437 pass 

2438 

2439 processed = self._process_chopped( 

2440 chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax) 

2441 

2442 return processed 

2443 

2444 def _get_waveforms_prioritized( 

2445 self, tmin=None, tmax=None, codes=None, codes_exclude=None, 

2446 channel_priorities=None, **kwargs): 

2447 

2448 trs_all = [] 

2449 codes_have = set() 

2450 for channel in channel_priorities: 

2451 assert len(channel) == 2 

2452 

2453 if codes is not None: 

2454 re_channel = re.compile( 

2455 r'^([' + channel[0] + r'?][' + channel[1] + r'?]|\*)') 

2456 

2457 codes_now = [] 

2458 for codes_ in codes: 

2459 if codes_.channel == '*': 

2460 channel_now, n = channel + '?', 1 

2461 else: 

2462 channel_now, n = re_channel.subn( 

2463 channel, codes_.channel) 

2464 

2465 if n == 1: 

2466 codes_now.append(codes_.replace(channel=channel_now)) 

2467 

2468 else: 

2469 codes_now = model.CodesNSLCE('*', '*', '*', channel+'?') 

2470 

2471 if not codes_now: 

2472 continue 

2473 

2474 codes_exclude_now = list(set( 

2475 codes_.replace(channel=channel+codes_.channel[-1]) 

2476 for codes_ in codes_have)) 

2477 

2478 if codes_exclude: 

2479 codes_exclude_now.extend(codes_exclude) 

2480 

2481 trs = self.get_waveforms( 

2482 tmin=tmin, 

2483 tmax=tmax, 

2484 codes=codes_now, 

2485 codes_exclude=codes_exclude_now, 

2486 **kwargs) 

2487 

2488 codes_have.update(set(tr.codes for tr in trs)) 

2489 trs_all.extend(trs) 

2490 

2491 return trs_all 

2492 

2493 @filldocs 

2494 def chopper_waveforms( 

2495 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

2496 codes_exclude=None, sample_rate_min=None, sample_rate_max=None, 

2497 tinc=None, tpad=0., 

2498 want_incomplete=True, snap_window=False, 

2499 degap=True, maxgap=5, maxlap=None, 

2500 snap=None, include_last=False, load_data=True, 

2501 accessor_id=None, clear_accessor=True, operator_params=None, 

2502 grouping=None, channel_priorities=None): 

2503 

2504 ''' 

2505 Iterate window-wise over waveform archive. 

2506 

2507 %(query_args)s 

2508 

2509 :param tinc: 

2510 Time increment (window shift time) (default uses ``tmax-tmin``). 

2511 :type tinc: 

2512 :py:func:`~pyrocko.util.get_time_float` 

2513 

2514 :param tpad: 

2515 Padding time appended on either side of the data window (window 

2516 overlap is ``2*tpad``). 

2517 :type tpad: 

2518 :py:func:`~pyrocko.util.get_time_float` 

2519 

2520 :param want_incomplete: 

2521 If ``True``, gappy/incomplete traces are included in the result. 

2522 :type want_incomplete: 

2523 bool 

2524 

2525 :param snap_window: 

2526 If ``True``, start time windows at multiples of tinc with respect 

2527 to system time zero. 

2528 :type snap_window: 

2529 bool 

2530 

2531 :param degap: 

2532 If ``True``, connect traces and remove gaps and overlaps. 

2533 :type degap: 

2534 bool 

2535 

2536 :param maxgap: 

2537 Maximum gap size in samples which is filled with interpolated 

2538 samples when ``degap`` is ``True``. 

2539 :type maxgap: 

2540 int 

2541 

2542 :param maxlap: 

2543 Maximum overlap size in samples which is removed when ``degap`` is 

2544 ``True``. 

2545 :type maxlap: 

2546 int 

2547 

2548 :param snap: 

2549 Rounding functions used when computing sample index from time 

2550 instance, for trace start and trace end, respectively. By default, 

2551 ``(round, round)`` is used. 

2552 :type snap: 

2553 :py:class:`tuple` of 2 callables 

2554 

2555 :param include_last: 

2556 If ``True``, add one more sample to the returned traces (the sample 

2557 which would be the first sample of a query with ``tmin`` set to the 

2558 current value of ``tmax``). 

2559 :type include_last: 

2560 bool 

2561 

2562 :param load_data: 

2563 If ``True``, waveform data samples are read from files (or cache). 

2564 If ``False``, meta-information-only traces are returned (dummy 

2565 traces with no data samples). 

2566 :type load_data: 

2567 bool 

2568 

2569 :param accessor_id: 

2570 Name of consumer on who's behalf data is accessed. Used in cache 

2571 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key 

2572 to distinguish different points of extraction for the decision of 

2573 when to release cached waveform data. Should be used when data is 

2574 alternately extracted from more than one region / selection. 

2575 :type accessor_id: 

2576 str 

2577 

2578 :param clear_accessor: 

2579 If ``True`` (default), :py:meth:`clear_accessor` is called when the 

2580 chopper finishes. Set to ``False`` to keep loaded waveforms in 

2581 memory when the generator returns. 

2582 :type clear_accessor: 

2583 bool 

2584 

2585 :param grouping: 

2586 By default, traversal over the data is over time and all matching 

2587 traces of a time window are yielded. Using this option, it is 

2588 possible to traverse the data first by group (e.g. station or 

2589 network) and second by time. This can reduce the number of traces 

2590 in each batch and thus reduce the memory footprint of the process. 

2591 :type grouping: 

2592 :py:class:`~pyrocko.squirrel.operators.base.Grouping` 

2593 

2594 :yields: 

2595 For each extracted time window or waveform group a 

2596 :py:class:`Batch` object is yielded. 

2597 

2598 See :py:meth:`iter_nuts` for details on time span matching. 

2599 ''' 

2600 

2601 tmin, tmax, codes = self._get_selection_args( 

2602 WAVEFORM, obj, tmin, tmax, time, codes) 

2603 

2604 kinds = ['waveform'] 

2605 if self.downloads_enabled: 

2606 kinds.append('waveform_promise') 

2607 

2608 self_tmin, self_tmax = self.get_time_span(kinds) 

2609 

2610 if None in (self_tmin, self_tmax): 

2611 logger.warning( 

2612 'Content has undefined time span. No waveforms and no ' 

2613 'waveform promises?') 

2614 return 

2615 

2616 if snap_window and tinc is not None: 

2617 tmin = tmin if tmin is not None else self_tmin 

2618 tmax = tmax if tmax is not None else self_tmax 

2619 tmin = math.floor(tmin / tinc) * tinc 

2620 tmax = math.ceil(tmax / tinc) * tinc 

2621 else: 

2622 tmin = tmin if tmin is not None else self_tmin + tpad 

2623 tmax = tmax if tmax is not None else self_tmax - tpad 

2624 

2625 if tinc is None: 

2626 tinc = tmax - tmin 

2627 nwin = 1 

2628 elif tinc == 0.0: 

2629 nwin = 1 

2630 else: 

2631 eps = 1e-6 

2632 nwin = max(1, int((tmax - tmin) / tinc - eps) + 1) 

2633 

2634 try: 

2635 if accessor_id is None: 

2636 accessor_id = 'chopper%i' % self._n_choppers_active 

2637 

2638 self._n_choppers_active += 1 

2639 

2640 if grouping is None: 

2641 codes_list = [codes] 

2642 else: 

2643 operator = Operator( 

2644 filtering=CodesPatternFiltering(codes=codes), 

2645 grouping=grouping) 

2646 

2647 available = set(self.get_codes(kind='waveform')) 

2648 if self.downloads_enabled: 

2649 available.update(self.get_codes(kind='waveform_promise')) 

2650 operator.update_mappings(sorted(available)) 

2651 

2652 codes_list = [ 

2653 codes_patterns_list(scl) 

2654 for scl in operator.iter_in_codes()] 

2655 

2656 ngroups = len(codes_list) 

2657 for igroup, scl in enumerate(codes_list): 

2658 for iwin in range(nwin): 

2659 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax) 

2660 

2661 chopped = self.get_waveforms( 

2662 tmin=wmin-tpad, 

2663 tmax=wmax+tpad, 

2664 codes=scl, 

2665 codes_exclude=codes_exclude, 

2666 sample_rate_min=sample_rate_min, 

2667 sample_rate_max=sample_rate_max, 

2668 snap=snap, 

2669 include_last=include_last, 

2670 load_data=load_data, 

2671 want_incomplete=want_incomplete, 

2672 degap=degap, 

2673 maxgap=maxgap, 

2674 maxlap=maxlap, 

2675 accessor_id=accessor_id, 

2676 operator_params=operator_params, 

2677 channel_priorities=channel_priorities) 

2678 

2679 self.advance_accessor(accessor_id) 

2680 

2681 yield Batch( 

2682 tmin=wmin, 

2683 tmax=wmax, 

2684 i=iwin, 

2685 n=nwin, 

2686 igroup=igroup, 

2687 ngroups=ngroups, 

2688 traces=chopped) 

2689 

2690 finally: 

2691 self._n_choppers_active -= 1 

2692 if clear_accessor: 

2693 self.clear_accessor(accessor_id, 'waveform') 

2694 

2695 def _process_chopped( 

2696 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax): 

2697 

2698 chopped.sort(key=lambda a: a.full_id) 

2699 if degap: 

2700 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap) 

2701 

2702 if not want_incomplete: 

2703 chopped_weeded = [] 

2704 for tr in chopped: 

2705 emin = tr.tmin - tmin 

2706 emax = tr.tmax + tr.deltat - tmax 

2707 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat): 

2708 chopped_weeded.append(tr) 

2709 

2710 elif degap: 

2711 if (0. < emin <= 5. * tr.deltat 

2712 and -5. * tr.deltat <= emax < 0.): 

2713 

2714 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat') 

2715 chopped_weeded.append(tr) 

2716 

2717 chopped = chopped_weeded 

2718 

2719 return chopped 

2720 

2721 def _get_pyrocko_stations( 

2722 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

2723 on_error='raise'): 

2724 

2725 from pyrocko import model as pmodel 

2726 

2727 if codes is not None: 

2728 codes = codes_patterns_for_kind(STATION, codes) 

2729 

2730 by_nsl = defaultdict(lambda: (list(), list())) 

2731 for station in self.get_stations(obj, tmin, tmax, time, codes): 

2732 sargs = station._get_pyrocko_station_args() 

2733 by_nsl[station.codes.nsl][0].append(sargs) 

2734 

2735 if codes is not None: 

2736 codes = [model.CodesNSLCE(c) for c in codes] 

2737 

2738 for channel in self.get_channels(obj, tmin, tmax, time, codes): 

2739 sargs = channel._get_pyrocko_station_args() 

2740 sargs_list, channels_list = by_nsl[channel.codes.nsl] 

2741 sargs_list.append(sargs) 

2742 channels_list.append(channel) 

2743 

2744 pstations = [] 

2745 nsls = list(by_nsl.keys()) 

2746 nsls.sort() 

2747 for nsl in nsls: 

2748 sargs_list, channels_list = by_nsl[nsl] 

2749 sargs = util.consistency_merge( 

2750 [('',) + x for x in sargs_list], 

2751 error=on_error) 

2752 

2753 by_c = defaultdict(list) 

2754 for ch in channels_list: 

2755 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args()) 

2756 

2757 chas = list(by_c.keys()) 

2758 chas.sort() 

2759 pchannels = [] 

2760 for cha in chas: 

2761 list_of_cargs = by_c[cha] 

2762 cargs = util.consistency_merge( 

2763 [('',) + x for x in list_of_cargs], 

2764 error=on_error) 

2765 pchannels.append(pmodel.Channel(*cargs)) 

2766 

2767 pstations.append( 

2768 pmodel.Station(*sargs, channels=pchannels)) 

2769 

2770 return pstations 

2771 

2772 @property 

2773 def pile(self): 

2774 

2775 ''' 

2776 Emulates the older :py:class:`pyrocko.pile.Pile` interface. 

2777 

2778 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object, 

2779 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods 

2780 but uses the fluffy power of the Squirrel under the hood. 

2781 

2782 This interface can be used as a drop-in replacement for piles which are 

2783 used in existing scripts and programs for efficient waveform data 

2784 access. The Squirrel-based pile scales better for large datasets. Newer 

2785 scripts should use Squirrel's native methods to avoid the emulation 

2786 overhead. 

2787 ''' 

2788 from . import pile 

2789 

2790 if self._pile is None: 

2791 self._pile = pile.Pile(self) 

2792 

2793 return self._pile 

2794 

2795 def snuffle(self, **kwargs): 

2796 ''' 

2797 Look at dataset in Snuffler. 

2798 ''' 

2799 self.pile.snuffle(**kwargs) 

2800 

2801 def _gather_codes_keys(self, kind, gather, selector): 

2802 return set( 

2803 gather(codes) 

2804 for codes in self.iter_codes(kind) 

2805 if selector is None or selector(codes)) 

2806 

2807 def __str__(self): 

2808 return str(self.get_stats()) 

2809 

2810 def get_coverage( 

2811 self, kind, tmin=None, tmax=None, codes=None, limit=None): 

2812 

2813 ''' 

2814 Get coverage information. 

2815 

2816 Get information about strips of gapless data coverage. 

2817 

2818 :param kind: 

2819 Content kind to be queried. 

2820 :type kind: 

2821 str 

2822 

2823 :param tmin: 

2824 Start time of query interval. 

2825 :type tmin: 

2826 :py:func:`~pyrocko.util.get_time_float` 

2827 

2828 :param tmax: 

2829 End time of query interval. 

2830 :type tmax: 

2831 :py:func:`~pyrocko.util.get_time_float` 

2832 

2833 :param codes: 

2834 If given, restrict query to given content codes patterns. 

2835 :type codes: 

2836 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes` 

2837 objects appropriate for the queried content type, or anything which 

2838 can be converted to such objects. 

2839 

2840 :param limit: 

2841 Limit query to return only up to a given maximum number of entries 

2842 per matching time series (without setting this option, very gappy 

2843 data could cause the query to execute for a very long time). 

2844 :type limit: 

2845 int 

2846 

2847 :returns: 

2848 Information about time spans covered by the requested time series 

2849 data. 

2850 :rtype: 

2851 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Coverage` 

2852 ''' 

2853 

2854 tmin_seconds, tmin_offset = model.tsplit(tmin) 

2855 tmax_seconds, tmax_offset = model.tsplit(tmax) 

2856 kind_id = to_kind_id(kind) 

2857 

2858 codes_info = list(self._iter_codes_info(kind=kind)) 

2859 

2860 kdata_all = [] 

2861 if codes is None: 

2862 for _, codes_entry, deltat, kind_codes_id, _ in codes_info: 

2863 kdata_all.append( 

2864 (codes_entry, kind_codes_id, codes_entry, deltat)) 

2865 

2866 else: 

2867 for codes_entry in codes: 

2868 pattern = to_codes(kind_id, codes_entry) 

2869 for _, codes_entry, deltat, kind_codes_id, _ in codes_info: 

2870 if model.match_codes(pattern, codes_entry): 

2871 kdata_all.append( 

2872 (pattern, kind_codes_id, codes_entry, deltat)) 

2873 

2874 kind_codes_ids = [x[1] for x in kdata_all] 

2875 

2876 counts_at_tmin = {} 

2877 if tmin is not None: 

2878 for nut in self.iter_nuts( 

2879 kind, tmin, tmin, kind_codes_ids=kind_codes_ids): 

2880 

2881 k = nut.codes, nut.deltat 

2882 if k not in counts_at_tmin: 

2883 counts_at_tmin[k] = 0 

2884 

2885 counts_at_tmin[k] += 1 

2886 

2887 coverages = [] 

2888 for pattern, kind_codes_id, codes_entry, deltat in kdata_all: 

2889 entry = [pattern, codes_entry, deltat, None, None, []] 

2890 for i, order in [(0, 'ASC'), (1, 'DESC')]: 

2891 sql = self._sql(''' 

2892 SELECT 

2893 time_seconds, 

2894 time_offset 

2895 FROM %(db)s.%(coverage)s 

2896 WHERE 

2897 kind_codes_id == ? 

2898 ORDER BY 

2899 kind_codes_id ''' + order + ''', 

2900 time_seconds ''' + order + ''', 

2901 time_offset ''' + order + ''' 

2902 LIMIT 1 

2903 ''') 

2904 

2905 for row in self._conn.execute(sql, [kind_codes_id]): 

2906 entry[3+i] = model.tjoin(row[0], row[1]) 

2907 

2908 if None in entry[3:5]: 

2909 continue 

2910 

2911 args = [kind_codes_id] 

2912 

2913 sql_time = '' 

2914 if tmin is not None: 

2915 # intentionally < because (== tmin) is queried from nuts 

2916 sql_time += ' AND ( ? < time_seconds ' \ 

2917 'OR ( ? == time_seconds AND ? < time_offset ) ) ' 

2918 args.extend([tmin_seconds, tmin_seconds, tmin_offset]) 

2919 

2920 if tmax is not None: 

2921 sql_time += ' AND ( time_seconds < ? ' \ 

2922 'OR ( ? == time_seconds AND time_offset <= ? ) ) ' 

2923 args.extend([tmax_seconds, tmax_seconds, tmax_offset]) 

2924 

2925 sql_limit = '' 

2926 if limit is not None: 

2927 sql_limit = ' LIMIT ?' 

2928 args.append(limit) 

2929 

2930 sql = self._sql(''' 

2931 SELECT 

2932 time_seconds, 

2933 time_offset, 

2934 step 

2935 FROM %(db)s.%(coverage)s 

2936 WHERE 

2937 kind_codes_id == ? 

2938 ''' + sql_time + ''' 

2939 ORDER BY 

2940 kind_codes_id, 

2941 time_seconds, 

2942 time_offset 

2943 ''' + sql_limit) 

2944 

2945 rows = list(self._conn.execute(sql, args)) 

2946 

2947 if limit is not None and len(rows) == limit: 

2948 entry[-1] = None 

2949 else: 

2950 counts = counts_at_tmin.get((codes_entry, deltat), 0) 

2951 tlast = None 

2952 if tmin is not None: 

2953 entry[-1].append((tmin, counts)) 

2954 tlast = tmin 

2955 

2956 for row in rows: 

2957 t = model.tjoin(row[0], row[1]) 

2958 counts += row[2] 

2959 entry[-1].append((t, counts)) 

2960 tlast = t 

2961 

2962 if tmax is not None and (tlast is None or tlast != tmax): 

2963 entry[-1].append((tmax, counts)) 

2964 

2965 coverages.append(model.Coverage.from_values(entry + [kind_id])) 

2966 

2967 return coverages 

2968 

2969 def get_stationxml( 

2970 self, obj=None, tmin=None, tmax=None, time=None, codes=None, 

2971 level='response', on_error='raise'): 

2972 

2973 ''' 

2974 Get station/channel/response metadata in StationXML representation. 

2975 

2976 %(query_args)s 

2977 

2978 :returns: 

2979 :py:class:`~pyrocko.io.stationxml.FDSNStationXML` object. 

2980 ''' 

2981 

2982 if level not in ('network', 'station', 'channel', 'response'): 

2983 raise ValueError('Invalid level: %s' % level) 

2984 

2985 tmin, tmax, codes = self._get_selection_args( 

2986 CHANNEL, obj, tmin, tmax, time, codes) 

2987 

2988 def tts(t): 

2989 if t is None: 

2990 return '<none>' 

2991 else: 

2992 return util.tts(t, format='%Y-%m-%d %H:%M:%S') 

2993 

2994 if on_error == 'ignore': 

2995 def handle_error(exc): 

2996 pass 

2997 

2998 elif on_error == 'warn': 

2999 def handle_error(exc): 

3000 logger.warning(str(exc)) 

3001 

3002 elif on_error == 'raise': 

3003 def handle_error(exc): 

3004 raise exc 

3005 

3006 def use_first(node_type_name, codes, k, group): 

3007 if on_error == 'warn': 

3008 logger.warning( 

3009 'Duplicates for %s %s, %s - %s -> using first' % ( 

3010 node_type_name, 

3011 '.'.join(codes), 

3012 tts(k[0]), tts(k[1]))) 

3013 

3014 return group[0] 

3015 

3016 def deduplicate(node_type_name, codes, nodes): 

3017 groups = defaultdict(list) 

3018 for node in nodes: 

3019 k = (node.start_date, node.end_date) 

3020 groups[k].append(node) 

3021 

3022 return [ 

3023 use_first(node_type_name, codes, k, group) 

3024 for (k, group) in groups.items()] 

3025 

3026 filtering = CodesPatternFiltering(codes=codes) 

3027 

3028 nslcs = list(set( 

3029 codes.nslc for codes in 

3030 filtering.filter(self.get_codes(kind='channel')))) 

3031 

3032 from pyrocko.io import stationxml as sx 

3033 

3034 networks = [] 

3035 task_networks = make_task('StationXML: add networks') 

3036 for net, stas in task_networks(prefix_tree(nslcs)): 

3037 network = sx.Network(code=net) 

3038 networks.append(network) 

3039 

3040 if level not in ('station', 'channel', 'response'): 

3041 continue 

3042 

3043 task_stations = make_task('StationXML: add stations') 

3044 for sta, locs in task_stations(stas): 

3045 stations = self.get_stations( 

3046 tmin=tmin, 

3047 tmax=tmax, 

3048 codes=(net, sta, '*'), 

3049 model='stationxml') 

3050 

3051 if on_error != 'raise': 

3052 stations = deduplicate( 

3053 'Station', (net, sta), stations) 

3054 

3055 errors = sx.check_overlaps( 

3056 'Station', (net, sta), stations) 

3057 

3058 if errors: 

3059 handle_error(error.Duplicate( 

3060 'Overlapping/duplicate station info:\n %s' 

3061 % '\n '.join(errors))) 

3062 

3063 network.station_list.extend(stations) 

3064 

3065 if level not in ('channel', 'response'): 

3066 continue 

3067 

3068 for loc, chas in locs: 

3069 for cha, _ in chas: 

3070 channels = self.get_channels( 

3071 tmin=tmin, 

3072 tmax=tmax, 

3073 codes=(net, sta, loc, cha), 

3074 model='stationxml') 

3075 

3076 if on_error != 'raise': 

3077 channels = deduplicate( 

3078 'Channel', (net, sta, loc, cha), channels) 

3079 

3080 errors = sx.check_overlaps( 

3081 'Channel', (net, sta, loc, cha), channels) 

3082 

3083 if errors: 

3084 handle_error(error.Duplicate( 

3085 'Overlapping/duplicate channel info:\n %s' 

3086 % '\n '.join(errors))) 

3087 

3088 for channel in channels: 

3089 station = sx.find_containing(stations, channel) 

3090 if station is not None: 

3091 station.channel_list.append(channel) 

3092 else: 

3093 handle_error(error.NotAvailable( 

3094 'No station or station epoch found ' 

3095 'for channel: %s' % '.'.join( 

3096 (net, sta, loc, cha)))) 

3097 

3098 continue 

3099 

3100 if level != 'response': 

3101 continue 

3102 

3103 try: 

3104 response_sq, response_sx = self.get_response( 

3105 codes=(net, sta, loc, cha), 

3106 tmin=channel.start_date, 

3107 tmax=channel.end_date, 

3108 model='stationxml+', 

3109 on_duplicate=on_error) 

3110 

3111 except error.NotAvailable as e: 

3112 handle_error(e) 

3113 continue 

3114 

3115 if not ( 

3116 sx.eq_open( 

3117 channel.start_date, response_sq.tmin) 

3118 and sx.eq_open( 

3119 channel.end_date, response_sq.tmax)): 

3120 

3121 handle_error(error.Inconsistencies( 

3122 'Response time span does not match ' 

3123 'channel time span: %s' % '.'.join( 

3124 (net, sta, loc, cha)))) 

3125 

3126 channel.response = response_sx 

3127 

3128 return sx.FDSNStationXML( 

3129 source='Generated by Pyrocko Squirrel.', 

3130 network_list=networks) 

3131 

3132 def add_operator(self, op): 

3133 self._operators.append(op) 

3134 

3135 def update_operator_mappings(self): 

3136 available = self.get_codes(kind=('channel')) 

3137 

3138 for operator in self._operators: 

3139 operator.update_mappings(available, self._operator_registry) 

3140 

3141 def iter_operator_mappings(self): 

3142 for operator in self._operators: 

3143 for in_codes, out_codes in operator.iter_mappings(): 

3144 yield operator, in_codes, out_codes 

3145 

3146 def get_operator_mappings(self): 

3147 return list(self.iter_operator_mappings()) 

3148 

3149 def get_operator(self, codes): 

3150 try: 

3151 return self._operator_registry[codes][0] 

3152 except KeyError: 

3153 return None 

3154 

3155 def get_operator_group(self, codes): 

3156 try: 

3157 return self._operator_registry[codes] 

3158 except KeyError: 

3159 return None, (None, None, None) 

3160 

3161 def iter_operator_codes(self): 

3162 for _, _, out_codes in self.iter_operator_mappings(): 

3163 for codes in out_codes: 

3164 yield codes 

3165 

3166 def get_operator_codes(self): 

3167 return list(self.iter_operator_codes()) 

3168 

3169 def print_tables(self, table_names=None, stream=None): 

3170 ''' 

3171 Dump raw database tables in textual form (for debugging purposes). 

3172 

3173 :param table_names: 

3174 Names of tables to be dumped or ``None`` to dump all. 

3175 :type table_names: 

3176 :py:class:`list` of :py:class:`str` 

3177 

3178 :param stream: 

3179 Open file or ``None`` to dump to standard output. 

3180 ''' 

3181 

3182 if stream is None: 

3183 stream = sys.stdout 

3184 

3185 if isinstance(table_names, str): 

3186 table_names = [table_names] 

3187 

3188 if table_names is None: 

3189 table_names = [ 

3190 'selection_file_states', 

3191 'selection_nuts', 

3192 'selection_kind_codes_count', 

3193 'files', 'nuts', 'kind_codes', 'kind_codes_count'] 

3194 

3195 m = { 

3196 'selection_file_states': '%(db)s.%(file_states)s', 

3197 'selection_nuts': '%(db)s.%(nuts)s', 

3198 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s', 

3199 'files': 'files', 

3200 'nuts': 'nuts', 

3201 'kind_codes': 'kind_codes', 

3202 'kind_codes_count': 'kind_codes_count'} 

3203 

3204 for table_name in table_names: 

3205 self._database.print_table( 

3206 m[table_name] % self._names, stream=stream) 

3207 

3208 

3209class SquirrelStats(Object): 

3210 ''' 

3211 Container to hold statistics about contents available from a Squirrel. 

3212 

3213 See also :py:meth:`Squirrel.get_stats`. 

3214 ''' 

3215 

3216 nfiles = Int.T( 

3217 help='Number of files in selection.') 

3218 nnuts = Int.T( 

3219 help='Number of index nuts in selection.') 

3220 codes = List.T( 

3221 Tuple.T(content_t=String.T()), 

3222 help='Available code sequences in selection, e.g. ' 

3223 '(agency, network, station, location) for stations nuts.') 

3224 kinds = List.T( 

3225 String.T(), 

3226 help='Available content types in selection.') 

3227 total_size = Int.T( 

3228 help='Aggregated file size of files is selection.') 

3229 counts = Dict.T( 

3230 String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()), 

3231 help='Breakdown of how many nuts of any content type and code ' 

3232 'sequence are available in selection, ``counts[kind][codes]``.') 

3233 time_spans = Dict.T( 

3234 String.T(), Tuple.T(content_t=Timestamp.T()), 

3235 help='Time spans by content type.') 

3236 sources = List.T( 

3237 String.T(), 

3238 help='Descriptions of attached sources.') 

3239 operators = List.T( 

3240 String.T(), 

3241 help='Descriptions of attached operators.') 

3242 

3243 def __str__(self): 

3244 kind_counts = dict( 

3245 (kind, sum(self.counts[kind].values())) for kind in self.kinds) 

3246 

3247 scodes = model.codes_to_str_abbreviated(self.codes) 

3248 

3249 ssources = '<none>' if not self.sources else '\n' + '\n'.join( 

3250 ' ' + s for s in self.sources) 

3251 

3252 soperators = '<none>' if not self.operators else '\n' + '\n'.join( 

3253 ' ' + s for s in self.operators) 

3254 

3255 def stime(t): 

3256 return util.tts(t) if t is not None and t not in ( 

3257 model.g_tmin, model.g_tmax) else '<none>' 

3258 

3259 def stable(rows): 

3260 ns = [max(len(w) for w in col) for col in zip(*rows)] 

3261 return '\n'.join( 

3262 ' '.join(w.ljust(n) for n, w in zip(ns, row)) 

3263 for row in rows) 

3264 

3265 def indent(s): 

3266 return '\n'.join(' '+line for line in s.splitlines()) 

3267 

3268 stspans = '<none>' if not self.kinds else '\n' + indent(stable([( 

3269 kind + ':', 

3270 str(kind_counts[kind]), 

3271 stime(self.time_spans[kind][0]), 

3272 '-', 

3273 stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)])) 

3274 

3275 s = ''' 

3276Number of files: %i 

3277Total size of known files: %s 

3278Number of index nuts: %i 

3279Available content kinds: %s 

3280Available codes: %s 

3281Sources: %s 

3282Operators: %s''' % ( 

3283 self.nfiles, 

3284 util.human_bytesize(self.total_size), 

3285 self.nnuts, 

3286 stspans, scodes, ssources, soperators) 

3287 

3288 return s.lstrip() 

3289 

3290 

3291__all__ = [ 

3292 'Squirrel', 

3293 'SquirrelStats', 

3294]