Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/selection.py: 89%

230 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Meta-data caching for flexible file selections. 

8''' 

9 

10import os 

11import re 

12import threading 

13import logging 

14 

15from pyrocko import util 

16from pyrocko.io.io_common import FileLoadError 

17from pyrocko import progress 

18 

19from . import error, io, model 

20from .database import Database, get_database, execute_get1, abspath 

21 

logger = logging.getLogger('psq.selection')

# Process-wide counter (guarded by g_lock) used by make_unique_name().
g_icount = 0
g_lock = threading.Lock()

# Valid names for persistent selections: a letter or underscore followed by
# up to 64 letters, digits or underscores. The name ends up in SQL table
# names, so it is anchored with \Z: unlike $, \Z does not also match just
# before a trailing newline.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}\Z')


def make_unique_name():
    '''
    Get a new name for a temporary selection.

    :returns:
        :py:class:`str` of the form ``'<pid>_<counter>'``, where ``<pid>`` is
        the current process id and ``<counter>`` is incremented on every call
        (thread-safe via ``g_lock``).
    '''
    global g_icount

    with g_lock:
        counter = g_icount
        g_icount = counter + 1

    return '%i_%i' % (os.getpid(), counter)

37 

38 

def make_task(*args):
    '''
    Create a :py:mod:`pyrocko.progress` task reporting to this module's
    logger.

    All positional arguments are forwarded unchanged to
    :py:func:`pyrocko.progress.task`.
    '''
    new_task = progress.task(*args, logger=logger)
    return new_task

41 

42 

# Reusable documentation fragments. filldocs() inserts them into docstrings
# via %-formatting, where they are referenced as ``%(<key>)s`` placeholders.
doc_snippets = dict(
    query_args='''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            :py:func:`~pyrocko.util.get_time_float`

        :param tmax:
            End time of query interval.
        :type tmax:
            :py:func:`~pyrocko.util.get_time_float`

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            :py:func:`~pyrocko.util.get_time_float`

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    # Comma-separated, RST-quoted list of the format names returned by
    # io.supported_formats() (used e.g. in the docstring of Selection.add).
    file_formats=', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()))

76 

77 

def filldocs(meth):
    '''
    Decorator filling ``%(key)s`` placeholders in a docstring from
    ``doc_snippets``.

    When Python runs with ``-OO`` docstrings are stripped and ``__doc__`` is
    ``None``; the function is then returned unchanged instead of failing with
    :py:exc:`TypeError` at import time.

    :returns: ``meth`` itself.
    '''
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth

81 

82 

