Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/io/base.py: 95%

183 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 06:59 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel core file reading and indexing. 

8''' 

9 

10import time 

11import logging 

12 

13from pyrocko import util 

14from pyrocko.io.io_common import FileLoadError 

15from pyrocko.progress import progress 

16 

17from .backends import \ 

18 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas 

19 

20from ..model import to_kind_ids 

21 

22backend_modules = [ 

23 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas] 

24 

25 

26logger = logging.getLogger('psq.io') 

27 

28 

29def make_task(*args): 

30 return progress.task(*args, logger=logger) 

31 

32 

33def update_format_providers(): 

34 '''Update global mapping from file format to io backend module.''' 

35 

36 global g_format_providers 

37 g_format_providers = {} 

38 for mod in backend_modules: 

39 for format in mod.provided_formats(): 

40 if format not in g_format_providers: 

41 g_format_providers[format] = [] 

42 

43 g_format_providers[format].append(mod) 

44 

45 

46g_format_providers = {} 

47update_format_providers() 

48 

49 

50class FormatDetectionFailed(FileLoadError): 

51 ''' 

52 Exception raised when file format detection fails. 

53 ''' 

54 

55 def __init__(self, path): 

56 FileLoadError.__init__( 

57 self, 'format detection failed for file: %s' % path) 

58 

59 

60class UnknownFormat(Exception): 

61 ''' 

62 Exception raised when user requests an unknown file format. 

63 ''' 

64 

65 def __init__(self, format): 

66 Exception.__init__( 

67 self, 'unknown format: %s' % format) 

68 

69 

70def get_backend(fmt): 

71 ''' 

72 Get squirrel io backend module for a given file format. 

73 

74 :param fmt: 

75 Format identifier. 

76 :type fmt: 

77 str 

78 ''' 

79 

80 try: 

81 return g_format_providers[fmt][0] 

82 except KeyError: 

83 raise UnknownFormat(fmt) 

84 

85 

86def detect_format(path): 

87 ''' 

88 Determine file type from first 512 bytes. 

89 

90 :param path: 

91 Path to file. 

92 :type path: 

93 str 

94 ''' 

95 

96 if path.startswith('virtual:'): 

97 return 'virtual' 

98 

99 try: 

100 with open(path, 'rb') as f: 

101 data = f.read(512) 

102 

103 except (OSError, IOError): 

104 raise FormatDetectionFailed(path) 

105 

106 fmt = None 

107 for mod in backend_modules: 

108 fmt = mod.detect(data) 

109 if fmt is not None: 

110 return fmt 

111 

112 raise FormatDetectionFailed(path) 

113 

114 

115def supported_formats(): 

116 ''' 

117 Get list of file formats supported by Squirrel. 

118 ''' 

119 return sorted(g_format_providers.keys()) 

120 

121 

122g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event'] 

123 

124 

125def supported_content_kinds(): 

126 ''' 

127 Get list of supported content kinds offered through Squirrel. 

128 ''' 

129 return g_content_kinds + ['waveform_promise'] 

130 

131 

132def iload( 

133 paths, 

134 segment=None, 

135 format='detect', 

136 database=None, 

137 check=True, 

138 skip_unchanged=False, 

139 content=g_content_kinds, 

140 show_progress=True, 

141 update_selection=None): 

142 

143 ''' 

144 Iteratively load content or index/reindex meta-information from files. 

145 

146 :param paths: 

147 Iterator yielding file names to load from or a Squirrel selection 

148 object providing the file names. 

149 :type paths: 

150 iterator yielding :py:class:`str` or 

151 :py:class:`~pyrocko.squirrel.selection.Selection` 

152 

153 :param segment: 

154 File-specific segment identifier (can only be used when loading from a 

155 single file). 

156 :type segment: 

157 int 

158 

159 :param format: 

160 File format identifier or ``'detect'`` for autodetection. When loading 

161 from a selection, per-file format assignation is taken from the hint in 

162 the selection and this flag is ignored. 

163 :type format: 

164 str 

165 

166 :param database: 

167 Database to use for meta-information caching. When loading from a 

