1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import sys 

7import os 

8import logging 

9import sqlite3 

10import re 

11import time 

12import types 

13import weakref 

14 

15from pyrocko.io.io_common import FileLoadError 

16from pyrocko import util 

17from pyrocko.guts import Object, Int, List, Dict, Tuple, String 

18from . import error, io 

19from .model import Nut, to_kind_id, to_kind, to_codes_simple, \ 

20 codes_patterns_for_kind 

21from .error import SquirrelError 

22 

# Module-level logger; everything in this module logs to 'psq.database'.
logger = logging.getLogger('psq.database')

# NOTE(review): presumably read by pyrocko.guts as the type-name prefix for
# guts objects defined in this module -- confirm against pyrocko.guts.
guts_prefix = 'squirrel'

26 

27 

def abspath(path):
    '''
    Make a path absolute, leaving virtual and client paths untouched.

    Paths starting with ``'virtual:'`` or ``'client:'`` are returned
    unchanged. Any other path is passed through :py:func:`os.path.abspath`.
    '''
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)

33 

34 

def versiontuple(s):
    '''
    Convert a dotted version string into a tuple of integers.

    The result is padded with zeros to at least three elements, so ``'1.0'``
    gives ``(1, 0, 0)``. Strings with more than three components are returned
    at their full length. :py:exc:`ValueError` is raised if a component is
    not an integer.
    '''
    numbers = [int(part) for part in s.split('.')]
    # A negative repeat count yields an empty list, so long versions pass
    # through unpadded.
    padding = [0] * (3 - len(numbers))
    return tuple(numbers + padding)

40 

41 

