1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import time
9import logging
10from builtins import str as newstr
12from pyrocko import util
13from pyrocko.io.io_common import FileLoadError
14from pyrocko.progress import progress
16from .backends import \
17 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas
19from ..model import to_kind_ids
# Backend modules in order of precedence. update_format_providers() registers
# providers in this order (get_backend() picks the first one registered for a
# format) and detect_format() returns the first positive detection.
backend_modules = [
    mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas]


# Logger used by this module, also handed to the progress tasks created in
# make_task().
logger = logging.getLogger('psq.io')
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    :param args:
        Positional arguments, forwarded unchanged to ``progress.task``.

    :returns:
        Whatever ``progress.task`` returns.
    '''
    task = progress.task(*args, logger=logger)
    return task
def update_format_providers():
    '''
    Rebuild the global mapping from file format to io backend modules.

    The global ``g_format_providers`` is replaced by a fresh dict. Each
    format identifier maps to the list of backend modules announcing it via
    ``provided_formats()``, in the order of ``backend_modules``.
    '''

    global g_format_providers

    g_format_providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            # First provider creates the list, later ones are appended to it.
            g_format_providers.setdefault(fmt, []).append(backend)
# Maps a format identifier to the list of backend modules providing it. It is
# (re)built at import time by the call below.
g_format_providers = {}
update_format_providers()
class FormatDetectionFailed(FileLoadError):
    '''
    Raised when the format of a file could not be determined.

    :param path:
        Path of the offending file, included in the error message.
    '''

    def __init__(self, path):
        message = 'format detection failed for file: %s' % path
        super(FormatDetectionFailed, self).__init__(message)
class UnknownFormat(Exception):
    '''
    Raised when a file format is requested which no backend provides.

    :param format:
        The requested format identifier, included in the error message.
    '''

    def __init__(self, format):
        message = 'unknown format: %s' % format
        super(UnknownFormat, self).__init__(message)
def get_backend(fmt):
    '''
    Look up the squirrel io backend module handling a file format.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        The first backend module registered for ``fmt``.

    :raises:
        :py:exc:`UnknownFormat` if no backend is registered for ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        raise UnknownFormat(fmt)

    return providers[0]
