1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import os 

7import re 

8import threading 

9import logging 

10 

11from pyrocko import util 

12from pyrocko.io.io_common import FileLoadError 

13from pyrocko.progress import progress 

14 

15from . import error, io, model 

16from .database import Database, get_database, execute_get1, abspath 

17 

# Module-level logger, also handed to the progress tasks created by
# make_task().
logger = logging.getLogger('psq.selection')

# Running counter used by make_unique_name() to number the names it creates.
g_icount = 0
# Held by make_unique_name() while it reads and increments g_icount.
g_lock = threading.Lock()

# Names accepted for persistent selections: an ASCII letter or underscore,
# followed by at most 64 ASCII letters, digits or underscores.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}$')

24 

25 

def make_unique_name():
    '''
    Create a name which has not been handed out before in this process.

    The name is built from the process id and the module-level counter
    ``g_icount``. The counter is read and incremented while ``g_lock`` is
    held.

    :returns: Name of the form ``'<pid>_<count>'``.
    '''
    global g_icount

    with g_lock:
        count = g_icount
        g_icount = count + 1

    return '%i_%i' % (os.getpid(), count)

33 

34 

def make_task(*args):
    '''
    Create a progress task which is tied to this module's logger.

    :param args:
        Positional arguments, handed on unchanged to ``progress.task``.

    :returns: The object returned by ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task

37 

38 

# Text fragments which filldocs() substitutes into docstrings by means of
# %-formatting:
#
#   query_args   - parameter documentation shared by query methods
#   file_formats - comma separated, quoted names of the formats reported by
#                  io.supported_formats()
doc_snippets = dict(
    query_args='''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            timestamp

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    file_formats=', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()))

72 

73 

def filldocs(meth):
    '''
    Decorator completing the placeholders in the docstring of ``meth``.

    The docstring is %-formatted with the module-level ``doc_snippets``
    mapping. Objects without a docstring are returned untouched: ``__doc__``
    is ``None`` for undocumented callables and for every callable when Python
    runs with ``-OO``, and formatting ``None`` would raise
    :py:exc:`TypeError` at import time.

    :param meth:
        Function or method whose docstring should be completed.

    :returns: ``meth`` itself, so that this can be used as a decorator.
    '''
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth

77 

78 

class GeneratorWithLen(object):

    '''
    Pair an iterator with an externally supplied length.

    :param gen:
        Object to be handed out when the wrapper is iterated over.
    :param length:
        Value to be reported by :py:func:`len`.

    Iterating returns ``gen`` itself rather than a fresh copy, and the
    reported length is not updated as items are consumed.
    '''

    def __init__(self, gen, length):
        self.length = length
        self.gen = gen

    def __iter__(self):
        return self.gen

    def __len__(self):
        return self.length

90 

91 

class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # __del__ uses a truthy _conn as the sign that the selection has been
        # set up completely, so it stays None until the very end of __init__.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            assert isinstance(persistent, str)
            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        # Substitutions applied by _sql(). Persistent selections live in the
        # main schema, all others in the temp schema.
        self._names = {
            'db': 'main' if self._persistent else 'temp',
            'file_states': self.name + '_file_states',
            'bulkinsert': self.name + '_bulkinsert'}

        conn = self._database.get_connection()

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # rowcount is 1 only if the name has not been registered
                # before.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

        # Only publish the connection once the tables are in place. If the
        # setup above raised, __del__ must not try to drop tables of a
        # half-initialized selection.
        self._conn = conn

    def __del__(self):
        # _conn is missing or None if __init__ did not complete or if the
        # selection has already been deleted.
        if getattr(self, '_conn', None):
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        '''
        Pass an SQL statement to the database's ``_register_table``.

        :returns: Return value of ``Database._register_table``.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill the table name placeholders of an SQL template.

        :param s:
            SQL text with ``%(db)s``, ``%(file_states)s`` or
            ``%(bulkinsert)s`` placeholders.

        :returns: ``s``, %-formatted with the names of this selection.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the database of this selection.

        :param label:
            Handed on to ``Database.transaction``.
        :param mode:
            Handed on to ``Database.transaction``.

        :returns:
            Return value of ``Database.transaction``. It is used as a context
            manager providing a cursor throughout this class.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its entry in the ``persistent`` table is
        removed, too. Afterwards the connection reference is dropped, which
        also keeps :py:meth:`__del__` from deleting a second time.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                # self.name is 'psel_' + self._persistent, the bare name is
                # what has been registered in __init__.
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.

        Public entry point to :py:meth:`_delete`. The selection must not be
        used anymore afterwards.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            If ``True``, report progress through tasks created with
            ``make_task``.
        :type show_progress:
            bool

        Files newly entering the selection get file state 0. Files which have
        already been in the selection with the same ``kind_mask`` and
        ``format`` and which are not in state 0 are set to state 1. Entries
        with a differing ``kind_mask`` or ``format`` are dropped and
        re-inserted with state 0.
        '''

        if isinstance(paths, str):
            paths = [paths]

        # Same threshold (200) as in the short-list branch below.
        # NOTE(review): presumably this turns short iterables into lists and
        # leaves longer ones as iterators - confirm in pyrocko.util.
        paths = util.short_to_list(200, paths)

        if isinstance(paths, list) and len(paths) == 0:
            return

        if show_progress:
            task = make_task('Gathering file names')
            # NOTE(review): if the task wrapper does not return a list, the
            # short-list branch below is only reached with
            # show_progress=False - confirm against pyrocko.progress.
            paths = task(paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if isinstance(paths, list) and len(paths) <= 200:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Drop entries which were stored with other settings, they
                # are re-inserted with state 0 by the next statement.
                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM files
                        WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Entries not in state 0 have been in the selection before:
                # request a modification check for them.
                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Same steps as above, but the paths are first collected in a
                # temporary table so that each step is a single statement.

                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`

        Only the selection's file-state entries are deleted, the ``files``
        table is left untouched.
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to be used. If not given, a new one is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Restrict the update to the given paths. By default, all files in
            the selection are affected.
        :param transaction:
            Transaction to be used. If not given, a new one is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.

        The returned object additionally supports :py:func:`len`, giving the
        number of files matched at the time of the call.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        # Counted up front so that the result can report its length before
        # it has been iterated.
        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])
                # Rows arrive ordered by file: a change of path closes the
                # group collected so far.
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # files.format is NULL for files without index information:
                # these yield an empty list of nuts.
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files in state 1 for which no mtime has been recorded are
            # treated as unknown.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                      files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # Without checking, everything still in state 1 is accepted
                # as known.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats are unavailable: flag as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # No backend for the recorded format: the file state
                        # is left as it is.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())

705 

706 

# Names exported by ``from <this module> import *``.
__all__ = [
    'Selection',
]