Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/database.py: 83%

532 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Database interface code. 

8''' 

9 

10import sys 

11import os 

12import logging 

13import sqlite3 

14import re 

15import time 

16import threading 

17 

18from pyrocko.io.io_common import FileLoadError 

19from pyrocko import util 

20from pyrocko.guts import Object, Int, List, Dict, Tuple, String 

21from . import error, io 

22from .model import Nut, to_kind_id, to_kind, to_codes_simple, \ 

23 codes_patterns_for_kind 

24from .error import SquirrelError 

25 

# Module-level logger used by all database code below.
logger = logging.getLogger('psq.database')

# NOTE(review): presumably the guts type-name prefix for classes declared in
# this module -- confirm against pyrocko.guts.
guts_prefix = 'squirrel'

# When true, get_RLock() hands out a lock which logs acquire/release calls.
RLOCK_DEBUG = False

31 

32 

def tid():
    '''
    Get the identifier of the calling thread.
    '''
    thread_id = threading.get_ident()
    return thread_id

35 

36 

def abspath(path):
    '''
    Make a file path absolute, leaving pseudo paths untouched.

    Paths starting with ``'virtual:'`` or ``'client:'`` are returned
    unchanged, anything else is passed through :py:func:`os.path.abspath`.
    '''
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)

42 

43 

def versiontuple(s):
    '''
    Convert a dotted version string into a tuple of integers.

    Versions with fewer than three components are padded with zeros, e.g.
    ``'1.0'`` becomes ``(1, 0, 0)``. Longer versions are kept as they are.
    '''
    numbers = [int(part) for part in s.split('.')]
    padding = [0, 0, 0][len(numbers):]
    return tuple(numbers + padding)

49 

50 

class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly one
    row.
    '''

53 

54 

def execute_get1(connection, sql, args=()):
    '''
    Run a query which must produce exactly one row and return that row.

    :param connection:
        Object with an ``execute(sql, args)`` method returning an iterable of
        rows (connection or cursor).
    :param sql:
        SQL statement to execute.
    :param args:
        Values bound to the placeholders in ``sql``.

    :raises:
        :py:exc:`ExecuteGet1Error` if zero or more than one row is returned.
    '''
    result = list(connection.execute(sql, args))
    if len(result) != 1:
        raise ExecuteGet1Error('Expected database entry not found.')

    return result[0]

61 

62 

# Cache of open Database objects, keyed by absolute database path.
g_databases = {}


def get_database(path):
    '''
    Get the shared :py:class:`Database` object for a database file.

    The database is opened on first request and cached under its absolute
    path, later requests for the same path return the cached object.
    '''
    key = os.path.abspath(path)
    try:
        return g_databases[key]
    except KeyError:
        pass

    g_databases[key] = Database(key)
    return g_databases[key]

72 

73 

def close_database(database):
    '''
    Close the connection of a database and drop it from the global cache.
    '''
    key = os.path.abspath(database._database_path)
    database._conn.close()
    g_databases.pop(key, None)

79 

80 

# Running index into ansi_colors, advanced by next_color().
icolor = 0

# ANSI escape sequences selecting the foreground colors 31 to 37.
ansi_colors = ['\033[%im' % code for code in range(31, 38)]

# ANSI escape sequence switching back to the default foreground color.
ansi_reset = '\033[39m'


def next_color():
    '''
    Get the next ANSI color escape sequence, cycling through the palette.
    '''
    global icolor
    index = icolor % len(ansi_colors)
    icolor += 1
    return ansi_colors[index]

94 

95 

# Color escape sequences already handed out, keyed by thread id.
thread_colors = {}
process_colors = {}


def thread_color():
    '''
    Get the color escape sequence assigned to the calling thread.

    A new color is drawn from the palette the first time a thread asks.
    '''
    key = tid()
    try:
        return thread_colors[key]
    except KeyError:
        color = thread_colors[key] = next_color()
        return color

106 

107 

def process_color():
    '''
    Get a color escape sequence derived from the id of the current process.
    '''
    index = os.getpid() % len(ansi_colors)
    return ansi_colors[index]

111 

112 

