Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/io/base.py: 94%

187 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-02-27 10:58 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel core file reading and indexing. 

8''' 

9 

10import time 

11import logging 

12 

13from pyrocko import util 

14from pyrocko.io.io_common import FileLoadError 

15from pyrocko import progress 

16 

17from .backends import \ 

18 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas, \ 

19 spickle 

20 

21from ..model import to_kind_ids, EMPTY, Nut 

22 

# Backend modules in lookup order. The order matters: detect_format() returns
# the result of the first backend whose detect() matches, and get_backend()
# returns the first module registered for a format.
backend_modules = [
    mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas,
    spickle]

26 

27 

# Module-level logger; it is also handed to progress tasks by make_task().
logger = logging.getLogger('psq.io')

29 

30 

def make_task(*args):
    '''
    Create a progress task, passing along this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task

33 

34 

def update_format_providers():
    '''
    Rebuild the global mapping from file format to io backend modules.

    The global ``g_format_providers`` is replaced by a fresh dict. For every
    format reported by a backend's ``provided_formats()``, the backend is
    appended to that format's list, in the order of ``backend_modules``.
    '''

    global g_format_providers
    g_format_providers = providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            providers.setdefault(fmt, []).append(backend)

46 

47 

# Mapping from format identifier to the list of backend modules providing it.
# It is filled once here at import time; call update_format_providers() again
# to rebuild it from backend_modules.
g_format_providers = {}
update_format_providers()

50 

51 

class FormatDetectionFailed(FileLoadError):
    '''
    Raised when the format of a file could not be determined.

    :param path:
        Path of the offending file; it is included in the error message.
    '''

    def __init__(self, path):
        message = 'format detection failed for file: %s' % path
        super().__init__(message)

60 

61 

class UnknownFormat(Exception):
    '''
    Raised when the user asks for a file format which is not known.

    :param format:
        The requested format identifier; it is included in the error message.
    '''

    def __init__(self, format):
        message = 'unknown format: %s' % format
        super().__init__(message)

70 

71 

def get_backend(fmt):
    '''
    Look up the squirrel io backend module responsible for a file format.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        The first backend module registered for ``fmt`` in
        ``g_format_providers``.

    :raises UnknownFormat:
        If no backend is registered for ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        raise UnknownFormat(fmt)

    return providers[0]

86 

87 

def detect_format(path):
    '''
    Determine file type from first 512 bytes.

    Paths starting with ``'virtual:'`` are reported as ``'virtual'`` without
    opening any file. Otherwise, the first 512 bytes of the file are handed to
    the ``detect()`` function of each module in ``backend_modules``, in order,
    and the first result which is not ``None`` is returned.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        Format identifier as reported by the matching backend.

    :raises FormatDetectionFailed:
        If the file cannot be opened or read, or if no backend recognizes its
        content. In the former case, the underlying :py:exc:`OSError` is
        attached as the explicit cause of the exception.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as f:
            data = f.read(512)

    except OSError as e:
        # IOError is an alias of OSError in Python 3, so catching OSError is
        # sufficient. Chain the original error so the reason for the failure
        # (missing file, permissions, ...) is reported as the direct cause.
        raise FormatDetectionFailed(path) from e

    for mod in backend_modules:
        fmt = mod.detect(data)
        if fmt is not None:
            return fmt

    raise FormatDetectionFailed(path)

115 

116 

def supported_formats():
    '''
    Get a sorted list of the file format identifiers known to Squirrel.

    The identifiers are the keys of ``g_format_providers``.
    '''
    formats = list(g_format_providers)
    formats.sort()
    return formats

122 

123 

# Content kinds loaded by default (see the ``content`` argument of iload()).
g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event']


def supported_content_kinds():
    '''
    Get list of content kinds which are offered through Squirrel.

    This is a new list holding the entries of ``g_content_kinds`` followed by
    ``'waveform_promise'``.
    '''
    kinds = [*g_content_kinds, 'waveform_promise']
    return kinds

132 

133 

