Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/io/base.py: 94%
187 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel core file reading and indexing.
8'''
10import time
11import logging
13from pyrocko import util
14from pyrocko.io.io_common import FileLoadError
15from pyrocko import progress
17from .backends import \
18 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas, \
19 spickle
21from ..model import to_kind_ids, EMPTY, Nut
# Backend modules known to this module. The list is iterated in this order
# by detect_format() and by update_format_providers().
backend_modules = [
    mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas,
    spickle]


# Logger used by this module; it is also handed to progress tasks created
# through make_task().
logger = logging.getLogger('psq.io')
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def update_format_providers():
    '''
    Rebuild the global mapping from file format to io backend modules.

    The mapping is replaced by a fresh dictionary and then filled by asking
    every module in ``backend_modules`` for its ``provided_formats()``. Each
    format maps to the list of modules providing it, in ``backend_modules``
    order.
    '''

    global g_format_providers

    g_format_providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            g_format_providers.setdefault(fmt, []).append(backend)
# Module-level registry: format identifier -> list of backend modules
# providing that format. It is filled once when this module is executed.
g_format_providers = {}
update_format_providers()
class FormatDetectionFailed(FileLoadError):
    '''
    Raised when the format of a file could not be determined.

    :param path:
        Path of the offending file, included in the error message.
    '''

    def __init__(self, path):
        message = 'format detection failed for file: %s' % path
        FileLoadError.__init__(self, message)
class UnknownFormat(Exception):
    '''
    Exception raised when user requests an unknown file format.

    :param format:
        The format identifier which was requested. It is rendered with
        ``%s`` into the error message ``'unknown format: <format>'``.
    '''

    def __init__(self, format):
        # Wrap the argument in a 1-tuple: with a bare ``% format``, a tuple
        # value would be unpacked as multiple format arguments and raise a
        # TypeError, hiding the error we actually want to report.
        Exception.__init__(
            self, 'unknown format: %s' % (format,))
def get_backend(fmt):
    '''
    Look up the squirrel io backend module handling a file format.

    The first module registered for the format in ``g_format_providers`` is
    returned.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :raises UnknownFormat:
        If no backend is registered for ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        raise UnknownFormat(fmt)

    return providers[0]
