1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import logging
9from pyrocko import util
10from pyrocko.io.io_common import FileLoadError
11from pyrocko.progress import progress
13from .backends import \
14 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas
16from ..model import to_kind_ids
# Backend modules in the order in which they are consulted for format
# detection and registered as format providers.
backend_modules = [
    mseed,
    sac,
    datacube,
    stationxml,
    textfiles,
    virtual,
    yaml,
    tdms_idas,
]


# Logger shared by the functions of this module.
logger = logging.getLogger('psq.io')
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def update_format_providers():
    '''
    Rebuild the global mapping from file format to io backend modules.

    The mapping is reset and then refilled by asking every module in
    ``backend_modules`` for its ``provided_formats()``. Modules providing the
    same format are listed in ``backend_modules`` order.
    '''

    global g_format_providers

    g_format_providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            g_format_providers.setdefault(fmt, []).append(backend)
# Module-level registry mapping each format identifier to the list of backend
# modules providing it. It is declared here and filled once at import time;
# update_format_providers() rebuilds it from scratch on every call.
g_format_providers = {}
update_format_providers()
class FormatDetectionFailed(FileLoadError):
    '''
    Exception raised when the format of a file could not be determined.

    :param path:
        Path of the file for which detection failed. It is embedded into the
        error message.
    :type path:
        str
    '''

    def __init__(self, path):
        message = 'format detection failed for file: %s' % path
        FileLoadError.__init__(self, message)
class UnknownFormat(Exception):
    '''
    Exception raised when an unknown file format is requested.

    :param format:
        The requested format identifier. It is embedded into the error
        message.
    :type format:
        str
    '''

    def __init__(self, format):
        message = 'unknown format: %s' % format
        Exception.__init__(self, message)
def get_backend(fmt):
    '''
    Look up the squirrel io backend module handling a given file format.

    If several backends provide the format, the first registered one is
    returned.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        Backend module registered for ``fmt``.

    :raises:
        :py:exc:`UnknownFormat` if no backend is registered for ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        raise UnknownFormat(fmt)

    return providers[0]
def detect_format(path):
    '''
    Determine the file type from the first 512 bytes of a file.

    Paths starting with ``'virtual:'`` are reported as ``'virtual'`` without
    touching the file system. Otherwise the leading bytes of the file are
    offered to each backend module in turn and the first format reported is
    returned.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        Format identifier.

    :raises:
        :py:exc:`FormatDetectionFailed` if the file cannot be read or if no
        backend recognizes its content.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as f:
            head = f.read(512)

    except (OSError, IOError):
        raise FormatDetectionFailed(path)

    for backend in backend_modules:
        detected = backend.detect(head)
        if detected is not None:
            return detected

    raise FormatDetectionFailed(path)
def supported_formats():
    '''
    Get sorted list of the file format identifiers supported by Squirrel.
    '''
    return sorted(g_format_providers)
# Content kinds which iload() loads by default.
g_content_kinds = [
    'waveform',
    'station',
    'channel',
    'response',
    'event',
]
def supported_content_kinds():
    '''
    Get list of the content kinds offered through Squirrel.

    The result is a new list holding the entries of ``g_content_kinds``
    followed by ``'waveform_promise'``.
    '''
    kinds = list(g_content_kinds)
    kinds.append('waveform_promise')
    return kinds