def color_tid_pid():
    '''
    Get a colorized ``tid: ..., pid: ...`` string for use in log messages.
    '''
    thread_part = (thread_color(), tid(), ansi_reset)
    process_part = (process_color(), os.getpid(), ansi_reset)
    return '%stid: %s%s, %spid: %s%s' % (thread_part + process_part)

117 

118 

def get_RLock():
    '''
    Create the reentrant lock used to serialize database access.

    If ``RLOCK_DEBUG`` is set, the returned lock logs every explicit
    ``acquire()`` and ``release()`` call at debug level, otherwise a plain
    :py:func:`threading.RLock` is returned.
    '''
    if not RLOCK_DEBUG:
        return threading.RLock()

    RLockBase = threading.RLock().__class__

    class RLockDebug(RLockBase):
        # Accept the same arguments as the wrapped lock, so that the debug
        # variant is a drop-in replacement (e.g. for non-blocking acquires).
        def acquire(self, blocking=True, timeout=-1):
            logger.debug('Waiting for lock (%s).' % color_tid_pid())
            ret = RLockBase.acquire(self, blocking, timeout)
            if ret:
                logger.debug('Got lock (%s).' % color_tid_pid())

            return ret

        def release(self):
            logger.debug(
                'Releasing lock (%s).' % color_tid_pid())
            return RLockBase.release(self)

    return RLockDebug()


# Process-wide lock guarding all statement execution and transactions.
LOCK = get_RLock()

141 

142 

class Transaction(object):
    '''
    Nestable database transaction, usable as a context manager.

    Only the outermost :py:meth:`begin` issues an SQL ``BEGIN`` and only the
    matching outermost :py:meth:`commit` / :py:meth:`rollback` ends the SQL
    transaction. A rollback requested at an inner level is deferred until the
    outermost level is left. The module-wide ``LOCK`` is held from ``begin``
    until the matching ``commit`` or ``rollback``.

    :param conn:
        Database connection, must provide ``cursor()``.
    :param label:
        Name of the transaction, used in log and error messages.
    :param mode:
        SQLite transaction mode: ``'deferred'``, ``'immediate'`` or
        ``'exclusive'``.
    :param retry_interval:
        Time [s] to wait before retrying when the database is locked.
    :param callback:
        If given, called as ``callback('modified', nchanges)`` after a
        successful outermost commit which changed at least one row.
    '''

    def __init__(
            self, conn,
            label='',
            mode='immediate',
            retry_interval=0.1,
            callback=None):

        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.depth = 0
        self.rollback_wanted = False
        self.retry_interval = retry_interval
        self.callback = callback
        self.label = label
        self.started = False
        self.closed = False
        # Set in begin(); initialized here so that commit() can always read
        # it.
        self.total_changes_begin = None
        self.debug = logger.isEnabledFor(logging.DEBUG)

    def begin(self):
        '''
        Enter the transaction, starting the SQL transaction at depth 0.

        Retries indefinitely while SQLite reports ``database is locked``.
        Any other error is re-raised after the lock taken here has been
        released again.
        '''
        LOCK.acquire()
        try:
            if self.depth == 0:
                self._begin_outermost()

            self.depth += 1

        except BaseException:
            # Without this, a failing BEGIN would leave the lock held
            # forever because no commit()/rollback() will follow.
            LOCK.release()
            raise

    def _begin_outermost(self):
        # Issue the SQL BEGIN, retrying while the database is locked.
        tries = 0
        while True:
            try:
                tries += 1
                if self.debug:
                    logger.debug(
                        'Waiting for transaction: %-30s '
                        '(%s, mode: %s)',
                        self.label,
                        color_tid_pid(),
                        self.mode)

                self.cursor.execute('BEGIN %s' % self.mode.upper())
                self.started = True
                if self.debug:
                    logger.debug(
                        'Transaction started: %-30s '
                        '(%s, mode: %s)',
                        self.label,
                        color_tid_pid(),
                        self.mode)

                self.total_changes_begin \
                    = self.cursor.connection.total_changes
                break

            except sqlite3.OperationalError as e:
                if not str(e) == 'database is locked':
                    raise

                logger.info(
                    'Database is locked retrying in %s s: %s '
                    '(%s, tries: %i)',
                    self.retry_interval, self.label,
                    color_tid_pid(), tries)

                time.sleep(self.retry_interval)

    def commit(self):
        '''
        Leave one nesting level, committing when the outermost is left.

        If a rollback was requested at an inner level, the SQL transaction is
        rolled back instead of committed. The lock taken by the matching
        :py:meth:`begin` is released in any case.
        '''
        try:
            if not self.started:
                raise Exception(
                    'Trying to commit without having started a transaction '
                    '(%s, %s)' % (
                        self.label,
                        color_tid_pid()))

            self.depth -= 1
            if self.depth == 0:
                if not self.rollback_wanted:
                    self.cursor.execute('COMMIT')
                    self.started = False
                    if self.total_changes_begin is not None:
                        total_changes = self.cursor.connection.total_changes \
                            - self.total_changes_begin
                    else:
                        total_changes = None

                    if self.callback is not None and total_changes:
                        self.callback('modified', total_changes)

                    if self.debug:
                        logger.debug(
                            'Transaction completed: %-30s '
                            '(%s, changes: %i)',
                            self.label,
                            color_tid_pid(),
                            total_changes or 0)

                else:
                    self.cursor.execute('ROLLBACK')
                    self.started = False
                    logger.warning('Deferred rollback executed.')
                    if self.debug:
                        logger.debug(
                            'Transaction failed: %-30s (%s)',
                            self.label,
                            color_tid_pid())

                    self.rollback_wanted = False
        finally:
            LOCK.release()

    def rollback(self):
        '''
        Leave one nesting level, discarding the changes of the transaction.

        At the outermost level the SQL transaction is rolled back
        immediately, at inner levels the rollback is scheduled for when the
        outermost level is left. The lock taken by the matching
        :py:meth:`begin` is released in any case.
        '''
        try:
            if not self.started:
                raise Exception(
                    'Trying to rollback without having started a transaction.')

            self.depth -= 1
            if self.depth == 0:
                self.cursor.execute('ROLLBACK')
                self.started = False

                if self.debug:
                    logger.debug(
                        'Transaction failed: %-30s (pid: %s)',
                        self.label,
                        os.getpid())

                self.rollback_wanted = False
            else:
                logger.warning('Deferred rollback scheduled.')
                self.rollback_wanted = True

        finally:
            LOCK.release()

    def close(self):
        '''
        Close the cursor and mark the transaction object as finished.
        '''
        self.cursor.close()
        self.closed = True

    def __enter__(self):
        # The lock is taken twice (here and in begin()): once for the
        # duration of the with-block bookkeeping in __exit__ and once for
        # the transaction itself.
        LOCK.acquire()
        try:
            self.begin()
        except BaseException:
            # __exit__ is not called when __enter__ fails.
            LOCK.release()
            raise

        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()

            if self.depth == 0:
                self.close()
                self.callback = None

        finally:
            # Release the acquisition from __enter__, also when the commit
            # or rollback above failed.
            LOCK.release()

