1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import os 

7import re 

8import threading 

9import logging 

10 

11from pyrocko import util 

12from pyrocko.io.io_common import FileLoadError 

13from pyrocko.progress import progress 

14 

15from . import error, io, model 

16from .database import Database, get_database, execute_get1, abspath 

17 

# Logger used for progress tasks created through make_task().
logger = logging.getLogger('psq.selection')

# Running counter used by make_unique_name(); only modified while holding
# g_lock.
g_icount = 0
g_lock = threading.Lock()

# Valid names for persistent selections: an identifier-like string of at most
# 65 characters. The pattern is anchored with \Z rather than $ because $ also
# matches just before a trailing newline, which would let 'name\n' through and
# into the SQL table names derived from it.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}\Z')

24 

25 

def make_unique_name():
    '''
    Create a name which is unique within the running process.

    The name is composed of the process id and a running counter, formatted
    as ``'<pid>_<counter>'``. The counter is read and advanced while holding
    the module lock.

    :returns: :py:class:`str`
    '''
    global g_icount

    with g_lock:
        serial = g_icount
        g_icount = serial + 1

    return '%i_%i' % (os.getpid(), serial)

33 

34 

def make_task(*args):
    '''
    Create a progress task, passing this module's logger along.

    All positional arguments are handed through unchanged to
    ``progress.task``.
    '''
    task_options = dict(logger=logger)
    return progress.task(*args, **task_options)

37 

38 

# Text fragments which are substituted into method docstrings by the
# filldocs() decorator, using the %(name)s placeholder syntax.
doc_snippets = dict(
    query_args='''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            timestamp

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    # Comma separated list of the format names reported by the io module,
    # each one quoted for display in reST.
    file_formats=', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()))

72 

73 

def filldocs(meth):
    '''
    Decorator filling the ``%(name)s`` placeholders of a docstring.

    The placeholders in the docstring of ``meth`` are substituted with the
    entries of the module level ``doc_snippets`` dictionary.

    :param meth:
        Function or method whose docstring should be completed.

    :returns: The same object ``meth``, with its docstring updated in place.
    '''
    # There is no docstring to fill when Python runs with -OO (docstrings
    # stripped) or when the function simply has none; applying %= to None
    # would raise TypeError at import time.
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth

77 

78 

class GeneratorWithLen(object):

    '''
    Iterable wrapper which reports an externally supplied length.

    :param gen:
        Generator, iterator or other iterable delivering the items.

    :param length:
        Value to be returned by :py:func:`len`. It is not checked against
        the number of items actually delivered by ``gen``.
    :type length:
        int
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # __iter__ must return an iterator. iter() hands back generators and
        # iterators unchanged, and also makes plain iterables (e.g. lists)
        # work instead of raising TypeError.
        return iter(self.gen)

90 

91 

class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set before anything can fail, so that __del__ finds the attribute
        # and skips its cleanup when initialization did not get far enough.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            # Explicit check instead of an assert: asserts are stripped when
            # Python runs with -O.
            if not isinstance(persistent, str):
                raise error.SquirrelError(
                    'persistent selection name must be a string, got: %r'
                    % (persistent,))

            # fullmatch() also rejects a trailing newline, which a pattern
            # ending in $ would accept when used with match(). The name ends
            # up in SQL table names, so it must be strictly identifier-like.
            if not re_persistent_name.fullmatch(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # INSERT OR IGNORE inserts a row only if the name is not yet
                # registered, so rowcount tells whether the selection is new.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Substitution values for the %(...)s placeholders in the SQL
            # statements, see _sql().
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        # Only act if a connection is still held, i.e. initialization got
        # far enough and the selection has not been deleted already.
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        '''
        Pass a ``CREATE TABLE`` statement to the database's table registry.

        :returns: Whatever the database's ``_register_table`` returns; the
            result is handed on to ``cursor.execute``.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill the selection specific names into an SQL statement template.

        Available placeholders are ``db``, ``file_states`` and
        ``bulkinsert``.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the underlying database.

        :param label:
            Label handed on to the database's ``transaction`` method.
        :type label:
            str

        :param mode:
            Transaction mode handed on to the database's ``transaction``
            method.
        :type mode:
            str

        :returns: Context manager; in this class it is used as
            ``with self.transaction(...) as cursor``.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its entry in the ``persistent`` table is
        removed as well. Afterwards the connection reference is dropped, so
        that ``__del__`` does not attempt the deletion a second time.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                # self.name is 'psel_' + self._persistent for persistent
                # selections; use the plain name as it was registered.
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.

        This also applies to persistent selections, whose registration in
        the database is removed.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            If ``True``, report progress through progress tasks.
        :type show_progress:
            bool

        Files which are already in the selection but with a different
        ``kind_mask`` or ``format`` are re-added with the new settings. Files
        which are added again with unchanged settings get their file state
        set to 1 (force check), unless their state is 0.
        '''

        if isinstance(paths, str):
            paths = [paths]

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        paths = util.short_to_list(200, paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if isinstance(paths, list) and len(paths) <= 200:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Drop entries whose stored kind mask or format differs from
                # the requested one; they are re-inserted below.
                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                # New entries start with file state 0; existing entries are
                # left alone by OR IGNORE.
                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                            FROM files
                            WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Entries not in state 0 are flagged as 1 (force check).
                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Many paths or an iterator: collect the paths in a temporary
                # table first and process them with set based statements.

                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            # Convert to the form in which paths are stored in the database.
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to use. If not given, a new one is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Restrict the change to the given file paths. By default, the
            state of all files in the selection is changed.
        :type paths:
            iterable of :py:class:`str`

        :param transaction:
            Transaction to use. If not given, a new one is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    # Convert to the form in which paths are stored in the
                    # database.
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.

        The returned object also supports :py:func:`len`, giving the number
        of files selected by the query.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        # Counted up front so that the result can report its length.
        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
            ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])

                # Rows are ordered by file, so a change of path marks the
                # end of the previous file's group.
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # files.format is NULL when no file information has been
                # stored yet; such files are yielded with an empty nut list.
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            # Emit the group of the last file.
            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files in state 1 without a stored modification time are
            # unknown: set them to state 0.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                      files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # Without checking, all remaining state 1 files are
                # considered known and unmodified.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats could not be obtained: treat as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # No backend for the stored format: leave the file
                        # state untouched.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())

702 

703 

# Names exported by ``from <this module> import *``.
__all__ = [
    'Selection',
]