1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import sys 

9import os 

10import logging 

11import sqlite3 

12import re 

13import time 

14import types 

15import weakref 

16 

17from pyrocko.io.io_common import FileLoadError 

18from pyrocko import util 

19from pyrocko.guts import Object, Int, List, Dict, Tuple, String 

20from . import error, io 

21from .model import Nut, to_kind_id, to_kind, to_codes_simple 

22from .error import SquirrelError 

23 

# Module-level prefix, presumably used by pyrocko.guts to namespace the
# serialized type names of the guts Objects defined in this module.
guts_prefix = 'squirrel'

# Logger shared by all database-related code in this module.
logger = logging.getLogger('psq.database')

27 

28 

def abspath(path):
    '''
    Make a file system path absolute, leaving pseudo paths untouched.

    Paths starting with ``virtual:`` or ``client:`` do not refer to real files
    and are returned as they are; everything else goes through
    ``os.path.abspath``.
    '''
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)

34 

35 

def versiontuple(s):
    '''
    Convert a dotted version string into a tuple of integers.

    Versions with fewer than three components are padded with zeros, so
    ``'1.0'`` becomes ``(1, 0, 0)``. Additional components are kept, e.g.
    ``'1.2.3.4'`` gives ``(1, 2, 3, 4)``.
    '''
    parts = list(map(int, s.split('.')))
    # A negative repeat count yields an empty list: nothing is padded then.
    return tuple(parts + [0] * (3 - len(parts)))

41 

42 