168 selection, this should be ``None`` and the database from the selection 

169 is used. 

170 :type database: 

171 :py:class:`~pyrocko.squirrel.database.Database` 

172 

173 :param check: 

174 If ``True``, investigate modification time and file sizes of known 

175 files to debunk modified files (pessimistic mode), or ``False`` to 

176 deactivate checks (optimistic mode). 

177 :type check: 

178 bool 

179 

180 :param skip_unchanged: 

181 If ``True``, only yield index nuts for new / modified files. 

182 :type skip_unchanged: 

183 bool 

184 

185 :param content: 

186 Selection of content types to load. 

187 :type content: 

188 :py:class:`list` of :py:class:`str` 

189 

190 This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for 

191 individual pieces of information found when reading the given files. Such a 

192 nut may represent a waveform, a station, a channel, an event or other data 

193 type. The nut itself only contains the meta-information. The actual content 

194 information is attached to the nut if requested. All nut meta-information 

195 is stored in the squirrel meta-information database. If possible, this 

196 function avoids accessing the actual disk files and provides the requested 

197 information straight from the database. Modified files are recognized and 

198 reindexed as needed. 

199 ''' 

200 

201 from ..selection import Selection 

202 

203 n_db = 0 

204 n_load = 0 

205 selection = None 

206 kind_ids = to_kind_ids(content) 

207 

208 if isinstance(paths, str): 

209 paths = [paths] 

210 else: 

211 if segment is not None: 

212 raise TypeError( 

213 'iload: segment argument can only be used when loading from ' 

214 'a single file') 

215 

216 if isinstance(paths, Selection): 

217 selection = paths 

218 if database is not None: 

219 raise TypeError( 

220 'iload: database argument must be None when called with a ' 

221 'selection') 

222 

223 database = selection.get_database() 

224 

225 if skip_unchanged and not isinstance(paths, Selection): 

226 raise TypeError( 

227 'iload: need selection when called with "skip_unchanged=True"') 

228 

229 temp_selection = None 

230 transaction = None 

231 if database: 

232 if not selection: 

233 # Avoid creating temporary selection for small batches. 

234 # this is helpful because then, we can avoid locking the database, 

235 # e.g. during loading of content, when the content has not been 

236 # modified. 

237 paths = util.short_to_list(100, paths) 

238 if isinstance(paths, list) and len(paths) == 0: 

239 return 

240 

241 if not (isinstance(paths, list) and len(paths) < 100 

242 and not skip_unchanged): 

243 

244 temp_selection = database.new_selection( 

245 paths, show_progress=show_progress, format=format) 

246 

247 selection = temp_selection 

248 

249 if skip_unchanged: 

250 selection.flag_modified(check) 

251 

252 if selection: 

253 # undig_grouped starts a long select which causes deadlocks 

254 # when transaction is started after starting the select, therefore 

255 # the transaction has to be started before in these cases. 

256 # The db will be locked for a long time in this case. This could be 

257 # solved either by breaking the indexing into smaller blocks in 

258 # the caller or by modifying undig_grouped to allow limit and 

259 # offset and add an outer loop below. 

260 transaction = database.transaction( 

261 'update content index') 

262 transaction.begin() 

263 it = selection.undig_grouped(skip_unchanged=skip_unchanged) 

264 else: 

265 # The list() causes the query to finish, so we don't have to lock, 

266 # and can start a transaction only when encountering a modified/new 

267 # file. 

268 it = list(database.undig_few(paths, format=format)) 

269 

270 else: 

271 it = (((format, path), []) for path in paths) 

272 

273 it = util.short_to_list(100, iter(it)) 

274 

275 try: 

276 n_files_total = len(it) 

277 if n_files_total == 0: 

278 if transaction: 

279 transaction.commit() 

280 transaction.close() 

281 return 

282 

283 except TypeError: 

284 n_files_total = None 

285 

286 task = None 

287 if show_progress: 

288 if not kind_ids: 

289 task = make_task('Indexing files', n_files_total) 

290 else: 

291 task = make_task('Loading files', n_files_total) 

292 

293 n_files = 0 

294 tcommit = time.time() 

295 

296 clean = False 

297 try: 

298 for (format, path), old_nuts in it: 

299 if task is not None: 

300 condition = '(nuts: %i from file, %i from cache)\n %s' % ( 

301 n_load, n_db, path) 

302 task.update(n_files, condition) 

303 

304 n_files += 1 

305 # cannot release when iterating a selection (see above) 

306 if database and transaction and not selection: 

307 tnow = time.time() 

308 if tnow - tcommit > 20. or n_files % 1000 == 0: 

309 transaction.commit() 

310 tcommit = tnow 

311 transaction.begin() 

312 

313 try: 

314 if check and old_nuts and old_nuts[0].file_modified(): 

315 old_nuts = [] 

316 modified = True 

317 else: 

318 modified = False 

319 

320 if segment is not None: 

321 old_nuts = [ 

322 nut for nut in old_nuts if nut.file_segment == segment] 

323 

324 if old_nuts: 

325 db_only_operation = not kind_ids or all( 

326 nut.kind_id in kind_ids and nut.content_in_db 

327 for nut in old_nuts) 

328 

329 if db_only_operation: 

330 # logger.debug('using cached information for file %s, ' 

331 # % path) 

332 

333 for nut in old_nuts: 

334 if nut.kind_id in kind_ids: 

335 database.undig_content(nut) 

336 

337 n_db += 1 

338 yield nut 

339 

340 continue 

341 

342 if format == 'detect': 

343 if old_nuts and not old_nuts[0].file_modified(): 

344 format_this = old_nuts[0].file_format 

345 else: 

346 format_this = detect_format(path) 

347 else: 

348 format_this = format 

349 

350 mod = get_backend(format_this) 

351 mtime, size = mod.get_stats(path) 

352 

353 if segment is not None: 

354 logger.debug( 

355 'Reading file "%s", segment "%s".' % (path, segment)) 

356 else: 

357 logger.debug( 

358 'Reading file "%s".' % path) 

359 

360 nuts = [] 

361 for nut in mod.iload(format_this, path, segment, content): 

362 nut.file_path = path 

363 nut.file_format = format_this 

364 nut.file_mtime = mtime 

365 nut.file_size = size 

366 if nut.content is not None: 

367 nut.content._squirrel_key = nut.key 

368 

369 nuts.append(nut) 

370 n_load += 1 

371 yield nut 

372 

373 if database and nuts != old_nuts: 

374 if old_nuts or modified: 

375 logger.debug( 

376 'File has been modified since last access: %s' 

377 % path) 

378 

379 if segment is not None: 

380 nuts = list(mod.iload(format_this, path, None, [])) 

381 for nut in nuts: 

382 nut.file_path = path 

383 nut.file_format = format_this 

384 nut.file_mtime = mtime 

385 nut.file_size = size 

386 

387 if not transaction: 

388 transaction = database.transaction( 

389 'update content index') 

390 transaction.begin() 

391 

392 database.dig(nuts, transaction=transaction) 

393 if update_selection is not None: 

394 update_selection._set_file_states_force_check( 

395 [path], transaction=transaction) 

396 update_selection._update_nuts(transaction=transaction) 

397 

398 except FileLoadError: 

399 logger.error('Cannot read file: %s' % path) 

400 if database: 

401 if not transaction: 

402 transaction = database.transaction( 

403 'update content index') 

404 transaction.begin() 

405 database.reset(path, transaction=transaction) 

406 

407 clean = True 

408 

409 finally: 

410 if task is not None: 

411 condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db) 

412 task.update(n_files, condition) 

413 if clean: 

414 task.done(condition) 

415 else: 

416 task.fail(condition + ' terminated') 

417 

418 if database and transaction: 

419 transaction.commit() 

420 transaction.close() 

421 

422 if temp_selection: 

423 del temp_selection 

424 

425 

426__all__ = [ 

427 'iload', 

428 'detect_format', 

429 'supported_formats', 

430 'supported_content_kinds', 

431 'get_backend', 

432 'FormatDetectionFailed', 

433 'UnknownFormat', 

434]