Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/database.py: 89%

399 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Database interface code. 

8''' 

9 

10import sys 

11import os 

12import logging 

13import sqlite3 

14import re 

15import time 

16 

17from pyrocko.io.io_common import FileLoadError 

18from pyrocko import util 

19from pyrocko.guts import Object, Int, List, Dict, Tuple, String 

20from . import error, io 

21from .model import Nut, to_kind_id, to_kind, to_codes_simple, \ 

22 codes_patterns_for_kind 

23from .error import SquirrelError 

24 

# Module-level logger used by everything in this file.
logger = logging.getLogger('psq.database')

# NOTE(review): presumably the namespace prefix picked up by pyrocko.guts for
# Object subclasses declared in this module (e.g. DatabaseStats) - confirm.
guts_prefix = 'squirrel'

28 

29 

def abspath(path):
    '''
    Make a file path absolute, leaving special pseudo-paths untouched.

    Paths starting with ``'virtual:'`` or ``'client:'`` are returned
    unchanged. Every other path is passed through :py:func:`os.path.abspath`.

    :param path: File path or pseudo-path.
    :type path: str

    :returns: Absolute path, or the unmodified pseudo-path.
    :rtype: str
    '''
    # str.startswith accepts a tuple of prefixes: one call covers both cases.
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)

35 

36 

def versiontuple(s):
    '''
    Convert a dotted version string into a tuple of integers.

    Strings with fewer than three components are padded with zeros, so that
    ``'1.0'`` becomes ``(1, 0, 0)``. Strings with more than three components
    are returned with all their components.

    :param s: Version string with dot-separated integer components.
    :type s: str

    :returns: Version as tuple of at least three integers.
    :rtype: tuple of int
    '''
    parts = [int(token) for token in s.split('.')]
    padding = [0] * max(0, 3 - len(parts))
    return tuple(parts + padding)

42 

43 

class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly one
    row.
    '''

46 

47 

def execute_get1(connection, sql, args=()):
    '''
    Run a query which is expected to produce exactly one row.

    :param connection: Object providing ``execute(sql, args)``, e.g. a
        :py:mod:`sqlite3` connection or cursor.
    :param sql: SQL statement to execute.
    :type sql: str
    :param args: Values bound to the placeholders in ``sql``.

    :returns: The single result row.

    :raises ExecuteGet1Error: If the query yields no row or more than one
        row.
    '''
    results = list(connection.execute(sql, args))
    if len(results) != 1:
        raise ExecuteGet1Error('Expected database entry not found.')

    return results[0]

54 

55 

# Registry of open Database instances, keyed by absolute database path. Filled
# by get_database() and pruned by close_database().
g_databases = {}

57 

58 

def get_database(path):
    '''
    Get the shared :py:class:`Database` instance for a database file.

    Instances are cached in the module-level registry ``g_databases`` under
    the absolute form of ``path``; a new instance is only created on the
    first request for a given path.

    :param path: Path of the database file.
    :type path: str

    :returns: Cached or newly created database object.
    '''
    key = os.path.abspath(path)
    try:
        return g_databases[key]
    except KeyError:
        pass

    g_databases[key] = Database(key)
    return g_databases[key]

65 

66 

def close_database(database):
    '''
    Close a database's connection and drop it from the shared registry.

    The connection is closed first. Afterwards, the entry stored in
    ``g_databases`` under the absolute database path is removed, if present.

    :param database: Database object providing ``_database_path`` and
        ``_conn``.
    '''
    key = os.path.abspath(database._database_path)
    database._conn.close()
    g_databases.pop(key, None)

72 

73 