def detect_format(path):
    '''
    Determine the file format by inspecting the first 512 bytes of a file.

    Paths starting with ``'virtual:'`` are reported as ``'virtual'`` without
    touching the file system. Otherwise the backends are asked in the order
    of ``backend_modules`` and the first positive answer is returned.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        Format identifier.

    :raises:
        :py:exc:`FormatDetectionFailed` if the file cannot be read or no
        backend recognizes it.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as f:
            header = f.read(512)

    except (OSError, IOError):
        raise FormatDetectionFailed(path)

    for backend in backend_modules:
        detected = backend.detect(header)
        if detected is not None:
            return detected

    raise FormatDetectionFailed(path)
def supported_formats():
    '''
    List the file formats for which an io backend is registered.

    :returns:
        Sorted list of format identifiers.
    '''
    names = sorted(g_format_providers)
    return names
# Content kinds which can be loaded from files. Also used as the default of
# the ``content`` argument of iload().
g_content_kinds = [
    'waveform',
    'station',
    'channel',
    'response',
    'event',
]


def supported_content_kinds():
    '''
    List the content kinds offered through Squirrel.

    :returns:
        New list holding the kinds of ``g_content_kinds`` followed by
        ``'waveform_promise'``.
    '''
    kinds = list(g_content_kinds)
    kinds.append('waveform_promise')
    return kinds
def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names. A single file name given as a string
        is accepted, too.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file, i.e. when ``paths`` is given as a single string).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignment is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this should be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to detect modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files. Requires
        ``paths`` to be a selection.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a progress task created with
        :py:func:`make_task`. The flag is also forwarded to the database when
        a temporary selection has to be created.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, its ``_set_file_states_force_check`` and
        ``_update_nuts`` methods are called, within the running transaction,
        each time the index of a file has been written to the database.
        (Presumably a selection / squirrel object - confirm against callers.)

    :raises:
        :py:exc:`TypeError` if ``segment`` is given but ``paths`` is not a
        single string, if ``database`` is given together with a selection, or
        if ``skip_unchanged`` is set without a selection.
        :py:exc:`~pyrocko.io.io_common.FileLoadError` raised while handling a
        file is caught: the problem is logged, the file is reset in the
        database (if any) and iteration continues with the next file. Other
        exceptions, e.g. :py:exc:`UnknownFormat`, propagate.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed.
    '''

    from ..selection import Selection

    # Counters for the progress message: nuts served from the database and
    # nuts read from files.
    n_db = 0
    n_load = 0
    selection = None
    kind_ids = to_kind_ids(content)

    # A single path given as string is wrapped into a list.
    if isinstance(paths, (str, newstr)):
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

    if isinstance(paths, Selection):
        selection = paths
        if database is not None:
            raise TypeError(
                'iload: database argument must be None when called with a '
                'selection')

        database = selection.get_database()

    if skip_unchanged and not isinstance(paths, Selection):
        raise TypeError(
            'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            paths = util.short_to_list(100, paths)
            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check)

        if selection:
            # undig_grouped starts a long select which causes deadlocks
            # when transaction is started after starting the select, therefore
            # the transaction has to be started before in these cases.
            # The db will be locked for a long time in this case. This could be
            # solved either by breaking the indexing into smaller blocks in
            # the caller or by modifying undig_grouped to allow limit and
            # offset and add an outer loop below.
            transaction = database.transaction(
                'update content index')
            transaction.begin()
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(database.undig_few(paths, format=format))

    else:
        # No database: nothing is cached, every file comes with an empty list
        # of previously known nuts.
        it = (((format, path), []) for path in paths)

    # The total number of files is only known if `it` is a sized container.
    try:
        n_files_total = len(it)
    except TypeError:
        n_files_total = None

    task = None
    if show_progress:
        # No requested content kinds means that files are only indexed.
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Set to True only when the loop below has run to completion.
    clean = False
    try:
        # NOTE: the loop variable rebinds the `format` argument with the
        # per-file format delivered by `it`.
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            if database and transaction:
                # Commit and reopen the running transaction when more than
                # 20 s have passed since the last commit or after every
                # 1000th file.
                tnow = time.time()
                if tnow - tcommit > 20. or n_files % 1000 == 0:
                    transaction.commit()
                    tcommit = tnow
                    transaction.begin()

            try:
                # In checking mode, cached nuts of a file reported as
                # modified are discarded so that the file is read again.
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # The file need not be read if no content is requested
                    # or if every cached nut is of a requested kind and has
                    # its content available in the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # logger.debug('using cached information for file %s, '
                        #              % path)

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        continue

                if format == 'detect':
                    # Reuse the format stored with the cached nuts, unless
                    # the file is reported as modified.
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s".' % path)

                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    # Attach the file meta-information to each nut read.
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                # Update the index when what has been read differs from what
                # is cached.
                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only a single segment has been read above. Read the
                        # file again with no segment restriction and no
                        # content kinds, so that the complete file is
                        # indexed.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                    # Transaction is started lazily when none is running yet.
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(nuts, transaction=transaction)
                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path], transaction=transaction)
                        update_selection._update_nuts(transaction=transaction)

            except FileLoadError:
                # Unreadable file: log it, reset the file in the database and
                # carry on with the next file.
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(path, transaction=transaction)

        clean = True

    finally:
        # Also reached on errors or when the generator is closed before it is
        # exhausted; `clean` is still False then and the task is reported as
        # failed.
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

        # Drop this reference to the temporary selection, if one was created.
        if temp_selection:
            del temp_selection
# Public names of this module.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]