1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import os 

9import re 

10import threading 

11import logging 

12 

13from pyrocko import util 

14from pyrocko.io.io_common import FileLoadError 

15from pyrocko.progress import progress 

16 

17from . import error, io, model 

18from .database import Database, get_database, execute_get1, abspath 

19 

# Logger used for messages and progress tasks of this module.
logger = logging.getLogger('psq.selection')

# Running counter used by make_unique_name(). It is only read and incremented
# while g_lock is held.
g_icount = 0
g_lock = threading.Lock()

# Allowed names for persistent selections: a letter or underscore followed by
# up to 64 letters, digits or underscores. The name becomes part of SQL table
# names, so the check must be strict. The pattern is anchored with ``\Z``
# instead of ``$`` because ``$`` would also accept a trailing newline.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}\Z')

26 

27 

def make_unique_name():
    '''
    Create a name which is unique within the running process.

    The name has the form ``<pid>_<counter>``. The module level counter is
    read and incremented while the module level lock is held.

    :returns: The generated name as a :py:class:`str`.
    '''
    global g_icount

    with g_lock:
        index = g_icount
        unique = '%i_%i' % (os.getpid(), index)
        g_icount = index + 1

    return unique

35 

36 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.

    :returns: The object returned by ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task

39 

40 

# Text snippets which are filled into method docstrings by the ``filldocs``
# decorator via ``%(name)s`` placeholders:
#
#   query_args    shared description of the common query arguments
#   file_formats  comma separated list of the format names reported by
#                 ``io.supported_formats()``, each quoted for reST output
doc_snippets = dict(
    query_args='''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            timestamp

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    file_formats=', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()))

74 

75 

def filldocs(meth):
    '''
    Decorator filling the shared documentation snippets into a docstring.

    The docstring of ``meth`` is %-formatted with ``doc_snippets``, so it may
    contain placeholders like ``%(query_args)s`` or ``%(file_formats)s``.

    :param meth:
        Function or method whose docstring should be completed.

    :returns: ``meth`` itself, with its ``__doc__`` updated in place.
    '''
    # There is no docstring when the callable is undocumented or when Python
    # runs with -OO. Formatting ``None`` would raise TypeError at import time.
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth

79 

80 

class GeneratorWithLen(object):
    '''
    Iterable wrapper which additionally reports a known number of items.

    Generators do not support :py:func:`len`. This wrapper pairs one with a
    length determined by other means, so consumers can still ask for it.

    :param gen:
        Generator (or any other iterable) providing the items.

    :param length:
        Value to be reported by :py:func:`len`. It is stored as given and not
        checked against the number of items actually produced.
    :type length:
        int
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # iter() hands back a generator unchanged, so wrapped generators
        # behave exactly as before (single pass). For other iterables, such
        # as lists, it provides the iterator object required by the iterator
        # protocol.
        return iter(self.gen)

92 

93 

class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set first, so that __del__ can tell whether a connection has been
        # obtained, even if initialization fails further down.
        self._conn = None

        # Anything which is not a Database instance is handed to
        # get_database() to look up / open the database.
        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            assert isinstance(persistent, str)
            # The name ends up in SQL table names, so it must be validated.
            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # INSERT OR IGNORE affects one row only if the name has not
                # been registered before, i.e. if the selection is new.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Substitutions applied to SQL templates by _sql(). Persistent
            # selections live in the main schema, others in the temp schema.
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        '''
        Clean up when the selection object is garbage collected.

        Runs :py:meth:`_cleanup` and, for non-persistent selections, drops the
        selection's tables. Nothing is done if no connection is held, e.g.
        when initialization failed early or after :py:meth:`delete`.
        '''
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        '''
        Pass an SQL statement to the database's ``_register_table`` method.

        :returns: Whatever the database method returns.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill this selection's names into an SQL template.

        The template is %-formatted with the ``db``, ``file_states`` and
        ``bulkinsert`` names set up during initialization.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Get a transaction from the underlying database.

        ``label`` and ``mode`` are forwarded to the database's
        ``transaction`` method. In this class the result is used as a context
        manager yielding a cursor.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For persistent selections, the name is also removed from the
        ``persistent`` table.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                # self.name is 'psel_' + persistent; strip the prefix.
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self.name[5:],))

        # Without a connection, __del__ will not attempt to delete again.
        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.

        Public entry point for :py:meth:`_delete`.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            If ``True``, report progress through progress tasks.
        :type show_progress:
            bool
        '''

        # NOTE: this docstring is %-formatted by @filldocs, so it must not
        # contain percent signs other than the snippet placeholders.

        if isinstance(paths, str):
            paths = [paths]

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        # NOTE(review): presumably turns ``paths`` into a list if it yields
        # at most 200 items and keeps it iterable otherwise - confirm against
        # util.short_to_list.
        paths = util.short_to_list(200, paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if isinstance(paths, list) and len(paths) <= 200:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Drop entries of these files which were stored with a
                # different kind mask or format; they are re-added below
                # with file state 0.
                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                # Files not yet in the selection enter with file state 0.
                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                            FROM files
                            WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Entries which are not in state 0 were already in the
                # selection: set state 1 to force a modification check.
                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Many paths or a plain iterator: collect the paths in a
                # temporary table and perform the same steps as above with
                # one statement each.
                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        # Convert to the form in which paths are looked up in the ``files``
        # table.
        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        # ``trans`` maps a stored path to the path which is yielded.
        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to use. If not given, a new one is started.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Paths of the files to be flagged. If ``None``, all files in the
            selection are flagged.

        :param transaction:
            Transaction to use. If not given, a new one is started.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                # Same path normalization as in remove().
                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.
        '''

        # File state 0 marks unknown or modified files (see flag_modified).
        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        # Number of files is queried up front, so that the returned object
        # can report its length.
        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])
                # Rows are ordered by file, so a change of path means that
                # the group of the previous file is complete.
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # values[2] is files.format; rows where it is NULL do not
                # contribute a nut (presumably files which have not been
                # indexed yet - TODO confirm).
                if values[2] is not None:
                    # Absolute path followed by the remaining columns,
                    # starting with files.format.
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            # Emit the group of the last file.
            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files flagged for checking which have no modification time
            # stored in the ``files`` table are treated as unknown.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                        files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # No checking on disk wanted: the remaining files flagged for
                # checking are accepted as known.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                # Yields (new_file_state, file_id) pairs, matching the
                # placeholders of the UPDATE statement below.
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats could not be obtained: flag with state 0.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # No pair is yielded, the file state stays as it is.
                        continue

                    # The backend's stats are compared against the stored
                    # (mtime, size) pair.
                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())

704 

705 

# Names exported by ``from ... import *``.
__all__ = [
    'Selection',
]