def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names. A single file name given as
        :py:class:`str` is also accepted.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file, i.e. when ``paths`` is a :py:class:`str`).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this should be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to detect modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files. Requires
        ``paths`` to be a selection.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a task created with
        :py:func:`make_task`. The flag is also handed on when a temporary
        selection is created.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, this object is updated (``_set_file_states_force_check``
        and ``_update_nuts``) in the same transaction whenever new index
        information for a file is written to the database.

    :raises:
        :py:exc:`TypeError` if ``segment`` is given but ``paths`` is not a
        single file name, if ``database`` is given together with a selection,
        or if ``skip_unchanged`` is set without a selection. Because this is a
        generator function, these errors are raised when iteration starts, not
        when the function is called.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed.

    Files which cannot be read (:py:exc:`FileLoadError`) are logged as errors
    and skipped; their entries in the database, if one is used, are reset.
    '''

    # Imported at function level rather than at module level (presumably to
    # avoid a circular import -- TODO confirm).
    from ..selection import Selection

    # Counters for the progress message: nuts served from the database cache
    # and nuts read from files.
    n_db = 0
    n_load = 0
    selection = None

    # Kind ids for the requested content kinds; compared against
    # ``nut.kind_id`` below. Empty means index-only operation.
    kind_ids = to_kind_ids(content)

    if isinstance(paths, str):
        # Single file name: treat as a one-element list.
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

        if isinstance(paths, Selection):
            selection = paths
            if database is not None:
                raise TypeError(
                    'iload: database argument must be None when called with a '
                    'selection')

            database = selection.get_database()

    if skip_unchanged and not isinstance(paths, Selection):
        raise TypeError(
            'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            # NOTE(review): util.short_to_list presumably returns a list when
            # fewer than 100 items are available and an iterator otherwise --
            # verify against its implementation.
            paths = util.short_to_list(100, paths)
            if isinstance(paths, list) and len(paths) == 0:
                # Nothing to do; the generator ends without yielding.
                return

            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check)

        if selection:
            # undig_grouped starts a long select which causes deadlocks
            # when transaction is started after starting the select, therefore
            # the transaction has to be started before in these cases.
            # The db will be locked for a long time in this case. This could be
            # solved either by breaking the indexing into smaller blocks in
            # the caller or by modifying undig_grouped to allow limit and
            # offset and add an outer loop below.
            transaction = database.transaction(
                'update content index')
            transaction.begin()
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(database.undig_few(paths, format=format))

    else:
        # No database: every file is paired with an empty list of cached nuts.
        it = (((format, path), []) for path in paths)

    it = util.short_to_list(100, iter(it))

    try:
        n_files_total = len(it)
        if n_files_total == 0:
            # Nothing to iterate over; finish a transaction that may already
            # have been started above.
            if transaction:
                transaction.commit()
                transaction.close()
            return

    except TypeError:
        # ``it`` does not support len(), so the total number of files is
        # unknown.
        n_files_total = None

    task = None
    if show_progress:
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Set to True only if the loop below runs to completion; decides whether
    # the progress task is reported as done or failed in the finally clause.
    clean = False
    try:
        # Note: the loop rebinds the ``format`` parameter for each file.
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            if database and transaction:
                # Commit the running transaction and begin again when more
                # than 20 s have passed since the last commit or at every
                # 1000th file.
                tnow = time.time()
                if tnow - tcommit > 20. or n_files % 1000 == 0:
                    transaction.commit()
                    tcommit = tnow
                    transaction.begin()

            try:
                # Discard cached nuts if the file is reported as modified.
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # The file need not be read if no content is requested or
                    # if every cached nut is of a requested kind and has its
                    # content in the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # logger.debug('using cached information for file %s, '
                        # % path)

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        continue

                if format == 'detect':
                    # Reuse the format stored with the cached nuts when the
                    # file is unmodified, otherwise inspect the file.
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s".' % path)

                # Read from file, stamping every nut with the file information
                # before it is yielded.
                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                # Update the database index if what was read differs from what
                # was cached.
                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only one segment was read above; read again without
                        # segment restriction and without content
                        # (presumably so that the index written below covers
                        # the whole file -- TODO confirm).
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                    # Start a transaction lazily if none is running yet.
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(nuts, transaction=transaction)
                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path], transaction=transaction)
                        update_selection._update_nuts(transaction=transaction)

            except FileLoadError:
                # Unreadable files are logged and skipped; their database
                # entries are reset.
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(path, transaction=transaction)

        clean = True

    finally:
        # Also runs when an exception propagates or when the generator is
        # closed before exhaustion; ``clean`` is still False in those cases.
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

        # Drop the local reference to the temporary selection (presumably to
        # allow it to be cleaned up -- TODO confirm).
        if temp_selection:
            del temp_selection
# Names exported by ``from <this module> import *``.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]