class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly one
    row.
    '''

44 

45 

def execute_get1(connection, sql, args=()):
    '''
    Run a query which is expected to produce exactly one row and return it.

    :param connection: object providing ``execute(sql, args)``, e.g. a
        sqlite3 connection or cursor.
    :param sql: SQL statement to execute.
    :param args: parameters bound to the statement.
    :returns: the single result row.
    :raises ExecuteGet1Error: if the query yields no row or more than one.
    '''
    result = list(connection.execute(sql, args))
    if len(result) != 1:
        raise ExecuteGet1Error('Expected database entry not found.')

    return result[0]

52 

53 

# Registry of open databases keyed by absolute database path; filled by
# get_database() and pruned by close_database().
g_databases = {}

55 

56 

def get_database(path):
    '''
    Get the shared :py:class:`Database` instance for a database file.

    The path is made absolute and used as key into the module-level registry
    ``g_databases``. A :py:class:`Database` is opened and registered the first
    time a path is requested; later calls return the registered object.
    '''
    key = os.path.abspath(path)
    database = g_databases.get(key)
    if database is None:
        database = g_databases[key] = Database(key)

    return database

63 

64 

def close_database(database):
    '''
    Close the connection of a database and drop it from the registry.

    The registry entry is looked up by the absolute form of the database's
    path; it is fine if no such entry exists.
    '''
    key = os.path.abspath(database._database_path)
    database._conn.close()
    g_databases.pop(key, None)

70 

71 

class Transaction(object):
    '''
    Nestable SQLite transaction, usable as a context manager.

    Only the outermost :py:meth:`begin` issues ``BEGIN`` and only the matching
    outermost :py:meth:`commit` or :py:meth:`rollback` ends the transaction.
    A rollback requested at an inner nesting level is remembered and executed
    once the outermost level finishes.

    Used in a ``with`` statement, the object yields its cursor, commits on
    normal exit and rolls back if an exception left the block.

    :param conn: connection from which the cursor is obtained.
    :param label: text included in log messages to identify the transaction.
    :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``; appended
        to the ``BEGIN`` statement.
    :param retry_interval: seconds to sleep before retrying ``BEGIN`` while
        the database is locked.
    :param callback: called as ``callback('modified', nchanges)`` after a
        commit which changed at least one row.
    '''

    def __init__(
            self, conn,
            label='',
            mode='immediate',
            retry_interval=0.1,
            callback=None):

        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.depth = 0
        self.rollback_wanted = False
        self.retry_interval = retry_interval
        self.callback = callback
        self.label = label
        self.started = False

        # commit() tests this against None, so it has to exist even before
        # the first begin().
        self.total_changes_begin = None

    def begin(self):
        '''
        Enter one nesting level, starting the transaction at the outermost.

        While sqlite reports ``'database is locked'``, ``BEGIN`` is retried
        every ``retry_interval`` seconds without limit. Any other
        :py:exc:`sqlite3.OperationalError` is propagated.
        '''
        if self.depth == 0:
            tries = 0
            while True:
                try:
                    tries += 1
                    self.cursor.execute('BEGIN %s' % self.mode.upper())
                    self.started = True
                    logger.debug(
                        'Transaction started: %-30s (pid: %s, mode: %s)',
                        self.label, os.getpid(), self.mode)

                    # Baseline for the change count reported on commit.
                    self.total_changes_begin \
                        = self.cursor.connection.total_changes
                    break

                except sqlite3.OperationalError as e:
                    if str(e) != 'database is locked':
                        raise

                    logger.info(
                        'Database is locked retrying in %s s: %s '
                        '(pid: %s, tries: %i)',
                        self.retry_interval, self.label,
                        os.getpid(), tries)

                    time.sleep(self.retry_interval)

        self.depth += 1

    def commit(self):
        '''
        Leave one nesting level, finishing the transaction at the outermost.

        At the outermost level ``COMMIT`` is executed, unless an inner level
        asked for a rollback, in which case ``ROLLBACK`` is executed instead.
        After a successful commit which modified rows, the callback is
        invoked with ``('modified', nchanges)``.

        :raises Exception: if no transaction has been started.
        '''
        if not self.started:
            raise Exception(
                'Trying to commit without having started a transaction.')

        self.depth -= 1
        if self.depth != 0:
            # Inner level: the outermost commit/rollback does the work.
            return

        if self.rollback_wanted:
            self.cursor.execute('ROLLBACK')
            self.started = False
            logger.warning('Deferred rollback executed.')
            logger.debug(
                'Transaction failed: %-30s (pid: %s)',
                self.label, os.getpid())
            self.rollback_wanted = False
            return

        self.cursor.execute('COMMIT')
        self.started = False
        if self.total_changes_begin is not None:
            total_changes = self.cursor.connection.total_changes \
                - self.total_changes_begin
        else:
            total_changes = None

        if self.callback is not None and total_changes:
            self.callback('modified', total_changes)

        logger.debug(
            'Transaction completed: %-30s '
            '(pid: %s, changes: %i)',
            self.label, os.getpid(), total_changes or 0)

    def rollback(self):
        '''
        Leave one nesting level, requesting that the transaction is undone.

        At the outermost level ``ROLLBACK`` is executed immediately. At an
        inner level the rollback is only scheduled; it is carried out when
        the outermost level is left.

        :raises Exception: if no transaction has been started.
        '''
        if not self.started:
            raise Exception(
                'Trying to rollback without having started a transaction.')

        self.depth -= 1
        if self.depth == 0:
            self.cursor.execute('ROLLBACK')
            self.started = False

            logger.debug(
                'Transaction failed: %-30s (pid: %s)',
                self.label, os.getpid())

            self.rollback_wanted = False
        else:
            logger.warning('Deferred rollback scheduled.')
            self.rollback_wanted = True

    def close(self):
        '''
        Close the cursor owned by this transaction.
        '''
        self.cursor.close()

    def __enter__(self):
        self.begin()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

        if self.depth == 0:
            # Outermost level left: release the cursor and drop the
            # reference to the callback.
            self.close()
            self.callback = None

188 

189 

class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    Wraps a single sqlite3 connection and the schema created by
    :py:meth:`_initialize_db`: tables ``settings``, ``files``, ``nuts``,
    ``kind_codes``, ``kind_codes_count`` and ``persistent`` plus the triggers
    keeping ``nuts`` and ``kind_codes_count`` in sync.

    :param database_path: path of the sqlite file, or ``':memory:'``.
    :param log_statements: if true, every SQL statement executed on the
        connection is logged at debug level.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        self._database_path = database_path
        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug(
                'Opening connection to database (threadsafety: %i): %s',
                sqlite3.threadsafety,
                database_path)

            # isolation_level=None: transactions are controlled explicitly
            # through BEGIN/COMMIT/ROLLBACK statements (see Transaction).
            self._conn = sqlite3.connect(
                database_path,
                isolation_level=None,
                check_same_thread=not sqlite3.threadsafety)

        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.text_factory = str
        self._tables = {}

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        self._listeners = []
        self._basepath = None

        # Must be initialised before _initialize_db(), which fills it in.
        self.version = None
        self._initialize_db()

    def set_basepath(self, basepath):
        '''
        Set or clear (``None``) the directory which stored paths are relative
        to. The given directory is made absolute.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Strip the basepath from ``path`` if ``path`` lies below it.

        Without a basepath, or for paths outside of it, ``path`` is returned
        unchanged.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Inverse of :py:meth:`relpath`: prepend the basepath to relative paths.

        Paths starting with ``'virtual:'`` or ``'client:'`` and paths which
        are already absolute are returned unchanged, as is any path when no
        basepath is set.
        '''
        if self._basepath is not None and not path.startswith('virtual:') \
                and not path.startswith('client:') \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        # Trace callback installed when log_statements is enabled.
        logger.debug(statement)

    def get_connection(self):
        '''
        Get the underlying sqlite3 connection.
        '''
        return self._conn

    def transaction(self, label='', mode='immediate'):
        '''
        Create a :py:class:`Transaction` on this database's connection.

        Listeners registered with :py:meth:`add_listener` are notified when
        the transaction commits changes.
        '''
        return Transaction(
            self._conn,
            label=label,
            mode=mode,
            callback=self._notify_listeners)

    def add_listener(self, listener):
        '''
        Register a callable to be notified as ``listener(event, *args)``.

        Only a weak reference to the listener is kept, so the caller has to
        keep the listener alive. Bound methods are supported.

        :returns: the weak reference, to be passed to
            :py:meth:`remove_listener`.
        '''
        if isinstance(listener, types.MethodType):
            listener_ref = weakref.WeakMethod(listener)
        else:
            listener_ref = weakref.ref(listener)

        self._listeners.append(listener_ref)
        return listener_ref

    def remove_listener(self, listener_ref):
        '''
        Unregister a listener by the reference :py:meth:`add_listener`
        returned. Raises :py:exc:`ValueError` if it is not registered.
        '''
        self._listeners.remove(listener_ref)

    def _notify_listeners(self, event, *args):
        dead = []
        for listener_ref in self._listeners:
            listener = listener_ref()
            if listener is not None:
                listener(event, *args)
            else:
                dead.append(listener_ref)

        # Removal is postponed so the list is not modified while iterating.
        for listener_ref in dead:
            self.remove_listener(listener_ref)

    def _register_table(self, s):
        '''
        Remember column names and types of a ``CREATE TABLE`` statement.

        The header is stored in ``self._tables`` for :py:meth:`print_table`.
        The statement is returned unchanged so the call can be wrapped
        around the argument of ``cursor.execute``.
        '''
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            # First two words of each column definition: name and type.
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def _initialize_db(self):
        '''
        Check the version of an existing database or create the schema.

        If tables ``files`` and ``persistent`` both exist, the database is
        treated as existing: its version is read from ``settings`` into
        ``self.version`` and nothing is created. Otherwise all tables,
        indices and triggers are created and the version is set to 1.0.

        :raises SquirrelError: if the version cannot be queried (pre-release
            format) or if the version is 1.1.0 or later.
        '''
        with self.transaction('initialize') as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            if 2 == len(list(
                    cursor.execute(
                        '''
                            SELECT name FROM sqlite_master
                                WHERE type = 'table' AND name IN (
                                    'files',
                                    'persistent')
                        '''))):

                try:
                    self.version = versiontuple(execute_get1(
                        cursor,
                        '''
                            SELECT value FROM settings
                                WHERE key == 'version'
                        ''')[0])
                except sqlite3.OperationalError:
                    raise error.SquirrelError(
                        'Squirrel database in pre-release format found: %s\n'
                        'Please remove the database file and reindex.'
                        % self._database_path)

                if self.version >= (1, 1, 0):
                    raise error.SquirrelError(
                        'Squirrel database "%s" is of version %i.%i.%i which '
                        'is not supported by this version of Pyrocko. Please '
                        'upgrade the Pyrocko library.'
                        % ((self._database_path, ) + self.version))

                return

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS settings (
                        key text PRIMARY KEY,
                        value text)
                '''))

            # Single-quoted string literals: double quotes denote identifiers
            # in SQL and are accepted as strings by SQLite only as a quirk.
            cursor.execute(
                "INSERT OR IGNORE INTO settings VALUES ('version', '1.0')")

            self.version = versiontuple(execute_get1(
                cursor,
                "SELECT value FROM settings WHERE key == 'version'")[0])

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                    ON files (path)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                    ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                    ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''))

            cursor.execute(
                '''
                    CREATE INDEX IF NOT EXISTS index_nuts_file_id
                    ON nuts (file_id)
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                    BEFORE DELETE ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                    BEFORE UPDATE OF size ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                    BEFORE INSERT ON nuts FOR EACH ROW
                    BEGIN
                        INSERT OR IGNORE INTO kind_codes_count
                        VALUES (new.kind_codes_id, 0);
                        UPDATE kind_codes_count
                        SET count = count + 1
                        WHERE new.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                    BEFORE DELETE ON nuts FOR EACH ROW
                    BEGIN
                        UPDATE kind_codes_count
                        SET count = count - 1
                        WHERE old.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''))

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such content
        is part of a live selection, it is also removed there. Then the new
        content meta-information is inserted into the main database. The
        content is not automatically inserted into the live selections again.
        It is in the responsibility of the selection object to perform this
        step.

        :param nuts: iterable of nuts; nothing happens if it is empty.
        :param transaction: transaction to take part in; a new one labelled
            ``'dig'`` is created if not given.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add(
                (nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0))

        with (transaction or self.transaction('dig')) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            # Sets ``size`` on the file rows, which is what the
            # delete_nuts_on_update_file trigger is declared on.
            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path == ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path == ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id == ? AND codes == ? AND deltat == ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path):
        '''
        Get the nuts stored for a single file.

        :param path: file path; made absolute and then relative to the
            basepath for lookup.
        :returns: list of :py:class:`Nut` objects with absolute file paths;
            empty if nothing is stored for the file.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            WHERE path == ?
        '''

        return [Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
                for row in self._conn.execute(sql, (path,))]

    def undig_all(self):
        '''
        Iterate over all stored nuts, grouped by file.

        Yields ``(path, nuts)`` with absolute ``path``. Consecutive result
        rows with the same path form one group; the query has no
        ``ORDER BY``, so grouping depends on the row order delivered by
        sqlite.
        '''
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id == nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
        '''

        nuts = []
        path = None
        for values in self._conn.execute(sql):
            # Compare absolute with absolute: the stored path may be relative
            # to the basepath while ``path`` is always absolute.
            apath = self.abspath(values[0])
            if path is not None and apath != path:
                yield path, nuts
                nuts = []

            path = apath

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_few(self, paths, format='detect'):
        '''
        Iterate over the nuts of the given files, one query per file.

        Yields ``((format, path), nuts)``. For files with stored nuts, the
        format is taken from the first nut. Otherwise the ``format`` argument
        is passed through and ``nuts`` is empty.
        '''
        for path in paths:
            nuts = self.undig(path)
            if nuts:
                yield (nuts[0].file_format, path), nuts
            else:
                yield (format, path), []

    def undig_many(self, paths, show_progress=True):
        '''
        Iterate over the nuts of the given files via a temporary selection.

        Yields ``(path, nuts)``.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, format='detect', show_progress=True):
        '''
        Create a new selection on this database, optionally adding ``paths``.
        '''
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, format=format, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        '''
        Always returns ``None``.
        '''
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction('remove file') as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` are removed from the main database and any attached live
        selections (via database triggers).

        :returns: number of file entries deleted.
        '''

        with self.transaction('remove file glob') as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.

        :returns: number of file entries deleted.
        '''

        with self.transaction('remove volatile') as cursor:
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).

        :param transaction: transaction to take part in; a new one labelled
            ``'reset file'`` is created if not given.
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction('reset file')) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        :raises ExecuteGet1Error: if the file is not known to the database.
        :raises FileLoadError: if the size of the file differs from the size
            stored in the database.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction('silent touch') as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            # Only mtime is set; the nut-deleting update trigger is declared
            # on ``size`` and therefore does not fire.
            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_codes_info(
            self, kind=None, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over kind/codes combinations with a positive nut count.

        Yields ``(kind_id, codes, deltat, kind_codes_id, count)``, where
        ``codes`` has been converted with ``to_codes_simple``.

        :param kind: restrict to this content kind.
        :param codes: restrict to codes matching these patterns; requires
            ``kind`` to be given.
        :param kind_codes_count: name of the count table to query. It is
            inserted into the SQL text as is and must be a trusted name.
        '''

        args = []
        sel = ''
        kind_id = None
        if kind is not None:
            kind_id = to_kind_id(kind)

            sel = 'AND kind_codes.kind_id == ?'
            args.append(kind_id)

        if codes is not None:
            assert kind is not None  # TODO supp by recursing possible kinds
            pats = codes_patterns_for_kind(kind_id, codes)

            if pats:
                # could optimize this by using IN for non-patterns
                sel += ' AND ( %s ) ' % ' OR '.join(
                    ('kind_codes.codes GLOB ?',) * len(pats))

                args.extend(pat.safe_str for pat in pats)

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                kind_codes.kind_codes_id,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for row_kind_id, scodes, deltat, kcid, count in self._conn.execute(
                sql, args):

            yield (
                row_kind_id, to_codes_simple(row_kind_id, scodes),
                deltat, kcid, count)

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over distinct ``deltat`` values with a positive nut count, in
        ascending order, optionally restricted to content kind ``kind``.

        ``kind_codes_count`` names the count table and is inserted into the
        SQL text as is.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over distinct codes with a positive nut count, ordered by
        their stored string, optionally restricted to content kind ``kind``.

        Each result is converted with ``to_codes_simple``.
        ``kind_codes_count`` names the count table and is inserted into the
        SQL text as is.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id, kind_codes.codes
                FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes

        ''') % dict(kind_codes_count=kind_codes_count)

        for row in self._conn.execute(sql, args):
            yield to_codes_simple(*row)

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over content kinds with a positive nut count, ordered by kind
        id, optionally restricted to entries whose codes equal ``codes``.

        ``kind_codes_count`` names the count table and is inserted into the
        SQL text as is.
        '''
        args = []
        sel = ''
        if codes is not None:
            sel = 'AND kind_codes.codes == ?'
            args.append(codes.safe_str)

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Iterate over the absolute paths of all files in the database.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Iterate over ``(path, nnuts)`` for all files in the database.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        '''
        Iterate over available content kinds, see :py:meth:`_iter_kinds`.
        '''
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        '''
        Iterate over available codes, see :py:meth:`_iter_codes`.
        '''
        return self._iter_codes(kind=kind)

    def get_paths(self):
        '''
        List version of :py:meth:`iter_paths`.
        '''
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        '''
        List version of :py:meth:`iter_kinds`.
        '''
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        '''
        List version of :py:meth:`iter_codes`.
        '''
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get the number of nuts per content kind and codes.

        :returns: ``counts[kind][codes]`` if ``kind`` is ``None``, otherwise
            ``counts[codes]`` for the given kind.
        :raises KeyError: if ``kind`` is given and the database has no nuts
            of that kind.
        '''
        d = {}
        for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
            # Entries differing only in deltat are summed up.
            v = d.setdefault(kind_id, {})
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            return d[to_kind_id(kind)]
        else:
            return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())

    def get_nfiles(self):
        '''
        Get the number of files in the database.
        '''
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        '''
        Get the number of nuts in the database.
        '''
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        '''
        List version of :py:meth:`iter_nnuts_by_file`.
        '''
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get the sum of the stored sizes of all files, 0 if there is none.
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            # SUM() is NULL when there are no non-NULL sizes.
            return row[0] or 0

    def get_persistent_names(self):
        '''
        Get the names stored in table ``persistent``.
        '''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def get_stats(self):
        '''
        Collect database statistics into a :py:class:`DatabaseStats` object.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Print the contents of the main tables, see :py:meth:`print_table`.
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Write the contents of table ``name`` as aligned text columns.

        A header with column names and types is included if the table was
        registered through :py:meth:`_register_table`, which only happens
        when this object created the schema.

        :param name: table name; inserted into the SQL text as is, so it
            must be a trusted name.
        :param stream: object with a ``write`` method, default
            :py:data:`sys.stdout`.
        '''

        if stream is None:
            stream = sys.stdout

        class hstr(str):
            # Header cells are printed without the quotes repr() would add.
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            headers = self._tables[name]
            # Rows of None are rendered as separator lines of dashes.
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append(list(row))

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')

966 

967 

class DatabaseStats(Object):
    '''
    Summary of the contents cached in the meta-information database.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size [bytes] of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts of each kind, summed over all codes.
        nuts_per_kind = {}
        for kind in self.kinds:
            nuts_per_kind[kind] = sum(self.counts[kind].values())

        code_strings = [c.safe_str for c in self.codes]
        ncodes = len(code_strings)

        if ncodes > 20:
            # Long listings are abbreviated to the first and last ten codes.
            head = util.ewrap(code_strings[:10], indent=' ')
            tail = util.ewrap(code_strings[-10:], indent=' ')
            scodes = '\n' + head + '\n [%i more]\n' % (ncodes - 20) + tail
        elif code_strings:
            scodes = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            scodes = '<none>'

        skinds = ', '.join(
            '%s: %i' % (kind, nuts_per_kind[kind])
            for kind in sorted(self.kinds))

        template = '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s'''

        return template % (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            ', '.join(self.persistent))

1024 

1025 

# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
    'Database',
    'DatabaseStats',
]