296 

297 

class xlist(list):
    '''
    List subclass which accepts extra attributes (e.g. ``rowcount``).
    '''

300 

301 

class Cursor(sqlite3.Cursor):
    '''
    SQLite cursor running its statements under the module-wide lock.
    '''

    def execute(self, *args, **kwargs):
        '''
        Execute a statement and return all resulting rows as an ``xlist``.

        The rows are fetched while the lock is still held. The returned list
        carries the cursor's ``rowcount`` as an attribute.
        '''
        with LOCK:
            result = sqlite3.Cursor.execute(self, *args, **kwargs)
            rows = xlist(result)
            rows.rowcount = result.rowcount
            return rows

    def execute_nolock(self, *args, **kwargs):
        '''
        Execute a statement without taking the lock, returning the cursor.
        '''
        return sqlite3.Cursor.execute(self, *args, **kwargs)

    def executemany(self, *args, **kwargs):
        '''
        Locked version of :py:meth:`sqlite3.Cursor.executemany`.
        '''
        with LOCK:
            return sqlite3.Cursor.executemany(self, *args, **kwargs)

    def executescript(self, *args, **kwargs):
        '''
        Locked version of :py:meth:`sqlite3.Cursor.executescript`.
        '''
        with LOCK:
            return sqlite3.Cursor.executescript(self, *args, **kwargs)

321 

322 

