Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/io/base.py: 97%
188 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel core file reading and indexing.
8'''
10import time
11import logging
12import threading
14from pyrocko import util
15from pyrocko.io.io_common import FileLoadError
16from pyrocko import progress
18from .backends import \
19 mseed, sac, hdf5_optodas, datacube, stationxml, textfiles, virtual, yaml, \
20 tdms_idas, spickle
22from ..model import to_kind_ids, EMPTY, Nut
23from ..database import color_tid_pid
# Backend modules, in priority order. detect_format() queries them in this
# order and returns the first match, and update_format_providers() registers
# them in this order so that get_backend() prefers the earliest provider.
backend_modules = [
    mseed,
    sac,
    hdf5_optodas,
    datacube,
    stationxml,
    textfiles,
    virtual,
    yaml,
    tdms_idas,
    spickle,
]

logger = logging.getLogger('psq.io')
def tid():
    '''
    Get the identifier of the calling thread.

    :returns:
        Value of :py:func:`threading.get_ident` for the current thread.
    '''
    current_ident = threading.get_ident()
    return current_ident
def make_task(*args):
    '''
    Create a progress task that reports through this module's logger.

    All positional arguments are forwarded unchanged to
    :py:func:`pyrocko.progress.task`; the ``logger`` keyword is always set to
    the module-level ``psq.io`` logger.
    '''
    task_kwargs = {'logger': logger}
    return progress.task(*args, **task_kwargs)
def update_format_providers():
    '''
    Update global mapping from file format to io backend module.

    Rebinds the module-level ``g_format_providers`` to a fresh dict mapping
    each format identifier (as reported by each backend's
    ``provided_formats()``) to the list of backend modules providing it. The
    providers of a format are listed in the order of ``backend_modules``.
    '''

    global g_format_providers
    g_format_providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            g_format_providers.setdefault(fmt, []).append(backend)


# Initialise the format -> backend mapping at import time.
g_format_providers = {}
update_format_providers()
class FormatDetectionFailed(FileLoadError):
    '''
    Exception raised when file format detection fails.

    The message names the path of the file whose format could not be
    determined.
    '''

    def __init__(self, path):
        message = 'format detection failed for file: %s' % path
        FileLoadError.__init__(self, message)
class UnknownFormat(Exception):
    '''
    Exception raised when user requests an unknown file format.

    The message names the requested format identifier.
    '''

    def __init__(self, format):
        message = 'unknown format: %s' % format
        Exception.__init__(self, message)
def get_backend(fmt):
    '''
    Get squirrel io backend module for a given file format.

    If several backend modules provide the same format, the first entry of
    that format's provider list in ``g_format_providers`` is returned.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        Backend module providing ``fmt``.

    :raises:
        :py:exc:`UnknownFormat` if no backend provides ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        # Suppress the internal KeyError context so callers get a clean
        # UnknownFormat traceback instead of "during handling of the above
        # exception, another exception occurred".
        raise UnknownFormat(fmt) from None

    return providers[0]