def detect_format(path):
    '''
    Determine the type of a file by inspecting its first 512 bytes.

    Paths starting with ``'virtual:'`` are reported as ``'virtual'`` without
    touching the file system. Otherwise, the backends in ``backend_modules``
    are asked in order and the first format reported wins.

    :param path:
        Path to file.
    :type path:
        str

    :raises FormatDetectionFailed:
        If the file cannot be opened or read, or if no backend recognizes
        its header.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as stream:
            header = stream.read(512)

    except OSError:
        # IOError is an alias of OSError in Python 3.
        raise FormatDetectionFailed(path)

    for backend in backend_modules:
        detected = backend.detect(header)
        if detected is not None:
            return detected

    raise FormatDetectionFailed(path)
def supported_formats():
    '''
    Get the sorted list of file format identifiers supported by Squirrel.

    The identifiers are the keys of ``g_format_providers``.
    '''
    formats = sorted(g_format_providers)
    return formats
# Content kinds which iload() loads by default (used as default value of its
# ``content`` argument).
g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event']
def supported_content_kinds():
    '''
    Get list of supported content kinds offered through Squirrel.

    This is a new list holding the entries of ``g_content_kinds`` followed by
    ``'waveform_promise'``.
    '''
    extra_kinds = ['waveform_promise']
    return g_content_kinds + extra_kinds
def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names. A single file name given as a
        string is also accepted.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this should be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to detect modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a task created with
        :py:func:`make_task`. The flag is also passed on when a temporary
        selection is created for the given paths.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, the ``_set_file_states_force_check`` and
        ``_update_nuts`` methods of this object are called each time new
        index information for a file has been written to the database.
    :type update_selection:
        presumably :py:class:`~pyrocko.squirrel.selection.Selection`
        (to be confirmed against the callers)

    :raises TypeError:
        If ``segment`` is given but ``paths`` is not a single string, if
        ``database`` is given together with a selection, or if
        ``skip_unchanged`` is set while ``paths`` is an iterable other than
        a selection.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed.

    Files which cannot be read (``FileLoadError``) are logged and skipped;
    with a database, their entry is reset.
    '''

    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from ..selection import Selection

    n_db = 0    # number of nuts served from the database cache
    n_load = 0  # number of nuts read from files
    selection = None
    kind_ids = to_kind_ids(content)

    if isinstance(paths, str):
        # A single file name: the only case in which `segment` is allowed.
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

        if isinstance(paths, Selection):
            selection = paths
            if database is not None:
                raise TypeError(
                    'iload: database argument must be None when called with a '
                    'selection')

            database = selection.get_database()

        if skip_unchanged and not isinstance(paths, Selection):
            raise TypeError(
                'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            # NOTE(review): short_to_list presumably turns iterables with
            # fewer than 100 items into a list and leaves longer ones as
            # iterators - confirm in pyrocko.util.
            paths = util.short_to_list(100, paths)
            if isinstance(paths, list) and len(paths) == 0:
                return

            # A temporary selection is used unless we have a short list and
            # skip_unchanged is off.
            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check)

        if selection:
            # undig_grouped starts a long select which causes deadlocks
            # when transaction is started after starting the select, therefore
            # the transaction has to be started before in these cases.
            # The db will be locked for a long time in this case. This could be
            # solved either by breaking the indexing into smaller blocks in
            # the caller or by modifying undig_grouped to allow limit and
            # offset and add an outer loop below.
            transaction = database.transaction(
                'update content index')
            transaction.begin()
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(database.undig_few(paths, format=format))

    else:
        # No database: every file is read, there are no cached nuts.
        it = (((format, path), []) for path in paths)

    it = util.short_to_list(100, iter(it))

    try:
        # `it` may still be an iterator without len(); the total number of
        # files is then unknown (TypeError branch below).
        n_files_total = len(it)
        if n_files_total == 0:
            if transaction:
                transaction.commit()
                transaction.close()
            return

    except TypeError:
        n_files_total = None

    task = None
    if show_progress:
        # Without requested content kinds, this is a pure indexing run.
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Only set to True after the loop has run to completion; used in the
    # finally clause to tell regular completion from early termination.
    clean = False
    try:
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            # cannot release when iterating a selection (see above)
            if database and transaction and not selection:
                # Commit and reopen the transaction when more than 20 s have
                # passed since the last commit or on every 1000th file.
                tnow = time.time()
                if tnow - tcommit > 20. or n_files % 1000 == 0:
                    transaction.commit()
                    tcommit = tnow
                    transaction.begin()

            try:
                # Cached nuts of a modified file are discarded.
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # The file need not be read if no content is requested or
                    # if every cached nut is of a requested kind and has its
                    # content available in the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # logger.debug('using cached information for file %s, '
                        #              % path)

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        # Done with this file, skip reading it.
                        continue

                if format == 'detect':
                    # Reuse the format stored with the cached nuts if the file
                    # is unchanged, otherwise look at the file itself.
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s".' % path)

                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    # Attach the file information to each nut before handing
                    # it to the caller.
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                if segment is None and len(nuts) == 0:
                    # The file yielded nothing: represent it by a single nut
                    # of kind EMPTY (not yielded to the caller).
                    nuts.append(
                        Nut(
                            file_path=path,
                            file_format=format_this,
                            file_mtime=mtime,
                            file_size=size,
                            kind_id=EMPTY))

                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only one segment was read above; read the file
                        # again without segment restriction and with an empty
                        # content list to obtain the nuts to be stored.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                        if len(nuts) == 0:
                            nuts.append(
                                Nut(
                                    file_path=path,
                                    file_format=format_this,
                                    file_mtime=mtime,
                                    file_size=size,
                                    kind_id=EMPTY))

                    # Start a transaction lazily if none is open yet.
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(nuts, transaction=transaction)
                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path], transaction=transaction)
                        update_selection._update_nuts(transaction=transaction)

            except FileLoadError:
                # Unreadable files are logged and do not abort the loop; with
                # a database, the file's entry is reset.
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(path, transaction=transaction)

        clean = True

    finally:
        # Runs on regular completion, on exceptions, and when the generator
        # is closed before being exhausted.
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

        if temp_selection:
            # Drop the local reference to the temporary selection.
            # NOTE(review): presumably this triggers its cleanup - confirm.
            del temp_selection
# Names exported by ``from <this module> import *``.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]