class Connection(sqlite3.Connection):
    '''
    SQLite connection handing out locking cursors and supporting close hooks.
    '''

    def cursor(self, factory=Cursor):
        '''
        Create a new cursor, by default of type :py:class:`Cursor`.
        '''
        return factory(self)

    def execute(self, *args, **kwargs):
        '''
        Execute a statement on a fresh cursor, see :py:meth:`Cursor.execute`.
        '''
        return self.cursor().execute(*args, **kwargs)

    def execute_nolock(self, *args, **kwargs):
        '''
        Execute a statement on a fresh cursor without taking the lock.
        '''
        return self.cursor().execute_nolock(*args, **kwargs)

    def executemany(self, *args, **kwargs):
        '''
        Run ``executemany`` on a fresh cursor.
        '''
        return self.cursor().executemany(*args, **kwargs)

    def executescript(self, *args, **kwargs):
        '''
        Run ``executescript`` on a fresh cursor.
        '''
        return self.cursor().executescript(*args, **kwargs)

    def close(self):
        '''
        Call the registered close handlers, then close the connection.
        '''
        for handler in getattr(self, '_close_handlers', ()):
            handler()

        sqlite3.Connection.close(self)

    def on_close(self, handler):
        '''
        Register a callable to be invoked right before the connection closes.
        '''
        try:
            handlers = self._close_handlers
        except AttributeError:
            # The handler list is created lazily on first registration.
            handlers = self._close_handlers = []

        handlers.append(handler)

355 

356 