def detect_format(path):
    '''
    Determine file type from first 512 bytes.

    Paths starting with ``'virtual:'`` are not opened and are always reported
    as format ``'virtual'``. For other paths, the first 512 bytes of the file
    are passed to the ``detect`` function of each module in
    ``backend_modules``, in order, and the first non-``None`` result is
    returned.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        Format identifier.
    :rtype:
        str

    :raises:
        :py:exc:`FormatDetectionFailed` if the file cannot be opened/read or
        if no backend recognizes its content.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as f:
            data = f.read(512)

    except OSError as e:
        # IOError is merely an alias of OSError in Python 3. Chain the
        # original error so the underlying reason (missing file, permission
        # denied, ...) stays visible in the traceback.
        raise FormatDetectionFailed(path) from e

    # First backend that recognizes the header wins.
    for mod in backend_modules:
        fmt = mod.detect(data)
        if fmt is not None:
            return fmt

    raise FormatDetectionFailed(path)
def supported_formats():
    '''
    Get list of file formats supported by Squirrel.

    :returns:
        Sorted list of the format identifiers registered in
        ``g_format_providers``.
    '''
    formats = list(g_format_providers)
    formats.sort()
    return formats
# Content kinds that can be requested through the ``content`` argument of
# iload().
g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event']


def supported_content_kinds():
    '''
    Get list of supported content kinds offered through Squirrel.

    :returns:
        A new list holding the entries of ``g_content_kinds`` followed by
        ``'waveform_promise'``. Mutating the result does not affect
        ``g_content_kinds``.
    '''
    kinds = list(g_content_kinds)
    kinds.append('waveform_promise')
    return kinds
def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        # NOTE(review): module-level list used as default; it must not be
        # mutated by to_kind_ids() or the backend iload() functions.
        content=g_content_kinds,
        show_progress=True,
        update_selection=None,
        transaction=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names. A single file name given as
        :py:class:`str` is also accepted.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this should be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to debunk modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a progress task (see
        ``make_task``). Also passed on when a temporary selection is created.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, after a file's index has been written to the
        database, this selection's file states for that path are force-checked
        and its nuts are updated (presumably a
        :py:class:`~pyrocko.squirrel.selection.Selection`; verify against
        callers).

    :param transaction:
        Outer database transaction. If given, all database writes happen
        within it and this generator neither creates, commits nor closes a
        transaction of its own. Otherwise a transaction is opened lazily on
        the first write and committed periodically and at the end.

    :raises:
        :py:exc:`TypeError` if ``segment`` is given together with more than a
        single path, if ``database`` is given together with a selection, or if
        ``skip_unchanged=True`` is used without a selection.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed.
    '''

    # NOTE(review): imported locally, presumably to avoid a circular import
    # between the io and selection modules -- confirm.
    from ..selection import Selection

    # Keep the caller's transaction; the name ``transaction`` is reused below
    # for a transaction owned by this generator.
    outer_transaction = transaction

    n_db = 0  # nuts served from the database cache
    n_load = 0  # nuts read from disk
    selection = None
    kind_ids = to_kind_ids(content)

    if isinstance(paths, str):
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

        if isinstance(paths, Selection):
            selection = paths
            if database is not None:
                raise TypeError(
                    'iload: database argument must be None when called with a '
                    'selection')

            database = selection.get_database()

        if skip_unchanged and not isinstance(paths, Selection):
            raise TypeError(
                'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    # Transaction owned by this generator; created lazily on first write.
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            paths = util.short_to_list(100, paths)
            if isinstance(paths, list) and len(paths) == 0:
                return

            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check, transaction=outer_transaction)

        if selection:
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(
                database.undig_few(paths, format=format, segment=segment)
            )

    else:
        # No database: every file is read from disk; no cached nuts.
        it = (((format, path), []) for path in paths)

    # NOTE(review): presumably turns short iterables into a list so that
    # len() below succeeds; for long ones len() raises TypeError and the
    # total number of files is reported as unknown.
    it = util.short_to_list(100, iter(it))

    try:
        n_files_total = len(it)
        if n_files_total == 0:
            return

    except TypeError:
        n_files_total = None

    task = None
    if show_progress:
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Set only when the loop runs to completion; used to report task success.
    clean = False
    try:
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            if database and transaction:
                tnow = time.time()
                # NOTE(review): ``and`` binds tighter than ``or``, so this
                # reads (elapsed > 20 s) or (every 200th file and no outer
                # transaction). An owned transaction only exists when no
                # outer transaction was given, so both readings agree here.
                if tnow - tcommit > 20. or n_files % 200 == 0 \
                        and not outer_transaction:

                    # Periodically commit and restart the owned transaction
                    # so that long runs do not hold one huge transaction.
                    transaction.commit()
                    tcommit = tnow
                    transaction.close()
                    transaction = database.transaction(
                        'update content index')

                    transaction.begin()

            try:
                # Cached nuts of a modified file are discarded.
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # Serve from the database if only indexing was requested,
                    # or if every cached nut is of a requested kind and has
                    # its content stored in the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # logger.debug('using cached information for file %s, '
                        #              % path)

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        # File fully served from cache; do not open it.
                        continue

                if format == 'detect':
                    # Reuse the cached format if the file is unchanged.
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s". %s' % (path, color_tid_pid()))

                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                # Record files without any content with a single EMPTY nut,
                # so they are known to the database.
                if segment is None and len(nuts) == 0:
                    nuts.append(
                        Nut(
                            file_path=path,
                            file_format=format_this,
                            file_mtime=mtime,
                            file_size=size,
                            kind_id=EMPTY))

                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only one segment was read above; re-read the
                        # index of the whole file (segment=None, no
                        # content) so the database gets all of its nuts.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                        if len(nuts) == 0:
                            nuts.append(
                                Nut(
                                    file_path=path,
                                    file_format=format_this,
                                    file_mtime=mtime,
                                    file_size=size,
                                    kind_id=EMPTY))

                    # Open an owned transaction lazily, only when there is
                    # something to write.
                    if not transaction and not outer_transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(
                        nuts, transaction=(outer_transaction or transaction))

                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path],
                            transaction=(outer_transaction or transaction))

                        update_selection._update_nuts(
                            transaction=(outer_transaction or transaction))

            except FileLoadError:
                # Unreadable file: log it, reset its database entries and
                # continue with the next file.
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction and not outer_transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(
                        path, transaction=(outer_transaction or transaction))

        clean = True

    finally:
        # Runs on completion, on errors and when the generator is closed
        # early. An owned transaction is committed in all of these cases.
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

    # NOTE(review): this explicit release is outside the ``finally`` above and
    # is skipped when the generator exits via an exception or early close.
    if temp_selection:
        del temp_selection
# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]