Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/io/base.py: 97%

188 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel core file reading and indexing. 

8''' 

9 

10import time 

11import logging 

12import threading 

13 

14from pyrocko import util 

15from pyrocko.io.io_common import FileLoadError 

16from pyrocko import progress 

17 

18from .backends import \ 

19 mseed, sac, hdf5_optodas, datacube, stationxml, textfiles, virtual, yaml, \ 

20 tdms_idas, spickle 

21 

22from ..model import to_kind_ids, EMPTY, Nut 

23from ..database import color_tid_pid 

24 

# Order matters: detect_format() asks the backends in this order and returns
# the first match, and get_backend() prefers the first module registered for
# a given format.
backend_modules = [
    mseed,
    sac,
    hdf5_optodas,
    datacube,
    stationxml,
    textfiles,
    virtual,
    yaml,
    tdms_idas,
    spickle,
]

logger = logging.getLogger('psq.io')

30 

31 

def tid():
    '''
    Get the identifier of the calling thread.
    '''
    thread_ident = threading.get_ident()
    return thread_ident

34 

35 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are handed on unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task

38 

39 

def update_format_providers():
    '''
    Rebuild the global mapping from file format to io backend modules.

    Every format name reported by a backend's ``provided_formats()`` is mapped
    to the list of backend modules offering it, in the order in which the
    modules appear in ``backend_modules``.
    '''

    global g_format_providers
    g_format_providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            g_format_providers.setdefault(fmt, []).append(backend)


# Format name -> list of backend modules; filled once at import time.
g_format_providers = {}
update_format_providers()

55 

56 

class FormatDetectionFailed(FileLoadError):
    '''
    Exception raised when the format of a file cannot be determined.

    :param path:
        Path of the file for which detection failed; it is included in the
        error message.
    '''

    def __init__(self, path):
        message = 'format detection failed for file: %s' % path
        FileLoadError.__init__(self, message)

65 

66 

class UnknownFormat(Exception):
    '''
    Exception raised when the user asks for a file format nobody provides.

    :param format:
        The requested format identifier; it is included in the error message.
    '''

    def __init__(self, format):
        message = 'unknown format: %s' % format
        Exception.__init__(self, message)

75 

76 

def get_backend(fmt):
    '''
    Look up the squirrel io backend module handling a given file format.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        The first backend module registered for ``fmt``.

    :raises:
        :py:exc:`UnknownFormat` if no backend is registered for ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        raise UnknownFormat(fmt)

    return providers[0]

91 

92 