def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names. A single :py:class:`str` is treated
        as a list containing one file name.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this should be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to debunk modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a task created with
        :py:func:`make_task`. The task is named ``'Indexing files'`` when
        ``to_kind_ids(content)`` is empty and ``'Loading files'`` otherwise.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, ``_set_file_states_force_check()`` and
        ``_update_nuts()`` are called on this object, within the same
        transaction, each time index information for a file has been written
        to the database.

    :raises TypeError:
        If ``segment`` is given while ``paths`` is not a single
        :py:class:`str`, if ``database`` is given together with a selection,
        or if ``skip_unchanged`` is set while ``paths`` is neither a
        :py:class:`str` nor a selection.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed.

    Files which cannot be read (:py:exc:`FileLoadError`) are logged as errors
    and skipped; they do not terminate the iteration.
    '''

    # Imported here rather than at module level (presumably to avoid a
    # circular import -- confirm).
    from ..selection import Selection

    n_db = 0    # number of nuts served from the database
    n_load = 0  # number of nuts read from files
    selection = None
    kind_ids = to_kind_ids(content)

    # --- argument normalization and validation ---

    if isinstance(paths, str):
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

        if isinstance(paths, Selection):
            selection = paths
            if database is not None:
                raise TypeError(
                    'iload: database argument must be None when called with a '
                    'selection')

            database = selection.get_database()

        if skip_unchanged and not isinstance(paths, Selection):
            raise TypeError(
                'iload: need selection when called with "skip_unchanged=True"')

    # --- set up the iterator ``it`` yielding ((format, path), old_nuts) ---

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            paths = util.short_to_list(100, paths)
            if isinstance(paths, list) and len(paths) == 0:
                return

            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check)

        if selection:
            # undig_grouped starts a long select which causes deadlocks
            # when transaction is started after starting the select, therefore
            # the transaction has to be started before in these cases.
            # The db will be locked for a long time in this case. This could be
            # solved either by breaking the indexing into smaller blocks in
            # the caller or by modifying undig_grouped to allow limit and
            # offset and add an outer loop below.
            transaction = database.transaction(
                'update content index')
            transaction.begin()
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(database.undig_few(paths, format=format))

    else:
        # Without a database there is no cached information: every path is
        # paired with an empty list of previously known nuts.
        it = (((format, path), []) for path in paths)

    # Presumably converts short iterators into lists so that len() works
    # below -- see util.short_to_list to confirm.
    it = util.short_to_list(100, iter(it))

    try:
        n_files_total = len(it)
        if n_files_total == 0:
            # Nothing to do; finish a transaction which may have been started
            # above before leaving.
            if transaction:
                transaction.commit()
                transaction.close()
            return

    except TypeError:
        # ``it`` has no len(); the total number of files is unknown.
        n_files_total = None

    task = None
    if show_progress:
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Set to True only when the loop below runs to completion; used in the
    # finally block to report the task as done or as failed.
    clean = False
    try:
        # Note: the loop rebinds the ``format`` argument for every file.
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            # cannot release when iterating a selection (see above)
            if database and transaction and not selection:
                # Commit intermediate results after more than 20 s or on every
                # 1000th file, then continue in a new transaction.
                tnow = time.time()
                if tnow - tcommit > 20. or n_files % 1000 == 0:
                    transaction.commit()
                    tcommit = tnow
                    transaction.begin()

            try:
                # Discard cached nuts when the file is found to be modified.
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # The file need not be read if no content is requested or
                    # if all cached nuts are of a requested kind and have
                    # their content in the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # logger.debug('using cached information for file %s, '
                        #              % path)

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        # Everything was served from the database; go on with
                        # the next file.
                        continue

                if format == 'detect':
                    # Reuse the format stored with the cached nuts if the file
                    # is unmodified, otherwise inspect the file.
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s".' % path)

                # Read the file through the backend, stamping file
                # information onto each nut before yielding it.
                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                # A file without any nuts is represented by a single nut of
                # kind EMPTY. It is not yielded, only stored in the index.
                if segment is None and len(nuts) == 0:
                    nuts.append(
                        Nut(
                            file_path=path,
                            file_format=format_this,
                            file_mtime=mtime,
                            file_size=size,
                            kind_id=EMPTY))

                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only a single segment has been read above. Read the
                        # file again without segment restriction and with an
                        # empty content list before updating the index.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                        if len(nuts) == 0:
                            nuts.append(
                                Nut(
                                    file_path=path,
                                    file_format=format_this,
                                    file_mtime=mtime,
                                    file_size=size,
                                    kind_id=EMPTY))

                    # The transaction is started lazily, on the first file
                    # which needs a database update.
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(nuts, transaction=transaction)
                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path], transaction=transaction)
                        update_selection._update_nuts(transaction=transaction)

            except FileLoadError:
                # Unreadable files are logged and skipped. database.reset()
                # is called for the path (presumably dropping its index
                # entries -- confirm against Database.reset).
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(path, transaction=transaction)

        clean = True

    finally:
        # Runs on normal completion and also when the loop is left through an
        # exception; in the latter case ``clean`` is still False.
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

        if temp_selection:
            del temp_selection

444 

445 

# Names exported by ``from <this module> import *``.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]

454]