class Transaction(object):
    '''
    Re-entrant SQLite transaction usable as a context manager.

    The transaction keeps a nesting depth: only the outermost
    :py:meth:`begin` issues ``BEGIN`` and only the matching outermost
    :py:meth:`commit` or :py:meth:`rollback` issues ``COMMIT`` or
    ``ROLLBACK``. A rollback requested at an inner level is remembered and
    executed when the outermost level is left.

    :param conn: Database connection; a cursor is created from it.
    :param label: Name of the transaction, used in log messages only.
    :type label: str
    :param mode: SQLite transaction mode: ``'deferred'``, ``'immediate'`` or
        ``'exclusive'``.
    :type mode: str
    :param retry_interval: Time in [s] to wait before retrying ``BEGIN`` while
        the database is locked.
    :type retry_interval: float
    :param callback: If given, called as ``callback('modified', nchanges)``
        after a successful outermost commit which changed at least one row.
    '''

    def __init__(
            self, conn,
            label='',
            mode='immediate',
            retry_interval=0.1,
            callback=None):

        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.depth = 0
        self.rollback_wanted = False
        self.retry_interval = retry_interval
        self.callback = callback
        self.label = label
        self.started = False
        # Value of the connection's total_changes counter at BEGIN. Defined
        # here so that the None check in commit() is meaningful.
        self.total_changes_begin = None

    def begin(self):
        '''
        Enter the transaction, starting it in the database at depth zero.

        While the database reports ``'database is locked'``, ``BEGIN`` is
        retried indefinitely, sleeping ``retry_interval`` seconds between
        attempts. Any other :py:exc:`sqlite3.OperationalError` is re-raised.
        '''
        if self.depth == 0:
            tries = 0
            while True:
                try:
                    tries += 1
                    self.cursor.execute('BEGIN %s' % self.mode.upper())
                    self.started = True
                    logger.debug(
                        'Transaction started: %-30s (pid: %s, mode: %s)',
                        self.label, os.getpid(), self.mode)

                    self.total_changes_begin \
                        = self.cursor.connection.total_changes
                    break

                except sqlite3.OperationalError as e:
                    if not str(e) == 'database is locked':
                        raise

                    logger.info(
                        'Database is locked retrying in %s s: %s '
                        '(pid: %s, tries: %i)',
                        self.retry_interval, self.label,
                        os.getpid(), tries)

                    time.sleep(self.retry_interval)

        self.depth += 1

    def commit(self):
        '''
        Leave one nesting level, committing when the outermost one is left.

        If an inner level has requested a rollback, ``ROLLBACK`` is executed
        instead of ``COMMIT``. After a successful commit which changed rows,
        the callback is invoked with ``('modified', nchanges)``.

        :raises Exception: If no transaction has been started.
        '''
        if not self.started:
            raise Exception(
                'Trying to commit without having started a transaction.')

        self.depth -= 1
        if self.depth == 0:
            if not self.rollback_wanted:
                self.cursor.execute('COMMIT')
                self.started = False
                if self.total_changes_begin is not None:
                    total_changes = self.cursor.connection.total_changes \
                        - self.total_changes_begin
                else:
                    total_changes = None

                if self.callback is not None and total_changes:
                    self.callback('modified', total_changes)

                logger.debug(
                    'Transaction completed: %-30s '
                    '(pid: %s, changes: %i)',
                    self.label, os.getpid(), total_changes or 0)

            else:
                self.cursor.execute('ROLLBACK')
                self.started = False
                logger.warning('Deferred rollback executed.')
                logger.debug(
                    'Transaction failed: %-30s (pid: %s)',
                    self.label, os.getpid())
                self.rollback_wanted = False

    def rollback(self):
        '''
        Leave one nesting level, discarding the transaction's changes.

        At the outermost level ``ROLLBACK`` is executed immediately. At an
        inner level the rollback is only scheduled; it is carried out when
        the outermost level is left.

        :raises Exception: If no transaction has been started.
        '''
        if not self.started:
            raise Exception(
                'Trying to rollback without having started a transaction.')

        self.depth -= 1
        if self.depth == 0:
            self.cursor.execute('ROLLBACK')
            self.started = False

            logger.debug(
                'Transaction failed: %-30s (pid: %s)',
                self.label, os.getpid())

            self.rollback_wanted = False
        else:
            logger.warning('Deferred rollback scheduled.')
            self.rollback_wanted = True

    def close(self):
        '''
        Close the cursor owned by this transaction.
        '''
        self.cursor.close()

    def __enter__(self):
        self.begin()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()
        finally:
            # Release the cursor and the callback reference even if COMMIT or
            # ROLLBACK failed; otherwise both would stay alive with the
            # transaction object.
            if self.depth == 0:
                self.close()
                self.callback = None