def detect_format(path):
    '''
    Determine the type of a file from its first 512 bytes.

    Paths starting with ``'virtual:'`` are reported as ``'virtual'`` without
    accessing the file system. Otherwise, the head of the file is shown to
    each backend module in turn and the first format reported is returned.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        Format identifier.

    :raises:
        :py:exc:`FormatDetectionFailed` if the file cannot be read or if no
        backend recognizes its content.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as stream:
            header = stream.read(512)

    except OSError:
        raise FormatDetectionFailed(path)

    for backend in backend_modules:
        detected = backend.detect(header)
        if detected is not None:
            return detected

    raise FormatDetectionFailed(path)

120 

121 

def supported_formats():
    '''
    Get the sorted list of file format names supported by Squirrel.
    '''
    format_names = sorted(g_format_providers)
    return format_names

127 

128 

# Content kinds loaded by default by iload().
g_content_kinds = [
    'waveform',
    'station',
    'channel',
    'response',
    'event',
]


def supported_content_kinds():
    '''
    Get list of supported content kinds offered through Squirrel.

    This is the default list of content kinds plus ``'waveform_promise'``. A
    new list is returned on every call.
    '''
    return [*g_content_kinds, 'waveform_promise']

137 

138 

def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None,
        transaction=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this must be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to debunk modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files. Requires
        ``paths`` to be a selection.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a progress task. The flag is also
        passed on to the temporary selection which is created for larger
        batches of paths.
    :type show_progress:
        bool

    :param update_selection:
        Optional selection-like object. After the index entries of a new or
        modified file have been written to the database, its
        ``_set_file_states_force_check`` (for that file path) and
        ``_update_nuts`` methods are called.

    :param transaction:
        Optional outer database transaction. If given, it is used for all
        database writes and this function neither opens nor commits
        transactions of its own.

    :raises:
        :py:exc:`TypeError` if ``segment`` is given together with more than a
        single path, if ``database`` is given together with a selection, or
        if ``skip_unchanged`` is set without a selection.
        :py:exc:`UnknownFormat` if no backend is registered for the requested
        format.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed. Files which cannot be read are logged as errors and
    skipped.
    '''

    from ..selection import Selection

    # The parameter is kept under a separate name; the local name
    # ``transaction`` is reused below for transactions owned by this function.
    outer_transaction = transaction

    n_db = 0
    n_load = 0
    selection = None
    kind_ids = to_kind_ids(content)

    if isinstance(paths, str):
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

    if isinstance(paths, Selection):
        selection = paths
        if database is not None:
            raise TypeError(
                'iload: database argument must be None when called with a '
                'selection')

        database = selection.get_database()

    if skip_unchanged and not isinstance(paths, Selection):
        raise TypeError(
            'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            paths = util.short_to_list(100, paths)
            if isinstance(paths, list) and len(paths) == 0:
                return

            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check, transaction=outer_transaction)

        if selection:
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(
                database.undig_few(paths, format=format, segment=segment)
            )

    else:
        # Without a database there is no cached information: every file
        # comes with an empty list of previously known nuts.
        it = (((format, path), []) for path in paths)

    it = util.short_to_list(100, iter(it))

    try:
        n_files_total = len(it)
        if n_files_total == 0:
            return

    except TypeError:
        # Still an iterator (no len()): total number of files is unknown.
        n_files_total = None

    task = None
    if show_progress:
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Set only after the loop has run to completion; lets the ``finally``
    # clause distinguish normal termination from an abort.
    clean = False
    try:
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            if database and transaction:
                tnow = time.time()
                # Periodically commit our own transaction (every 20 s or
                # every 200 files). The parentheses make sure that the
                # outer-transaction guard applies to both triggers.
                if (tnow - tcommit > 20. or n_files % 200 == 0) \
                        and not outer_transaction:

                    transaction.commit()
                    tcommit = tnow
                    transaction.close()
                    transaction = database.transaction(
                        'update content index')

                    transaction.begin()

            try:
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # Everything requested is available from the
                        # database, the file itself is not read.
                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        continue

                if format == 'detect':
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s". %s' % (path, color_tid_pid()))

                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                if segment is None and len(nuts) == 0:
                    # Placeholder entry, so that a file without any content
                    # is still represented in the index.
                    nuts.append(
                        Nut(
                            file_path=path,
                            file_format=format_this,
                            file_mtime=mtime,
                            file_size=size,
                            kind_id=EMPTY))

                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only one segment has been read above; re-read the
                        # index information of the complete file before
                        # writing it to the database.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                        if len(nuts) == 0:
                            nuts.append(
                                Nut(
                                    file_path=path,
                                    file_format=format_this,
                                    file_mtime=mtime,
                                    file_size=size,
                                    kind_id=EMPTY))

                    if not transaction and not outer_transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(
                        nuts, transaction=(outer_transaction or transaction))

                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path],
                            transaction=(outer_transaction or transaction))

                        update_selection._update_nuts(
                            transaction=(outer_transaction or transaction))

            except FileLoadError:
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction and not outer_transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(
                        path, transaction=(outer_transaction or transaction))

        clean = True

    finally:
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        # Only transactions opened by this function are committed here; an
        # outer transaction is left to the caller.
        if database and transaction:
            transaction.commit()
            transaction.close()

        if temp_selection:
            del temp_selection

452 

453 

# Public names of this module.
__all__ = [
    'iload', 'detect_format', 'supported_formats',
    'supported_content_kinds', 'get_backend',
    'FormatDetectionFailed', 'UnknownFormat',
]