class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly one
    row.
    '''

45 

46 

def execute_get1(connection, sql, args=()):
    '''
    Execute a query which is expected to return exactly one row.

    :param connection: ``sqlite3.Connection`` or ``sqlite3.Cursor`` to run
        the query on.
    :param sql: SQL statement.
    :param args: Parameters bound to the statement's placeholders.
    :returns: The single result row (a tuple).
    :raises: :py:exc:`ExecuteGet1Error` if the query returns no row or more
        than one row.
    '''
    # All rows are fetched so that no statement is left half-consumed on the
    # connection/cursor.
    rows = list(connection.execute(sql, args))
    if len(rows) == 1:
        return rows[0]
    elif not rows:
        raise ExecuteGet1Error('Expected database entry not found.')
    else:
        # Previously reported as "not found", which was misleading.
        raise ExecuteGet1Error(
            'Expected exactly one database entry but found %i.' % len(rows))

53 

54 

# Cache of shared Database instances, keyed by absolute database file path.
g_databases = {}


def get_database(path):
    '''
    Get the shared :py:class:`Database` instance for database file ``path``.

    Instances are cached per absolute path, so repeated calls with equivalent
    paths return the same object (and thus share its connection).
    '''
    path = os.path.abspath(path)
    if path not in g_databases:
        g_databases[path] = Database(path)

    return g_databases[path]


def close_database(database):
    '''
    Close the connection of ``database`` and drop it from the cache.

    The cache entry for the database's path is removed only if it refers to
    this very instance. Closing an independently created
    :py:class:`Database` therefore does not evict a different, still open
    instance registered for the same file.
    '''
    path = os.path.abspath(database._database_path)
    database._conn.close()
    if g_databases.get(path) is database:
        del g_databases[path]

71 

72 

class Transaction(object):
    '''
    Re-entrant context manager wrapping an explicit SQLite transaction.

    The SQLite transaction is opened on the first (outermost)
    :py:meth:`begin` and finished when the matching :py:meth:`commit` or
    :py:meth:`rollback` brings the nesting depth back to zero. Entering the
    same instance in nested ``with`` blocks therefore groups all enclosed
    statements into one SQLite transaction. A rollback requested at an inner
    level is deferred and executed instead of the outermost commit.

    :param conn: ``sqlite3.Connection`` to operate on; a dedicated cursor is
        created from it. NOTE(review): ``BEGIN``/``COMMIT``/``ROLLBACK`` are
        issued manually, which assumes the connection is in autocommit mode
        (``isolation_level=None``) - confirm for connections not created by
        :py:class:`Database`.
    :param label: Free-text label used in log messages.
    :param mode: SQLite transaction mode, one of ``'deferred'``,
        ``'immediate'`` or ``'exclusive'``.
    :param retry_interval: Seconds to sleep between attempts while ``BEGIN``
        fails with ``database is locked``.
    :param callback: Optional callable, invoked as
        ``callback('modified', n)`` after a successful outermost commit during
        which the connection's ``total_changes`` counter increased by
        ``n != 0``.
    '''

    def __init__(
            self, conn,
            label='',
            mode='immediate',
            retry_interval=0.1,
            callback=None):

        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        # Number of begin() calls not yet matched by commit()/rollback().
        self.depth = 0
        # Set when rollback() is called at an inner nesting level; the
        # outermost commit() then rolls back instead of committing.
        self.rollback_wanted = False
        self.retry_interval = retry_interval
        self.callback = callback
        self.label = label
        # True while the SQLite transaction is open.
        self.started = False

    def begin(self):
        '''
        Enter one nesting level, opening the SQLite transaction if outermost.

        While ``BEGIN`` fails with ``database is locked``, it is retried
        indefinitely every ``retry_interval`` seconds. Any other
        ``sqlite3.OperationalError`` is re-raised.
        '''
        if self.depth == 0:
            tries = 0
            while True:
                try:
                    tries += 1
                    self.cursor.execute('BEGIN %s' % self.mode.upper())
                    self.started = True
                    logger.debug(
                        'Transaction started: %-30s (pid: %s, mode: %s)'
                        % (self.label, os.getpid(), self.mode))

                    # Baseline for the change count reported on commit.
                    self.total_changes_begin \
                        = self.cursor.connection.total_changes
                    break

                except sqlite3.OperationalError as e:
                    if not str(e) == 'database is locked':
                        raise

                    logger.info(
                        'Database is locked retrying in %s s: %s '
                        '(pid: %s, tries: %i)' % (
                            self.retry_interval, self.label,
                            os.getpid(), tries))

                    time.sleep(self.retry_interval)

        self.depth += 1

    def commit(self):
        '''
        Leave one nesting level; commit when the outermost level is left.

        If a rollback was requested at an inner level, the outermost call
        executes ``ROLLBACK`` instead (logged as a warning). After a real
        commit, the callback is notified about the number of changes, if any.

        :raises: ``Exception`` if no transaction has been started.
        '''
        if not self.started:
            raise Exception(
                'Trying to commit without having started a transaction.')

        self.depth -= 1
        if self.depth == 0:
            if not self.rollback_wanted:
                self.cursor.execute('COMMIT')
                self.started = False
                if self.total_changes_begin is not None:
                    total_changes = self.cursor.connection.total_changes \
                        - self.total_changes_begin
                else:
                    total_changes = None

                if self.callback is not None and total_changes:
                    self.callback('modified', total_changes)

                logger.debug(
                    'Transaction completed: %-30s '
                    '(pid: %s, changes: %i)' % (
                        self.label, os.getpid(), total_changes or 0))

            else:
                # An inner level asked for a rollback earlier.
                self.cursor.execute('ROLLBACK')
                self.started = False
                logger.warning('Deferred rollback executed.')
                logger.debug(
                    'Transaction failed: %-30s (pid: %s)' % (
                        self.label, os.getpid()))
                self.rollback_wanted = False

    def rollback(self):
        '''
        Leave one nesting level, rolling back.

        At the outermost level ``ROLLBACK`` is executed immediately; at inner
        levels the rollback is only scheduled and carried out by the
        outermost :py:meth:`commit` or :py:meth:`rollback`.

        :raises: ``Exception`` if no transaction has been started.
        '''
        if not self.started:
            raise Exception(
                'Trying to rollback without having started a transaction.')

        self.depth -= 1
        if self.depth == 0:
            self.cursor.execute('ROLLBACK')
            self.started = False

            logger.debug(
                'Transaction failed: %-30s (pid: %s)' % (
                    self.label, os.getpid()))

            self.rollback_wanted = False
        else:
            logger.warning('Deferred rollback scheduled.')
            self.rollback_wanted = True

    def close(self):
        '''Close the transaction's cursor.'''
        self.cursor.close()

    def __enter__(self):
        # Returns the cursor, so ``with db.transaction() as cursor:`` works.
        self.begin()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        # Commit on normal exit, roll back if an exception is propagating.
        # Exceptions are not suppressed (implicitly returns None).
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

        if self.depth == 0:
            # Outermost level left: release the cursor and drop the callback
            # reference.
            self.close()
            self.callback = None

