Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/io/base.py: 95%
183 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
Squirrel core file reading and indexing.
'''

import time
import logging

from pyrocko import util
from pyrocko.io.io_common import FileLoadError
from pyrocko.progress import progress

from .backends import \
    mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas

from ..model import to_kind_ids

# Backend modules in lookup order: format detection tries them front to back
# and, when several modules provide the same format, the earliest one is the
# one handed out by get_backend().
backend_modules = [
    mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas]


# Logger shared by the functions of this module.
logger = logging.getLogger('psq.io')


def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to
    ``progress.task()``.
    '''
    return progress.task(*args, logger=logger)


def update_format_providers():
    '''
    Update global mapping from file format to io backend module.

    Every module in ``backend_modules`` is asked for its
    ``provided_formats()``. The resulting mapping goes from format identifier
    to the list of modules providing it, in ``backend_modules`` order.
    '''

    global g_format_providers

    # Build into a local dict and swap it in at the end, so that a failing
    # backend cannot leave a half-filled global table behind.
    providers = {}
    for mod in backend_modules:
        for fmt in mod.provided_formats():
            providers.setdefault(fmt, []).append(mod)

    g_format_providers = providers


g_format_providers = {}
update_format_providers()


class FormatDetectionFailed(FileLoadError):
    '''
    Exception raised when file format detection fails.

    :param path:
        Path of the file whose format could not be determined. It is
        embedded in the exception message.
    '''

    def __init__(self, path):
        # Wrap in a 1-tuple so that a tuple-valued path is formatted as a
        # whole instead of being unpacked as multiple format arguments.
        FileLoadError.__init__(
            self, 'format detection failed for file: %s' % (path,))


class UnknownFormat(Exception):
    '''
    Exception raised when user requests an unknown file format.

    :param format:
        The requested format identifier. It is embedded in the exception
        message.
    '''

    def __init__(self, format):
        # Wrap in a 1-tuple so that a tuple-valued argument is formatted as a
        # whole instead of being unpacked as multiple format arguments.
        Exception.__init__(
            self, 'unknown format: %s' % (format,))


def get_backend(fmt):
    '''
    Get squirrel io backend module for a given file format.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        The first module registered for ``fmt`` in the global format
        provider mapping.

    :raises UnknownFormat:
        If no backend module is registered for ``fmt``.
    '''

    try:
        return g_format_providers[fmt][0]
    except KeyError:
        raise UnknownFormat(fmt)


def detect_format(path):
    '''
    Determine file type from first 512 bytes.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        ``'virtual'`` for paths starting with ``'virtual:'`` (the file system
        is not touched in this case), otherwise the first non-``None`` result
        of the backends' ``detect()`` functions, tried in ``backend_modules``
        order.

    :raises FormatDetectionFailed:
        If the file cannot be opened or read, or if no backend recognizes
        its leading bytes.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as f:
            data = f.read(512)

    except (OSError, IOError):
        raise FormatDetectionFailed(path)

    for mod in backend_modules:
        fmt = mod.detect(data)
        if fmt is not None:
            return fmt

    raise FormatDetectionFailed(path)


def supported_formats():
    '''
    Get list of file formats supported by Squirrel.

    :returns:
        Sorted list of the format identifiers currently registered in the
        global format provider mapping.
    '''
    return sorted(g_format_providers)


# Content kinds loaded by default by iload().
g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event']


def supported_content_kinds():
    '''
    Get list of supported content kinds offered through Squirrel.

    :returns:
        New list holding the default content kinds followed by
        ``'waveform_promise'``. The module-level default list is not
        modified.
    '''
    return g_content_kinds + ['waveform_promise']


def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this should be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to debunk modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a progress task. The flag is
        also handed on when a temporary selection has to be created.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, this object gets the file state of every (re)indexed
        file force-checked and its nuts updated, within the same database
        transaction that stores the new index entries.
    :type update_selection:
        presumably :py:class:`~pyrocko.squirrel.selection.Selection`

    :raises TypeError:
        If ``segment`` is given but ``paths`` is not a single file name, if
        ``database`` is given together with a selection, or if
        ``skip_unchanged`` is set but ``paths`` is not a selection.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed.

    Files which cannot be read are logged as errors and skipped. If a database
    is in use, their entries are reset in it.
    '''

    # Imported here rather than at module level (presumably to avoid a
    # circular import - confirm).
    from ..selection import Selection

    n_db = 0
    n_load = 0
    selection = None
    kind_ids = to_kind_ids(content)

    if isinstance(paths, str):
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

        if isinstance(paths, Selection):
            selection = paths
            if database is not None:
                raise TypeError(
                    'iload: database argument must be None when called with a '
                    'selection')

            database = selection.get_database()

    if skip_unchanged and not isinstance(paths, Selection):
        raise TypeError(
            'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            paths = util.short_to_list(100, paths)
            if isinstance(paths, list) and len(paths) == 0:
                return

            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check)

        if selection:
            # undig_grouped starts a long select which causes deadlocks
            # when transaction is started after starting the select, therefore
            # the transaction has to be started before in these cases.
            # The db will be locked for a long time in this case. This could be
            # solved either by breaking the indexing into smaller blocks in
            # the caller or by modifying undig_grouped to allow limit and
            # offset and add an outer loop below.
            transaction = database.transaction(
                'update content index')
            transaction.begin()
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(database.undig_few(paths, format=format))

    else:
        # Without a database there is no cached information: every file is
        # presented with an empty list of previously known nuts.
        it = (((format, path), []) for path in paths)

    it = util.short_to_list(100, iter(it))

    # The total is only known if `it` supports len(); otherwise the progress
    # task is created without a total.
    try:
        n_files_total = len(it)
        if n_files_total == 0:
            if transaction:
                transaction.commit()
                transaction.close()
            return

    except TypeError:
        n_files_total = None

    task = None
    if show_progress:
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Only set to True when the loop below has run to completion; decides
    # whether the progress task is finished as done or as failed.
    clean = False
    try:
        # Note: `format` is rebound per file here, to the format hint coming
        # with each item.
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            # cannot release when iterating a selection (see above)
            if database and transaction and not selection:
                tnow = time.time()
                if tnow - tcommit > 20. or n_files % 1000 == 0:
                    transaction.commit()
                    tcommit = tnow
                    transaction.begin()

            try:
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # The file can be skipped if only indexing was requested
                    # or if all known nuts are of a requested kind and have
                    # their content available from the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # logger.debug('using cached information for file %s, '
                        #              % path)

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        continue

                if format == 'detect':
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s".' % path)

                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only one segment has been read above. Read the file
                        # again without segment restriction and without
                        # content, so that all of it gets indexed.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(nuts, transaction=transaction)
                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path], transaction=transaction)
                        update_selection._update_nuts(transaction=transaction)

            except FileLoadError:
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(path, transaction=transaction)

        clean = True

    finally:
        # Also runs when the generator is closed early or an exception
        # escapes: the task is then marked as failed, and an open transaction
        # is still committed and closed.
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

        if temp_selection:
            del temp_selection


# Public names of this module.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]