Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/selection.py: 86%

209 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Meta-data caching for flexible file selections. 

8''' 

9 

10import os 

11import re 

12import threading 

13import logging 

14 

15from pyrocko import util 

16from pyrocko.io.io_common import FileLoadError 

17from pyrocko.progress import progress 

18 

19from . import error, io, model 

20from .database import Database, get_database, execute_get1, abspath 

21 

logger = logging.getLogger('psq.selection')

# Running counter used to build names which are unique within this process.
# All access goes through ``g_lock``.
g_icount = 0
g_lock = threading.Lock()

# Accepted names for persistent selections: identifier-like, 1-65 characters.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}$')


def make_unique_name():
    '''
    Create a name which is unique within the running process.

    The name is built from the process id and a module-wide counter which is
    incremented under ``g_lock`` on every call.

    :returns:
        Name of the form ``'<pid>_<counter>'``.
    :rtype:
        str
    '''
    global g_icount

    with g_lock:
        pid = os.getpid()
        serial = g_icount
        g_icount = serial + 1

    return '%i_%i' % (pid, serial)

37 

38 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    :param args:
        Positional arguments, passed through unchanged to
        ``progress.task``.

    :returns:
        Whatever ``progress.task`` returns.
    '''
    task = progress.task(*args, logger=logger)
    return task

41 

42 

# Reusable docstring fragments. They are interpolated into method docstrings
# through ``%(name)s`` placeholders.
doc_snippets = {
    'query_args': '''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            :py:func:`~pyrocko.util.get_time_float`

        :param tmax:
            End time of query interval.
        :type tmax:
            :py:func:`~pyrocko.util.get_time_float`

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            :py:func:`~pyrocko.util.get_time_float`

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    # Comma separated, literal-quoted list of the supported format names.
    'file_formats': ', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()),
}

76 

77 

def filldocs(meth):
    '''
    Decorator which fills documentation snippets into a docstring.

    ``%(name)s`` placeholders in the docstring of ``meth`` are replaced with
    the corresponding entries of ``doc_snippets``.

    :param meth:
        Function or method whose docstring should be completed.

    :returns:
        The same object ``meth``, with its ``__doc__`` updated in place.
    '''
    # ``__doc__`` is ``None`` when there is no docstring, e.g. when Python is
    # run with ``-OO``. Interpolating into ``None`` would raise ``TypeError``
    # at decoration (import) time, so leave such objects untouched.
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth

81 

82 

class GeneratorWithLen(object):

    '''
    Wrap an iterable together with an externally known length.

    :param gen:
        Generator (or any other iterable) producing the items.

    :param length:
        Number reported by :py:func:`len` for this object. It is stored as
        given and not checked against what ``gen`` actually yields.
    :type length:
        int
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # ``__iter__`` must return an iterator. ``iter()`` hands back a
        # generator unchanged, and additionally makes plain iterables like
        # lists work instead of raising ``TypeError``.
        return iter(self.gen)

94 

95 

class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set early so that __del__ finds the attribute even if
        # initialization fails further down.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            # Explicit check instead of ``assert``: assertions are removed
            # when Python runs with ``-O``.
            if not isinstance(persistent, str):
                raise TypeError(
                    'persistent selection name must be a str, got: %s'
                    % type(persistent).__name__)

            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # INSERT OR IGNORE affects one row only if the name was not
                # registered before, i.e. if the selection is new.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Substitutions used by _sql() to build table names.
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        # _conn is None if initialization did not get far enough or if the
        # selection has already been deleted.
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        '''
        Pass an SQL statement to the database's ``_register_table``.

        :returns: Result of the database's ``_register_table`` call.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill this selection's table names into an SQL template.

        :param s:
            SQL string with ``%(db)s``, ``%(file_states)s`` or
            ``%(bulkinsert)s`` placeholders.

        :returns: SQL string with the placeholders substituted.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the underlying database.

        :param label:
            Label passed on to the database's ``transaction`` method.

        :param mode:
            Mode passed on to the database's ``transaction`` method.

        :returns: Whatever the database's ``transaction`` method returns.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                # self.name is 'psel_' + persistent name, strip the prefix.
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self.name[5:],))

        # Mark as deleted, so that __del__ does not try again.
        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str
        '''

        if isinstance(paths, str):
            paths = [paths]

        paths = util.short_to_list(200, paths)

        if isinstance(paths, list) and len(paths) == 0:
            return

        if show_progress:
            task = make_task('Gathering file names')
            # NOTE(review): if the task wrapper does not return a list, the
            # short-list branch below is never taken while show_progress is
            # set - confirm against pyrocko.progress.
            paths = task(paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if isinstance(paths, list) and len(paths) <= 200:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Drop entries whose kind mask or format differ from what is
                # requested now; they are re-inserted below.
                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                # New entries start with file state 0.
                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM files
                        WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Entries which were already present (state != 0) are set to
                # state 1.
                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Long or lazily generated path lists: collect the paths in
                # a temporary table and do the same steps as above with
                # single set-based statements.
                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to use. If not given, a new one is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Restrict the update to these paths. If ``None``, all files in
            the selection are affected.

        :param transaction:
            Transaction to use. If not given, a new one is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        # Number of files, reported as length of the returned object.
        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])
                # Rows are ordered by file; a change of path means that the
                # previous file's group is complete.
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # files.format is NULL: no nut is created for this row.
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            # Emit the last group.
            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files without a stored mtime are unknown: state 1 -> 0.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                      files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # No checking on disk: remaining state 1 entries become 2.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats not available: flag as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # Leave the file's state as it is.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())

709 

710 

# Public API of this module.
__all__ = ['Selection']