189 

190 

class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    Wraps a SQLite connection opened with ``isolation_level=None`` (autocommit
    mode); transactions are managed explicitly through :py:meth:`transaction`.
    The schema holds the indexed files (``files``), the content pieces found
    in them (``nuts``), unique ``(kind, codes, deltat)`` combinations
    (``kind_codes``) and trigger-maintained reference counts for these
    (``kind_codes_count``).

    :param database_path:
        Path to the SQLite database file or ``':memory:'`` for an in-memory
        database. For file paths ``util.ensuredirs`` is called first
        (presumably creating missing parent directories).
    :param log_statements:
        If ``True``, every executed SQL statement is logged at debug level.

    :raises: :py:exc:`~pyrocko.squirrel.error.SquirrelError` if the database
        cannot be opened or is in an unsupported format.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        self._database_path = database_path
        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug('Opening connection to database: %s' % database_path)
            self._conn = sqlite3.connect(database_path, isolation_level=None)
        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.text_factory = str
        self._tables = {}

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        self._listeners = []
        self._basepath = None

        # Must be initialized *before* _initialize_db(), which sets the actual
        # schema version. It used to be reset to None afterwards, discarding
        # the value just read.
        self.version = None
        self._initialize_db()

    def set_basepath(self, basepath):
        '''
        Set the directory relative to which file paths are stored.

        Paths below ``basepath`` are stored relative to it (see
        :py:meth:`relpath`) and converted back on retrieval (see
        :py:meth:`abspath`). Passing ``None`` disables this.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Strip the base path prefix from ``path`` if it lies below it.

        Returns ``path`` unchanged if no base path is set or ``path`` is not
        located below it.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Prepend the base path to a relative, non-virtual ``path``.

        ``virtual:`` and ``client:`` paths and absolute paths are returned
        unchanged, as is every path when no base path is set.
        '''
        if self._basepath is not None and not path.startswith('virtual:') \
                and not path.startswith('client:') \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        # SQLite trace callback, installed when ``log_statements`` is set.
        logger.debug(statement)

    def get_connection(self):
        '''Get the underlying ``sqlite3.Connection``.'''
        return self._conn

    def transaction(self, label='', mode='immediate'):
        '''
        Create a new :py:class:`Transaction` on this database's connection.

        Committed changes are reported to the registered listeners as a
        ``'modified'`` event.

        :param label: Label used in log messages.
        :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``.
        '''
        return Transaction(
            self._conn,
            label=label,
            mode=mode,
            callback=self._notify_listeners)

    def add_listener(self, listener):
        '''
        Register ``listener`` to be called as ``listener(event, *args)``.

        Only a weak reference is kept (``weakref.WeakMethod`` for bound
        methods, ``weakref.ref`` otherwise), so registration does not keep the
        listener alive.

        :returns: The weak reference, to be passed to
            :py:meth:`remove_listener`.
        '''
        if isinstance(listener, types.MethodType):
            listener_ref = weakref.WeakMethod(listener)
        else:
            listener_ref = weakref.ref(listener)

        self._listeners.append(listener_ref)
        return listener_ref

    def remove_listener(self, listener_ref):
        '''
        Unregister a listener by the reference :py:meth:`add_listener`
        returned.
        '''
        self._listeners.remove(listener_ref)

    def _notify_listeners(self, event, *args):
        # Call every live listener; drop references whose target is gone.
        dead = []
        for listener_ref in self._listeners:
            listener = listener_ref()
            if listener is not None:
                listener(event, *args)
            else:
                dead.append(listener_ref)

        for listener_ref in dead:
            self.remove_listener(listener_ref)

    def _register_table(self, s):
        '''
        Record column names and types of a ``CREATE TABLE`` statement.

        The parsed header is used by :py:meth:`print_table`. ``s`` is returned
        unchanged so that the call can wrap the statement passed to
        ``execute``.
        '''
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def _initialize_db(self):
        '''
        Configure the connection and create or validate the schema.

        If the ``files`` and ``persistent`` tables already exist, only the
        stored schema version is read and checked. Otherwise all tables,
        indices and triggers are created (``IF NOT EXISTS``) and the version
        is recorded as ``1.0``. In both cases :py:attr:`version` is set to a
        tuple of integers (see :py:func:`versiontuple`).

        :raises: :py:exc:`~pyrocko.squirrel.error.SquirrelError` if the
            database is in pre-release format or of an unsupported newer
            version.
        '''
        with self.transaction('initialize') as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            if 2 == len(list(
                    cursor.execute(
                        '''
                            SELECT name FROM sqlite_master
                                WHERE type = 'table' AND name IN (
                                    'files',
                                    'persistent')
                        '''))):

                # Existing database: only check its version. String literals
                # use single quotes; double-quoted strings are an SQLite
                # misfeature which may be disabled at build time.
                try:
                    self.version = versiontuple(execute_get1(
                        cursor,
                        '''
                            SELECT value FROM settings
                                WHERE key == 'version'
                        ''')[0])
                except sqlite3.OperationalError:
                    # No settings table: created by a pre-release version.
                    raise error.SquirrelError(
                        'Squirrel database in pre-release format found: %s\n'
                        'Please remove the database file and reindex.'
                        % self._database_path)

                if self.version >= (1, 1, 0):
                    raise error.SquirrelError(
                        'Squirrel database "%s" is of version %i.%i.%i which '
                        'is not supported by this version of Pyrocko. Please '
                        'upgrade the Pyrocko library.'
                        % ((self._database_path, ) + self.version))

                return

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS settings (
                        key text PRIMARY KEY,
                        value text)
                '''))

            cursor.execute(
                "INSERT OR IGNORE INTO settings VALUES ('version', '1.0')")

            # Converted like in the branch above (previously the raw result
            # row was stored here).
            self.version = versiontuple(execute_get1(
                cursor,
                "SELECT value FROM settings WHERE key == 'version'")[0])

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                    ON files (path)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                    ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                    ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''))

            cursor.execute(
                '''
                    CREATE INDEX IF NOT EXISTS index_nuts_file_id
                    ON nuts (file_id)
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                    BEFORE DELETE ON files FOR EACH ROW
                    BEGIN
                        DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                    BEFORE UPDATE OF size ON files FOR EACH ROW
                    BEGIN
                        DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                    BEFORE INSERT ON nuts FOR EACH ROW
                    BEGIN
                        INSERT OR IGNORE INTO kind_codes_count
                        VALUES (new.kind_codes_id, 0);
                        UPDATE kind_codes_count
                        SET count = count + 1
                        WHERE new.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                    BEFORE DELETE ON nuts FOR EACH ROW
                    BEGIN
                        UPDATE kind_codes_count
                        SET count = count - 1
                        WHERE old.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''))

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such
        content is part of a live selection, it is also removed there. Then
        the new content meta-information is inserted into the main database.
        The content is not automatically inserted into the live selections
        again. It is in the responsibility of the selection object to perform
        this step.

        :param nuts: Iterable of :py:class:`~pyrocko.squirrel.model.Nut`.
        :param transaction: Optional :py:class:`Transaction` to join. If not
            given, a new transaction labelled ``'dig'`` is used.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add(
                (nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0))

        with (transaction or self.transaction('dig')) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            # Setting ``size`` triggers delete_nuts_on_update_file, which
            # prunes the previously stored nuts of these files.
            c.executemany(
                '''UPDATE files SET
                        format = ?, mtime = ?, size = ?
                    WHERE path == ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path == ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id == ? AND codes == ? AND deltat == ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path):
        '''
        Get the stored content meta-information of a single file.

        :returns: List of :py:class:`~pyrocko.squirrel.model.Nut`. It is
            empty if the file is unknown or has no indexed content.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            WHERE path == ?
        '''

        return [Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
                for row in self._conn.execute(sql, (path,))]

    def undig_all(self):
        '''
        Iterate over all files with indexed content.

        :yields: ``(path, nuts)`` tuples, with ``path`` made absolute via
            :py:meth:`abspath` and ``nuts`` a list of
            :py:class:`~pyrocko.squirrel.model.Nut`.
        '''
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id == nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
        '''

        # Grouping is done on the path as stored in the database. Comparing
        # it against the absolutized path (as before) never matched when a
        # base path was set, so every nut was yielded as its own group.
        # NOTE(review): rows of a file are assumed to be contiguous; the query
        # has no ORDER BY clause.
        nuts = []
        db_path = None
        apath = None
        for values in self._conn.execute(sql):
            if values[0] != db_path:
                if db_path is not None:
                    yield apath, nuts
                    nuts = []

                db_path = values[0]
                apath = self.abspath(db_path)

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(apath,) + values[1:]))

        if db_path is not None:
            yield apath, nuts

    def undig_few(self, paths, format='detect'):
        '''
        Get stored content of a few files, one query per file.

        :yields: ``((format, path), nuts)``. ``format`` is the stored format
            if content is known, otherwise the ``format`` argument.
        '''
        for path in paths:
            nuts = self.undig(path)
            if nuts:
                yield (nuts[0].file_format, path), nuts
            else:
                yield (format, path), []

    def undig_many(self, paths, show_progress=True):
        '''
        Get stored content of many files, via a temporary selection.

        :yields: ``(path, nuts)`` tuples.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, format='detect', show_progress=True):
        '''
        Create a new :py:class:`~pyrocko.squirrel.selection.Selection` on
        this database, optionally adding ``paths`` to it.
        '''
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, format=format, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        # Placeholder: content itself is not stored in this database.
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database
        triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction('remove file') as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` (SQLite ``GLOB`` syntax) are removed from the main
        database and any attached live selections (via database triggers).
        The pattern is matched against the paths as stored, i.e. relative to
        the base path, if one is set.

        :returns: Number of removed file entries.
        '''

        with self.transaction('remove file glob') as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g.
        due to a crash or terminated process, volatile content will not be
        removed properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.

        :returns: Number of removed file entries.
        '''

        with self.transaction('remove volatile') as cursor:
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file
        path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).

        :param transaction: Optional :py:class:`Transaction` to join.
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction('reset file')) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        :raises: :py:exc:`ExecuteGet1Error` if the file is not in the
            database, :py:exc:`~pyrocko.io.io_common.FileLoadError` if its
            size changed.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction('silent touch') as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            # Only mtime is updated: the reindexing trigger fires on size.
            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_codes_info(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields (kind_id, codes, deltat, kind_codes_id, count) for entries
        # with count > 0. ``kind_codes_count`` is interpolated as a table name
        # and must be trusted.
        args = []
        sel = ''
        if kind is not None:
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                kind_codes.kind_codes_id,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for kind_id, scodes, deltat, kcid, count in self._conn.execute(
                sql, args):

            yield (
                kind_id, to_codes_simple(kind_id, scodes), deltat, kcid, count)

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields distinct sampling intervals of content with count > 0,
        # in ascending order.
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields distinct code objects of content with count > 0, ordered by
        # their string representation.
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id, kind_codes.codes
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes
        ''') % dict(kind_codes_count=kind_codes_count)

        for row in self._conn.execute(sql, args):
            yield to_codes_simple(*row)

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        # Yields distinct content kinds (names) with count > 0, optionally
        # restricted to the given codes.
        args = []
        sel = ''
        if codes is not None:
            sel = 'AND kind_codes.codes == ?'
            args.append(codes.safe_str)

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''Iterate over the (absolute) paths of all known files.'''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''Iterate over ``(path, number_of_nuts)`` for all known files.'''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        '''Iterate over available content kinds, optionally for ``codes``.'''
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        '''Iterate over available code sequences, optionally for ``kind``.'''
        return self._iter_codes(kind=kind)

    def get_paths(self):
        '''Get list of the (absolute) paths of all known files.'''
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        '''Get list of available content kinds, optionally for ``codes``.'''
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        '''Get list of available code sequences, optionally for ``kind``.'''
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get number of nuts per content kind and code sequence.

        :returns: ``{kind: {codes: count}}``, or ``{codes: count}`` if
            ``kind`` is given. For a kind without content an empty dict is
            returned (previously a ``KeyError`` was raised).
        '''
        d = {}
        for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
            v = d.setdefault(kind_id, {})
            # Different deltat values of the same codes are summed up.
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            return d.get(to_kind_id(kind), {})
        else:
            return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())

    def get_nfiles(self):
        '''Get number of known files.'''
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        '''Get total number of nuts.'''
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        '''Get list of ``(path, number_of_nuts)`` for all known files.'''
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''Get summed size of all known files (0 if there are none).'''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            # SUM() of no rows (or only NULLs) is NULL.
            return row[0] or 0

    def get_persistent_names(self):
        '''Get names of the persistent selections stored in the database.'''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def get_stats(self):
        '''Get :py:class:`DatabaseStats` summary of the database content.'''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''Dump the main tables (debugging aid), see :py:meth:`print_table`.'''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Dump all rows of table ``name`` to ``stream`` (default: stdout).

        Column names and types are shown only for tables registered through
        :py:meth:`_register_table`. ``name`` is interpolated into the SQL and
        must be a trusted table name.
        '''

        if stream is None:
            stream = sys.stdout

        class hstr(str):
            # String which is printed without quotes by repr().
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            headers = self._tables[name]
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append([x for x in row])

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            # None entries are rendered as separator dashes.
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')

942 

943 

class DatabaseStats(Object):
    '''
    Summary statistics of the content indexed in a meta-information database.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size [bytes] of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts per content kind, over all code sequences.
        kind_counts = {}
        for kind in self.kinds:
            kind_counts[kind] = sum(self.counts[kind].values())

        code_strs = [codes.safe_str for codes in self.codes]
        ncodes = len(code_strs)

        if not code_strs:
            scodes = '<none>'
        elif ncodes <= 20:
            scodes = '\n' + util.ewrap(code_strs, indent=' ')
        else:
            # Long listings: first ten, number of omitted entries, last ten.
            head = util.ewrap(code_strs[:10], indent=' ')
            tail = util.ewrap(code_strs[-10:], indent=' ')
            scodes = '\n' + head + '\n [%i more]\n' % (ncodes - 20) + tail

        skinds = ', '.join(
            '%s: %i' % (kind, kind_counts[kind])
            for kind in sorted(self.kinds))

        values = (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            ', '.join(self.persistent))

        return '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s''' % values

1001 

# Public API of this module.
__all__ = ['Database', 'DatabaseStats']