class GeneratorWithLen(object):
    '''
    Iterable wrapper which reports a precomputed length.

    Allows ``len()`` on a lazily produced sequence without consuming it.

    :param gen:
        Iterable (typically a generator) producing the items. If it is a
        generator, the wrapper can only be iterated once.
    :param length:
        Value reported by ``len()``. It is not checked against the number of
        items actually produced.
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # For generators/iterators iter() returns the object itself, so this
        # matches the previous behaviour; it additionally makes plain
        # iterables (lists, tuples) work, for which returning them directly
        # would make iter(self) raise TypeError.
        return iter(self.gen)

95 

class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection. The name must consist
        of letters, digits and underscores, must not start with a digit and
        may be at most 65 characters long.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set first, so that close() (also called from __del__) does nothing
        # if initialization fails before a connection has been obtained.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            assert isinstance(persistent, str)
            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._database.add_selection(self)

        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # The INSERT only affects a row if the name was not yet
                # registered, i.e. if this persistent selection is new.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Table names used by _sql(). Persistent selections live in the
            # main database, all others in the temp schema, which is not
            # visible to other applications.
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        self.close()

    def close(self, delete_persistent=False):
        '''
        Detach the selection from its database.

        Removes volatile content, drops the selection's tables unless the
        selection is persistent (and ``delete_persistent`` is ``False``) and
        unregisters the selection from the database. Calling it on an already
        closed selection does nothing.

        :param delete_persistent:
            Also drop the tables of a persistent selection.
        :type delete_persistent:
            bool
        '''
        if hasattr(self, '_conn') and self._conn:
            try:
                self._cleanup()
                if not self._persistent or delete_persistent:
                    self._delete()

            finally:
                # Always detach, even if cleanup failed, so the selection does
                # not stay registered and __del__ does not retry the cleanup
                # on a half-closed selection.
                self._conn = None
                self._database.remove_selection(self)

    def delete(self):
        '''
        Close the selection and drop its tables, even if it is persistent.

        :raises:
            :py:exc:`~pyrocko.squirrel.error.SquirrelError` if the selection
            has no database connection (e.g. because it is already closed).
        '''
        if self._conn is None:
            raise error.SquirrelError(
                'Cannot delete selection, no database connection.')

        self.close(delete_persistent=True)

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its name is also removed from the
        ``persistent`` table.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

    def _register_table(self, s):
        # Delegates to the database, which returns the statement to execute.
        return self._database._register_table(s)

    def _sql(self, s):
        # Fill the %(db)s, %(file_states)s and %(bulkinsert)s placeholders.
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the underlying database.

        The returned object is used as a context manager yielding a cursor.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True,
            transaction=None):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            Whether to report progress through progress tasks.
        :type show_progress:
            bool

        :param transaction:
            Transaction to run in, as returned by :py:meth:`transaction`. If
            not given, a new transaction is opened.

        Files already in the selection whose stored ``kind_mask`` or
        ``format`` differs from the given one are re-added with file state
        0. All other files which were already in the selection get file
        state 1 (forced modification check).
        '''

        if isinstance(paths, str):
            paths = [paths]

        paths = util.short_to_list(200, paths)

        if isinstance(paths, list) and len(paths) == 0:
            return

        # Non-list iterables and long lists are staged in a temporary table,
        # so that each of the steps below is a single set-based statement.
        use_temp_table = not isinstance(paths, list) or len(paths) > 200

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        db = self.get_database()
        with (transaction or self.transaction('add files')) as cursor:
            if not use_temp_table:
                # short non-iterator paths: can do without temp table.
                # Materialized as a list because it is iterated by several
                # statements below.
                paths = [db.relpath(path) for path in paths]

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM files
                        WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`

        Only the selection's entries are removed; the rows in the shared
        ``files`` table are left untouched.
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False, format=None):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :param format:
            If given, only yield paths of files whose format recorded in the
            database (``files.format``) equals this value.
        :type format:
            str

        :yields: File paths, ordered by file id.
        '''

        conditions = []
        args = []
        if format is not None:
            conditions.append('files.format = ?')
            args.append(format)

        if conditions:
            condition = 'WHERE %s' % ' AND '.join(conditions)
        else:
            condition = ''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
        ''' + condition + '''
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql, args):
            yield trans(values[0])

    def get_paths(self, raw=False, format=None):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :param format:
            If given, only return paths of files whose format recorded in the
            database (``files.format``) equals this value.
        :type format:
            str

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw, format=format))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        Applies to all files of the selection if ``paths`` is ``None``,
        otherwise only to the given paths.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        :returns:
            :py:class:`GeneratorWithLen` whose ``len()`` is the number of
            files considered. It yields tuples ``((format, path), nuts)``
            where ``path`` is the absolute path to the file, ``format`` is the
            format assignation or ``'detect'`` and ``nuts`` is a list of
            :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
            contents of the file (empty if no format is recorded for the file
            in the database).
        '''

        where = ''
        if skip_unchanged:
            where = '''
                WHERE file_states_b.file_state == 0
            '''

        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s file_states_b
        ''' + where), ())[0]

        def gen():
            # Files are paged by file id (LIMIT/OFFSET in the subquery), so
            # all rows belonging to one file arrive within the same page.
            sql = self._sql('''
                SELECT
                    file_states_a.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s file_states_a
                LEFT OUTER JOIN files
                    ON file_states_a.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
                WHERE file_states_a.file_id IN (
                    SELECT file_id FROM %(db)s.%(file_states)s file_states_b
                    ''' + where + '''
                    ORDER BY file_states_b.file_id
                    LIMIT ? OFFSET ?
                )
                ORDER BY file_states_a.file_id
            ''')

            db = self.get_database()

            limit = 100
            offset = 0
            while True:
                nuts = []
                format_path = None
                at_end = True
                for values in self._conn.execute(
                        sql, (limit, offset)):

                    at_end = False
                    apath = db.abspath(values[1])
                    if format_path is not None and apath != format_path[1]:
                        yield format_path, nuts
                        nuts = []

                    format_path = values[0], apath

                    # files.format is NULL for rows inserted by add() that
                    # have not been updated since; such files get no nuts.
                    # NOTE(review): a file with a recorded format but no rows
                    # in ``nuts`` would produce a Nut built from NULL columns
                    # here -- confirm this cannot happen.
                    if values[2] is not None:
                        nuts.append(model.Nut(
                            values_nocheck=format_path[1:2] + values[2:]))

                if format_path is not None:
                    yield format_path, nuts

                offset += limit
                if at_end:
                    break

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True, transaction=None):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with (transaction or self.transaction('flag modified')) as cursor:
            # Files flagged for checking but without a stored mtime are
            # unknown: treat them as modified.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                        files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                # Yields (new_state, file_id) for every file still flagged
                # with state 1, comparing stored (mtime, size) to the file's
                # current stats as reported by its format backend.
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # State is left at 1 for files of unknown format.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())

748 

749 

# Public API of this module.
__all__ = [
    'Selection',
]