190 

191 

class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    Wraps an SQLite connection holding the tables ``settings``, ``files``,
    ``nuts``, ``kind_codes``, ``kind_codes_count`` and ``persistent``.

    :param database_path: Path of the database file, or ``':memory:'``.
    :type database_path: str
    :param log_statements: If ``True``, every SQL statement executed on the
        connection is logged at debug level.
    :type log_statements: bool
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        self._database_path = database_path
        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug(
                'Opening connection to database (threadsafety: %i): %s',
                sqlite3.threadsafety,
                database_path)

            self._conn = sqlite3.connect(
                database_path,
                isolation_level=None,
                check_same_thread=not sqlite3.threadsafety)

        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.text_factory = str
        self._tables = {}

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        self._listeners = []
        self._basepath = None

        # Must be initialised before _initialize_db() runs: that method
        # stores the version found in (or written to) the database.
        self.version = None
        self._initialize_db()

    def set_basepath(self, basepath):
        '''
        Set or unset the directory against which stored paths are relative.

        :param basepath: Directory path (made absolute), or ``None`` to
            store and return paths unmodified.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Strip the base path from ``path`` if ``path`` lies below it.

        Returns ``path`` unchanged if no base path is set or ``path`` does
        not start with the base path followed by a path separator.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Inverse of :py:meth:`relpath`: prepend base path to relative paths.

        Paths starting with ``'virtual:'`` or ``'client:'``, absolute paths,
        and all paths when no base path is set are returned unchanged.
        '''
        if self._basepath is not None \
                and not path.startswith(('virtual:', 'client:')) \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        logger.debug(statement)

    def get_connection(self):
        '''
        Get the underlying :py:mod:`sqlite3` connection.
        '''
        return self._conn

    def transaction(self, label='', mode='immediate'):
        '''
        Create a :py:class:`Transaction` on this database's connection.

        Listeners registered with :py:meth:`add_listener` are notified with
        event ``'modified'`` when the transaction commits changes.
        '''
        return Transaction(
            self._conn,
            label=label,
            mode=mode,
            callback=self._notify_listeners)

    def add_listener(self, listener):
        '''
        Register a callable to be notified about database events.

        The listener is held through ``util.smart_weakref``. The returned
        reference object is the handle to pass to
        :py:meth:`remove_listener`.
        '''
        listener_ref = util.smart_weakref(listener)
        self._listeners.append(listener_ref)
        return listener_ref

    def remove_listener(self, listener_ref):
        '''
        Unregister a listener by the reference :py:meth:`add_listener` gave.
        '''
        self._listeners.remove(listener_ref)

    def _notify_listeners(self, event, *args):
        # Call every live listener; references which resolve to None are
        # collected and dropped afterwards, so the list is not modified while
        # iterating over it.
        dead = []
        for listener_ref in self._listeners:
            listener = listener_ref()
            if listener is not None:
                listener(event, *args)
            else:
                dead.append(listener_ref)

        for listener_ref in dead:
            self.remove_listener(listener_ref)

    def _register_table(self, s):
        '''
        Record column names and types of a ``CREATE TABLE`` statement.

        The table name and the first two words of each comma-separated
        column definition are stored in ``self._tables`` (used by
        :py:meth:`print_table`). The statement is returned unchanged so that
        the call can be wrapped around the argument of ``execute``.
        '''
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def _initialize_db(self):
        '''
        Check an existing database's version or create the schema.

        If the tables ``files`` and ``persistent`` both exist, only the
        version stored in ``settings`` is read and checked. Otherwise all
        tables, indices and triggers are created. In both cases
        ``self.version`` is set to a version tuple.

        :raises SquirrelError: If the existing database has no ``settings``
            table (pre-release format) or its version is 1.1.0 or newer.
        '''
        with self.transaction('initialize') as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            if 2 == len(list(
                    cursor.execute(
                        '''
                            SELECT name FROM sqlite_master
                                WHERE type = 'table' AND name IN (
                                    'files',
                                    'persistent')
                        '''))):

                try:
                    # Single quotes: standard SQL string literal. Double
                    # quotes denote identifiers and only work as strings
                    # through a legacy SQLite quirk.
                    self.version = versiontuple(execute_get1(
                        cursor,
                        '''
                            SELECT value FROM settings
                                WHERE key == 'version'
                        ''')[0])
                except sqlite3.OperationalError:
                    raise error.SquirrelError(
                        'Squirrel database in pre-release format found: %s\n'
                        'Please remove the database file and reindex.'
                        % self._database_path)

                if self.version >= (1, 1, 0):
                    raise error.SquirrelError(
                        'Squirrel database "%s" is of version %i.%i.%i which '
                        'is not supported by this version of Pyrocko. Please '
                        'upgrade the Pyrocko library.'
                        % ((self._database_path, ) + self.version))

                return

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS settings (
                        key text PRIMARY KEY,
                        value text)
                '''))

            cursor.execute(
                "INSERT OR IGNORE INTO settings VALUES ('version', '1.0')")

            # Store a version tuple, like the branch for existing databases
            # above, not the raw result row.
            self.version = versiontuple(execute_get1(
                cursor,
                "SELECT value FROM settings WHERE key == 'version'")[0])

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                    ON files (path)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                    ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                    ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''))

            cursor.execute(
                '''
                    CREATE INDEX IF NOT EXISTS index_nuts_file_id
                    ON nuts (file_id)
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                    BEFORE DELETE ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                    BEFORE UPDATE OF size ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                    BEFORE INSERT ON nuts FOR EACH ROW
                    BEGIN
                        INSERT OR IGNORE INTO kind_codes_count
                        VALUES (new.kind_codes_id, 0);
                        UPDATE kind_codes_count
                        SET count = count + 1
                        WHERE new.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                    BEFORE DELETE ON nuts FOR EACH ROW
                    BEGIN
                        UPDATE kind_codes_count
                        SET count = count - 1
                        WHERE old.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''))

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such content
        is part of a live selection, it is also removed there. Then the new
        content meta-information is inserted into the main database. The
        content is not automatically inserted into the live selections again.
        It is in the responsibility of the selection object to perform this
        step.

        :param nuts: Iterable of nut objects to store.
        :param transaction: Transaction to run in. If ``None``, a new one
            labelled ``'dig'`` is used.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add(
                (nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0))

        with (transaction or self.transaction('dig')) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path == ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path == ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id == ? AND codes == ? AND deltat == ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path):
        '''
        Get the stored content meta-information of a single file.

        :param path: File path.
        :type path: str

        :returns: List of nuts stored for the file, empty if there are none.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            WHERE path == ?
        '''

        return [Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
                for row in self._conn.execute(sql, (path,))]

    def undig_all(self):
        '''
        Iterate over the stored content meta-information, grouped by file.

        Consecutive result rows belonging to the same file are collected
        into one group.

        :yields: Tuples ``(path, nuts)`` with the absolute file path and the
            list of nuts of that file.
        '''
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id == nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
        '''

        nuts = []
        path = None
        for values in self._conn.execute(sql):
            # Compare like with like: ``path`` holds the absolute form, so
            # the stored (possibly relative) path must be converted before
            # checking whether a new file has started.
            row_path = self.abspath(values[0])
            if path is not None and row_path != path:
                yield path, nuts
                nuts = []

            path = row_path

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_few(self, paths, format='detect'):
        '''
        Iterate over content meta-information of given files, one by one.

        :param paths: Iterable of file paths.
        :param format: Format name to report for files without stored nuts.

        :yields: Tuples ``((format, path), nuts)``. For files with stored
            nuts, ``format`` is the file format of the first nut.
        '''
        for path in paths:
            nuts = self.undig(path)
            if nuts:
                yield (nuts[0].file_format, path), nuts
            else:
                yield (format, path), []

    def undig_many(self, paths, show_progress=True):
        '''
        Iterate over content meta-information of given files.

        Uses a temporary selection created with :py:meth:`new_selection`.

        :yields: Tuples ``(path, nuts)``.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, format='detect', show_progress=True):
        '''
        Create a new selection attached to this database.

        :param paths: If given and non-empty, added to the new selection.
        :param format: Passed on to the selection's ``add``.
        :param show_progress: Passed on to the selection's ``add``.

        :returns: The new selection object.
        '''
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, format=format, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        '''
        Placeholder, always returns ``None``.
        '''
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction('remove file') as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` are removed from the main database and any attached live
        selections (via database triggers).

        :returns: Row count reported for the ``DELETE`` statement.
        '''

        with self.transaction('remove file glob') as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.

        :returns: Row count reported for the ``DELETE`` statement.
        '''

        with self.transaction('remove volatile') as cursor:
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).

        :param path: File path.
        :param transaction: Transaction to run in. If ``None``, a new one
            labelled ``'reset file'`` is used.
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction('reset file')) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        The file is touched through its format's IO backend, then the stored
        modification time is set to the backend-reported one.

        :raises ExecuteGet1Error: If the file is not known to the database.
        :raises FileLoadError: If the file size reported by the backend
            differs from the stored size.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction('silent touch') as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            # file_stats[0] is used as mtime, file_stats[1] as size.
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_codes_info(
            self, kind=None, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over kind/codes combinations with a positive nut count.

        :param kind: If given, restrict to this content kind.
        :param codes: If given, restrict to codes matching these patterns;
            requires ``kind``.
        :param kind_codes_count: Name of the count table to query. It is
            interpolated into the SQL text, so only trusted names may be
            passed.

        :yields: Tuples ``(kind_id, codes, deltat, kind_codes_id, count)``.
        '''

        args = []
        sel = ''
        if kind is not None:
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        if codes is not None:
            assert kind is not None  # TODO supp by recursing possible kinds
            kind_id = to_kind_id(kind)
            pats = codes_patterns_for_kind(kind_id, codes)

            if pats:
                # could optimize this by using IN for non-patterns
                sel += ' AND ( %s ) ' % ' OR '.join(
                    ('kind_codes.codes GLOB ?',) * len(pats))

                args.extend(pat.safe_str for pat in pats)

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                kind_codes.kind_codes_id,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for kind_id, scodes, deltat, kcid, count in self._conn.execute(
                sql, args):

            yield (
                kind_id, to_codes_simple(kind_id, scodes), deltat, kcid, count)

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over distinct ``deltat`` values with a positive nut count.

        :param kind: If given, restrict to this content kind.
        :param kind_codes_count: Name of the count table to query
            (interpolated into the SQL text, trusted names only).

        :yields: ``deltat`` values in ascending order.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over distinct codes with a positive nut count.

        :param kind: If given, restrict to this content kind.
        :param kind_codes_count: Name of the count table to query
            (interpolated into the SQL text, trusted names only).

        :yields: Codes objects, ordered by their stored string form.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id, kind_codes.codes
                FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes

        ''') % dict(kind_codes_count=kind_codes_count)

        for row in self._conn.execute(sql, args):
            yield to_codes_simple(*row)

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over distinct content kinds with a positive nut count.

        :param codes: If given, restrict to exactly these codes.
        :param kind_codes_count: Name of the count table to query
            (interpolated into the SQL text, trusted names only).

        :yields: Kind names, ordered by kind id.
        '''
        args = []
        sel = ''
        if codes is not None:
            sel = 'AND kind_codes.codes == ?'
            args.append(codes.safe_str)

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Iterate over the absolute paths of all files in the database.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Iterate over ``(path, nnuts)`` for all files in the database.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        '''
        Iterate over available content kinds, see :py:meth:`_iter_kinds`.
        '''
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        '''
        Iterate over available codes, see :py:meth:`_iter_codes`.
        '''
        return self._iter_codes(kind=kind)

    def get_paths(self):
        '''
        Get list with the absolute paths of all files in the database.
        '''
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        '''
        Get list of available content kinds.
        '''
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        '''
        Get list of available codes.
        '''
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get number of nuts, broken down by content kind and codes.

        :param kind: If given, only consider this content kind.

        :returns: If ``kind`` is given, a dict mapping codes to counts (empty
            if nothing of that kind is available). Otherwise a dict mapping
            kind names to such dicts.
        '''
        d = {}
        for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
            if kind_id not in d:
                v = d[kind_id] = {}
            else:
                v = d[kind_id]

            if codes not in v:
                v[codes] = 0

            v[codes] += count

        if kind is not None:
            # No matching content is not an error: report it as "no counts"
            # rather than failing with a KeyError.
            return d.get(to_kind_id(kind), {})
        else:
            return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())

    def get_nfiles(self):
        '''
        Get number of files known to the database.
        '''
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        '''
        Get number of nuts stored in the database.
        '''
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        '''
        Get list of ``(path, nnuts)`` for all files in the database.
        '''
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get sum of the stored sizes of all files, ``0`` if there are none.
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            return row[0] or 0

    def get_persistent_names(self):
        '''
        Get list with the names stored in table ``persistent``.
        '''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def get_stats(self):
        '''
        Get statistics on contents in database.

        :returns: :py:class:`DatabaseStats` object.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Print all main tables of the database, see :py:meth:`print_table`.
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Print the contents of a table as aligned columns.

        :param name: Table name. It is interpolated into the SQL text, so
            only trusted names may be passed.
        :param stream: Object with a ``write`` method; defaults to
            ``sys.stdout``.
        '''

        if stream is None:
            stream = sys.stdout

        # String whose repr() has no quotes, so that header cells are printed
        # bare while data cells are printed with repr().
        class hstr(str):
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            # Header: rule, column names, column types, rule. Rows of None
            # are rendered as dashes below.
            headers = self._tables[name]
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append([x for x in row])

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')

964 

965 

class DatabaseStats(Object):
    '''
    Summary of the contents cached in the meta-information database.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size [bytes] of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts per content kind, summed over all codes.
        nuts_per_kind = {
            kind: sum(self.counts[kind].values()) for kind in self.kinds}

        code_strings = [c.safe_str for c in self.codes]

        # Long code lists are abbreviated to the first and last ten entries.
        if len(code_strings) > 20:
            head = util.ewrap(code_strings[:10], indent=' ')
            tail = util.ewrap(code_strings[-10:], indent=' ')
            nskipped = len(code_strings) - 20
            scodes = '\n' + head + '\n [%i more]\n' % nskipped + tail
        elif code_strings:
            scodes = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            scodes = '<none>'

        kinds_summary = ', '.join(
            '%s: %i' % (kind, nuts_per_kind[kind])
            for kind in sorted(self.kinds))

        persistent_summary = ', '.join(self.persistent)

        return '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s''' % (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            kinds_summary,
            persistent_summary)

1022 

1023 

# Names exported by ``from <this module> import *``.
__all__ = [
    'Database',
    'DatabaseStats',
]