class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    :param database_path:
        Path of the SQLite database file or ``':memory:'`` for a transient
        in-memory database.
    :param log_statements:
        If true, every executed SQL statement is logged at debug level.

    After construction, :py:attr:`version` holds the format version of the
    database as found (or created) by the initialization.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        # Plain attributes first: close() (also called from __del__) and
        # _initialize_db() rely on them, and the steps below may raise.
        self._database_path = database_path
        self._tables = {}
        self._transaction = {}
        self._listeners = []
        self._selections = []
        self._basepath = None
        self.version = None

        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug(
                'Opening connection to database (threadsafety: %i): %s',
                sqlite3.threadsafety,
                database_path)

            # Sharing the connection between threads is only enabled when
            # the sqlite3 module reports threadsafety level 3.
            self._conn = sqlite3.connect(
                database_path,
                isolation_level=None,
                factory=Connection,
                check_same_thread=sqlite3.threadsafety != 3)

        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.on_close(self.close)

        self._conn.text_factory = str

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        # Sets self.version; must not be reset afterwards.
        self._initialize_db()

    def __del__(self):
        self.close()

    def close(self):
        '''
        Close all selections attached to this database.
        '''
        for selection in self._selections[:]:
            selection.close()

        self._selections[:] = []

    def add_selection(self, selection):
        '''
        Register a selection to be closed together with the database.
        '''
        self._selections.append(selection)

    def remove_selection(self, selection):
        '''
        Unregister a selection previously added with :py:meth:`add_selection`.
        '''
        self._selections.remove(selection)

    def set_basepath(self, basepath):
        '''
        Set (or with ``None`` unset) the directory relative to which file
        paths are stored in the database.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Strip the basepath from ``path`` if it lies below the basepath.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Prepend the basepath to relative, non-virtual, non-client paths.
        '''
        if self._basepath is not None and not path.startswith('virtual:') \
                and not path.startswith('client:') \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        logger.debug(statement)

    def get_connection(self):
        '''
        Get the underlying database connection.
        '''
        return self._conn

    def transaction(self, label='', mode='immediate'):
        '''
        Create a new :py:class:`Transaction` for the calling thread.

        :raises:
            :py:exc:`~pyrocko.squirrel.error.SquirrelError` if the calling
            thread already has a transaction which has not been closed.
        '''
        thread_id = tid()
        current_transaction = self._transaction.get(thread_id)
        if current_transaction and not current_transaction.closed:
            # Only one transaction can operate on the db at a time.
            # Synchronized in Transaction.begin and the Transaction context
            # manager, therefore we allow one Transaction object per thread.
            # The following exception is only to catch programming errors.
            raise error.SquirrelError(
                'Cannot start transaction "%s". A transaction "%s" is already '
                'in progress.' % (label, current_transaction.label))

        transaction = Transaction(
            self._conn,
            label=label,
            mode=mode,
            callback=self._notify_listeners)

        self._transaction[thread_id] = transaction
        return transaction

    def add_listener(self, listener):
        '''
        Register a callable to be notified about database modifications.

        Only a weak reference is kept. The returned reference object is
        needed to unregister with :py:meth:`remove_listener`.
        '''
        listener_ref = util.smart_weakref(listener)
        self._listeners.append(listener_ref)
        return listener_ref

    def remove_listener(self, listener_ref):
        '''
        Unregister a listener, given the reference from :py:meth:`add_listener`.
        '''
        self._listeners.remove(listener_ref)

    def _notify_listeners(self, event, *args):
        dead = []
        for listener_ref in self._listeners:
            listener = listener_ref()
            if listener is not None:
                listener(event, *args)
            else:
                dead.append(listener_ref)

        # Drop references whose target has been garbage collected.
        for listener_ref in dead:
            self.remove_listener(listener_ref)

    def _register_table(self, s):
        # Parse "<name> (<col> <type> ..., ...)" out of a CREATE TABLE
        # statement and remember (column, type) pairs for print_table().
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def optimize(self):
        '''
        Run SQLite's ``PRAGMA optimize`` on the database.
        '''
        logger.info('Optimizing database...')
        with self.transaction('optimize') as cursor:
            cursor.execute('''PRAGMA optimize''')
        logger.info('Done optimizing database.')

    def activate_wal_mode(self):
        '''
        Switch the database to WAL journal mode if not already active.

        :raises:
            :py:exc:`~pyrocko.squirrel.error.SquirrelError` if SQLite does
            not report ``wal`` after the switch.
        '''
        result = self.get_connection().execute(
            '''PRAGMA journal_mode''')[0][0]

        if result == 'wal':
            logger.info('WAL mode already active.')
            return

        logger.info('Activating WAL mode on database.')
        result = self.get_connection().execute(
            '''PRAGMA journal_mode=WAL''')[0][0]
        if result != 'wal':
            raise error.SquirrelError(
                'Could not activate WAL mode. Result from '
                '"PRAGMA journal_mode=WAL" was "%s".' % result)

    def _initialize_db(self):
        # Configure the connection, then either validate the version of an
        # existing database or create tables, indices and triggers.
        with self.transaction('initialize') as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            cursor.execute(
                '''PRAGMA cache_size = -10000''')

            # Both tables present: treat as an already initialized database.
            if 2 == len(list(
                    cursor.execute(
                        '''
                            SELECT name FROM sqlite_master
                                WHERE type = 'table' AND name IN (
                                    'files',
                                    'persistent')
                        '''))):

                try:
                    # Single quotes: 'version' is a string literal, double
                    # quotes denote identifiers in SQL.
                    self.version = versiontuple(execute_get1(
                        cursor,
                        '''
                            SELECT value FROM settings
                                WHERE key = 'version'
                        ''')[0])
                except sqlite3.OperationalError:
                    raise error.SquirrelError(
                        'Squirrel database in pre-release format found: %s\n'
                        'Please remove the database file and reindex.'
                        % self._database_path)

                if self.version >= (1, 1, 0):
                    raise error.SquirrelError(
                        'Squirrel database "%s" is of version %i.%i.%i which '
                        'is not supported by this version of Pyrocko. Please '
                        'upgrade the Pyrocko library.'
                        % ((self._database_path, ) + self.version))

                return

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS settings (
                        key text PRIMARY KEY,
                        value text)
                '''))

            cursor.execute(
                "INSERT OR IGNORE INTO settings VALUES ('version', '1.0')")

            # Same representation as for pre-existing databases above.
            self.version = versiontuple(execute_get1(
                cursor,
                "SELECT value FROM settings WHERE key = 'version'")[0])

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                    ON files (path)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                    ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                    ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''))

            cursor.execute(
                '''
                    CREATE INDEX IF NOT EXISTS index_nuts_file_id
                    ON nuts (file_id)
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                    BEFORE DELETE ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id = old.file_id;
                    END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                    BEFORE UPDATE OF size ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id = old.file_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                    BEFORE INSERT ON nuts FOR EACH ROW
                    BEGIN
                        INSERT OR IGNORE INTO kind_codes_count
                        VALUES (new.kind_codes_id, 0);
                        UPDATE kind_codes_count
                        SET count = count + 1
                        WHERE new.kind_codes_id = kind_codes_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                    BEFORE DELETE ON nuts FOR EACH ROW
                    BEGIN
                        UPDATE kind_codes_count
                        SET count = count - 1
                        WHERE old.kind_codes_id = kind_codes_id;
                    END
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''))

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such content
        is part of a live selection, it is also removed there. Then the new
        content meta-information is inserted into the main database. The
        content is not automatically inserted into the live selections again.
        It is in the responsibility of the selection object to perform this
        step.

        :param nuts:
            Iterable of nuts to be stored. Nothing is done if it is empty.
        :param transaction:
            Existing transaction to run in. A new one is created if ``None``.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add(
                (nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0))

        with (transaction or self.transaction('dig')) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            # Updating the size fires the trigger pruning the old nuts.
            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path = ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path = ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id = ? AND codes = ? AND deltat = ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path, segment=None):
        '''
        Get the nuts stored for a file, optionally only for one segment.

        :returns:
            List of nuts, empty if the file is unknown or has no content.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id = kind_codes.kind_codes_id
            WHERE files.path = ?
        '''
        args = [path]
        if segment is not None:
            sql += ' AND nuts.file_segment = ?'
            args.append(segment)

        return [
            Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
            for row in self._conn.execute(sql, args)]

    def undig_all(self):
        '''
        Iterate over all stored nuts, grouped by file.

        Yields tuples ``(path, nuts)`` where ``path`` is the absolute file
        path and ``nuts`` is the list of nuts of consecutive result rows
        belonging to that file.
        '''
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id = kind_codes.kind_codes_id
        '''

        # NOTE(review): the query has no ORDER BY, grouping relies on the
        # rows of one file being returned adjacently -- confirm.
        nuts = []
        path = None
        for values in self._conn.execute(sql):
            # Compare in absolute form: `path` is absolute, while the stored
            # value may be relative to the basepath.
            row_path = self.abspath(values[0])
            if path is not None and row_path != path:
                yield path, nuts
                nuts = []

            path = row_path

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_few(self, paths, format='detect', segment=None):
        '''
        Iterate over the nuts of some files, one query per file.

        Yields ``((format, path), nuts)``. For files without stored nuts the
        given ``format`` is reported together with an empty list.
        '''
        for path in paths:
            nuts = self.undig(path, segment=segment)
            if nuts:
                yield (nuts[0].file_format, path), nuts
            else:
                yield (format, path), []

    def undig_many(self, paths, show_progress=True):
        '''
        Iterate over the nuts of many files via a temporary selection.

        Yields tuples ``(path, nuts)``.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, format='detect', show_progress=True):
        '''
        Create a selection on this database, optionally filled with ``paths``.
        '''
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, format=format, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        '''
        Placeholder, always returns ``None``.
        '''
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction('remove file') as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` are removed from the main database and any attached live
        selections (via database triggers).

        :returns:
            Number of file entries deleted.
        '''

        with self.transaction('remove file glob') as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.
        '''

        with self.transaction('remove volatile') as cursor:
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction('reset file')) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        :raises:
            :py:exc:`~pyrocko.io.io_common.FileLoadError` if the size of the
            file differs from the size stored in the database.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction('silent touch') as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_codes_info(
            self, kind=None, codes=None, kind_codes_count='kind_codes_count'):

        # Yields (kind_id, codes, deltat, kind_codes_id, count) for all
        # entries with non-zero count, optionally restricted by kind and
        # codes patterns. `kind_codes_count` names the count table to use.
        args = []
        sel = ''
        kind_id_wanted = None
        if kind is not None:
            kind_id_wanted = to_kind_id(kind)

            sel = 'AND kind_codes.kind_id = ?'
            args.append(kind_id_wanted)

        if codes is not None:
            assert kind is not None  # TODO supp by recursing possible kinds
            pats = codes_patterns_for_kind(kind_id_wanted, codes)

            if pats:
                # could optimize this by using IN for non-patterns
                sel += ' AND ( %s ) ' % ' OR '.join(
                    ('kind_codes.codes GLOB ?',) * len(pats))

                args.extend(pat.safe_str for pat in pats)

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                kind_codes.kind_codes_id,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for kind_id, scodes, deltat, kcid, count in self._conn.execute(
                sql, args):

            yield (
                kind_id, to_codes_simple(kind_id, scodes), deltat, kcid, count)

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields the distinct deltat values in use, in ascending order.
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id = ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields the distinct codes in use, ordered by their string form.
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id = ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id, kind_codes.codes
                FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes

        ''') % dict(kind_codes_count=kind_codes_count)

        for row in self._conn.execute(sql, args):
            yield to_codes_simple(*row)

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        # Yields the distinct content kinds in use, ordered by kind id.
        args = []
        sel = ''
        if codes is not None:
            sel = 'AND kind_codes.codes = ?'
            args.append(codes.safe_str)

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Iterate over the absolute paths of all files known to the database.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Iterate over tuples ``(path, nnuts)`` for all files in the database.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        '''
        Iterate over available content kinds, optionally for given codes.
        '''
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        '''
        Iterate over available codes, optionally for a given content kind.
        '''
        return self._iter_codes(kind=kind)

    def get_paths(self):
        '''
        List version of :py:meth:`iter_paths`.
        '''
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        '''
        List version of :py:meth:`iter_kinds`.
        '''
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        '''
        List version of :py:meth:`iter_codes`.
        '''
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get number of nuts by content kind and codes.

        :returns:
            If ``kind`` is given, a dict mapping codes to counts for that
            kind (empty if there is no such content). Otherwise a dict
            ``counts[kind][codes]``.
        '''
        d = {}
        for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
            v = d.setdefault(kind_id, {})
            # Entries differing only in deltat are summed up.
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            # No content of the requested kind is not an error.
            return d.get(to_kind_id(kind), {})
        else:
            return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())

    def get_nfiles(self):
        '''
        Get the number of files known to the database.
        '''
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        '''
        Get the number of nuts stored in the database.
        '''
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        '''
        List version of :py:meth:`iter_nnuts_by_file`.
        '''
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get the summed size of all files in the database, 0 if there are none.
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            return row[0] or 0

    def get_persistent_names(self):
        '''
        Get the names listed in the ``persistent`` table.
        '''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def vacuum(self):
        '''
        Run SQLite's ``VACUUM`` on the database.
        '''

        logger.info('Vacuuming database...')
        sql = '''
            VACUUM
        '''
        self._conn.execute(sql)
        logger.info('Done vacuuming database.')

    def get_stats(self):
        '''
        Collect statistics about the database into a
        :py:class:`DatabaseStats` object.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Dump the main tables of the database, see :py:meth:`print_table`.
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Dump the contents of table ``name`` as aligned text.

        :param name:
            Table name. It is inserted into the SQL statement unescaped and
            must therefore not come from untrusted input.
        :param stream:
            File-like object to write to, default is ``sys.stdout``.
        '''

        if stream is None:
            stream = sys.stdout

        # Header cells: shown without quotes by repr().
        class hstr(str):
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            headers = self._tables[name]
            # Rows of None are rendered as separator lines.
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append([x for x in row])

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')

1201 

1202 

class DatabaseStats(Object):
    '''
    Summary of what is currently indexed in a meta-information database.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size [bytes] of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts per content kind.
        kind_counts = {}
        for kind in self.kinds:
            kind_counts[kind] = sum(self.counts[kind].values())

        codes = [c.safe_str for c in self.codes]
        ncodes = len(codes)

        if ncodes > 20:
            # Long listings are abbreviated to the first and last ten.
            head = util.ewrap(codes[:10], indent=' ')
            tail = util.ewrap(codes[-10:], indent=' ')
            scodes = '\n' + head + '\n [%i more]\n' % (ncodes - 20) + tail
        elif codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        skinds = ', '.join(
            '%s: %i' % (kind, kind_counts[kind])
            for kind in sorted(self.kinds))

        template = '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s'''

        return template % (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            ', '.join(self.persistent))

1259 

1260 

# Names exported by ``from <this module> import *``.
__all__ = [
    'Database',
    'DatabaseStats',
]