Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/pile.py: 71%
928 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
1# https://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Waveform archive lookup, data loading and caching infrastructure.
9.. note::
11 This module has been superseded by :py:mod:`~pyrocko.squirrel` but will
12 remain available for backwards compatibility.
13'''
15import os
16import logging
17import time
18import copy
19import re
20import sys
21import operator
22import math
23import hashlib
24try:
25 import cPickle as pickle
26except ImportError:
27 import pickle
30from . import avl
31from . import trace, io, util
32from . import config
33from .trace import degapper
36is_windows = sys.platform.startswith('win')
37show_progress_force_off = False
38version_salt = 'v1-'
41def ehash(s):
42 return hashlib.sha1((version_salt + s).encode('utf8')).hexdigest()
45def cmp(a, b):
46 return int(a > b) - int(a < b)
49def sl(s):
50 return [str(x) for x in sorted(s)]
53class Counter(dict):
55 def __missing__(self, k):
56 return 0
58 def update(self, other):
59 for k, v in other.items():
60 self[k] += v
62 def subtract(self, other):
63 for k, v in other.items():
64 self[k] -= v
65 if self[k] <= 0:
66 del self[k]
68 def subtract1(self, k):
69 self[k] -= 1
70 if self[k] <= 0:
71 del self[k]
74def fix_unicode_copy(counter, func):
75 counter_new = Counter()
76 for k in counter:
77 counter_new[func(k)] = counter[k]
78 return counter_new
81pjoin = os.path.join
82logger = logging.getLogger('pyrocko.pile')
85def avl_remove_exact(avltree, element):
86 ilo, ihi = avltree.span(element)
87 for i in range(ilo, ihi):
88 if avltree[i] is element:
89 avltree.remove_at(i)
90 return i
92 raise ValueError(
93 'avl_remove_exact(avltree, element): element not in avltree')
96def cmpfunc(key):
97 if isinstance(key, str):
98 # special cases; these run about 50% faster than the generic one on
99 # Python 2.5
100 if key == 'tmin':
101 return lambda a, b: cmp(a.tmin, b.tmin)
102 if key == 'tmax':
103 return lambda a, b: cmp(a.tmax, b.tmax)
105 key = operator.attrgetter(key)
107 return lambda a, b: cmp(key(a), key(b))
110g_dummys = {}
113def get_dummy(key):
114 if key not in g_dummys:
115 class Dummy(object):
116 def __init__(self, k):
117 setattr(self, key, k)
119 g_dummys[key] = Dummy
121 return g_dummys[key]
124class TooMany(Exception):
125 def __init__(self, n):
126 Exception.__init__(self)
127 self.n = n
130class Sorted(object):
131 def __init__(self, values=[], key=None):
132 self._set_key(key)
133 self._avl = avl.new(values, self._cmp)
135 def _set_key(self, key):
136 self._key = key
137 self._cmp = cmpfunc(key)
138 if isinstance(key, str):
139 self._dummy = get_dummy(key)
141 def __getstate__(self):
142 state = list(self._avl.iter()), self._key
143 return state
145 def __setstate__(self, state):
146 it, key = state
147 self._set_key(key)
148 self._avl = avl.from_iter(iter(it), len(it))
150 def insert(self, value):
151 self._avl.insert(value)
153 def remove(self, value):
154 return avl_remove_exact(self._avl, value)
156 def remove_at(self, i):
157 return self._avl.remove_at(i)
159 def insert_many(self, values):
160 for value in values:
161 self._avl.insert(value)
163 def remove_many(self, values):
164 for value in values:
165 avl_remove_exact(self._avl, value)
167 def __iter__(self):
168 return iter(self._avl)
170 def with_key_in(self, kmin, kmax):
171 omin, omax = self._dummy(kmin), self._dummy(kmax)
172 ilo, ihi = self._avl.span(omin, omax)
173 return self._avl[ilo:ihi]
175 def with_key_in_limited(self, kmin, kmax, nmax):
176 omin, omax = self._dummy(kmin), self._dummy(kmax)
177 ilo, ihi = self._avl.span(omin, omax)
178 if ihi - ilo > nmax:
179 raise TooMany(ihi - ilo)
181 return self._avl[ilo:ihi]
183 def index(self, value):
184 ilo, ihi = self._avl.span(value)
185 for i in range(ilo, ihi):
186 if self._avl[i] is value:
187 return i
189 raise ValueError('element is not in avl tree')
191 def min(self):
192 return self._avl.min()
194 def max(self):
195 return self._avl.max()
197 def __len__(self):
198 return len(self._avl)
200 def __getitem__(self, i):
201 return self._avl[i]
204class TracesFileCache(object):
205 '''
206 Manages trace metainformation cache.
208 For each directory with files containing traces, one cache file is
209 maintained to hold the trace metainformation of all files which are
210 contained in the directory.
211 '''
213 caches = {}
215 def __init__(self, cachedir):
216 '''
217 Create new cache.
219 :param cachedir: directory to hold the cache files.
220 '''
222 self.cachedir = cachedir
223 self.dircaches = {}
224 self.modified = set()
225 util.ensuredir(self.cachedir)
227 def get(self, abspath):
228 '''
229 Try to get an item from the cache.
231 :param abspath: absolute path of the object to retrieve
233 :returns: a stored object is returned or None if nothing could be
234 found.
235 '''
237 dircache = self._get_dircache_for(abspath)
238 if abspath in dircache:
239 return dircache[abspath]
240 return None
242 def put(self, abspath, tfile):
243 '''
244 Put an item into the cache.
246 :param abspath: absolute path of the object to be stored
247 :param tfile: object to be stored
248 '''
250 cachepath = self._dircachepath(abspath)
251 # get lock on cachepath here
252 dircache = self._get_dircache(cachepath)
253 dircache[abspath] = tfile
254 self.modified.add(cachepath)
256 def dump_modified(self):
257 '''
258 Save any modifications to disk.
259 '''
261 for cachepath in self.modified:
262 self._dump_dircache(self.dircaches[cachepath], cachepath)
263 # unlock
265 self.modified = set()
267 def clean(self):
268 '''
269 Weed out missing files from the disk caches.
270 '''
272 self.dump_modified()
274 for fn in os.listdir(self.cachedir):
275 if len(fn) == 40:
276 cache = self._load_dircache(pjoin(self.cachedir, fn))
277 self._dump_dircache(cache, pjoin(self.cachedir, fn))
279 def _get_dircache_for(self, abspath):
280 return self._get_dircache(self._dircachepath(abspath))
282 def _get_dircache(self, cachepath):
283 if cachepath not in self.dircaches:
284 if os.path.isfile(cachepath):
285 self.dircaches[cachepath] = self._load_dircache(cachepath)
286 else:
287 self.dircaches[cachepath] = {}
289 return self.dircaches[cachepath]
291 def _dircachepath(self, abspath):
292 cachefn = ehash(os.path.dirname(abspath))
293 return pjoin(self.cachedir, cachefn)
295 def _load_dircache(self, cachefilename):
297 with open(cachefilename, 'rb') as f:
298 cache = pickle.load(f)
300 # weed out files which no longer exist
301 for fn in list(cache.keys()):
302 if not os.path.isfile(fn):
303 del cache[fn]
305 time_float = util.get_time_float()
307 for v in cache.values():
308 v.trees_from_content(v.traces)
309 for tr in v.traces:
310 tr.file = v
311 # fix Py2 codes to not include unicode when the cache file
312 # was created with Py3
313 if not isinstance(tr.station, str):
314 tr.prune_from_reuse_cache()
315 tr.set_codes(
316 str(tr.network),
317 str(tr.station),
318 str(tr.location),
319 str(tr.channel))
321 tr.tmin = time_float(tr.tmin)
322 tr.tmax = time_float(tr.tmax)
324 v.data_use_count = 0
325 v.data_loaded = False
326 v.fix_unicode_codes()
328 return cache
330 def _dump_dircache(self, cache, cachefilename):
332 if not cache:
333 if os.path.exists(cachefilename):
334 os.remove(cachefilename)
335 return
337 # make a copy without the parents and the binsearch trees
338 cache_copy = {}
339 for fn in cache.keys():
340 trf = copy.copy(cache[fn])
341 trf.parent = None
342 trf.by_tmin = None
343 trf.by_tmax = None
344 trf.by_tlen = None
345 trf.by_mtime = None
346 trf.data_use_count = 0
347 trf.data_loaded = False
348 traces = []
349 for tr in trf.traces:
350 tr = tr.copy(data=False)
351 tr.ydata = None
352 tr.meta = None
353 tr.file = trf
354 traces.append(tr)
356 trf.traces = traces
358 cache_copy[fn] = trf
360 tmpfn = cachefilename+'.%i.tmp' % os.getpid()
361 with open(tmpfn, 'wb') as f:
362 pickle.dump(cache_copy, f, protocol=2)
364 if is_windows and os.path.exists(cachefilename):
365 # windows doesn't allow to rename over existing file
366 os.unlink(cachefilename)
368 os.rename(tmpfn, cachefilename)
371def get_cache(cachedir):
372 '''
373 Get global TracesFileCache object for given directory.
374 '''
375 if cachedir not in TracesFileCache.caches:
376 TracesFileCache.caches[cachedir] = TracesFileCache(cachedir)
378 return TracesFileCache.caches[cachedir]
381def loader(
382 filenames, fileformat, cache, filename_attributes,
383 show_progress=True, update_progress=None):
385 if show_progress_force_off:
386 show_progress = False
388 class Progress(object):
389 def __init__(self, label, n):
390 self._label = label
391 self._n = n
392 self._bar = None
393 if show_progress:
394 self._bar = util.progressbar(label, self._n)
396 if update_progress:
397 update_progress(label, 0, self._n)
399 def update(self, i):
400 if self._bar:
401 if i < self._n-1:
402 self._bar.update(i)
403 else:
404 self._bar.finish()
405 self._bar = None
407 abort = False
408 if update_progress:
409 abort = update_progress(self._label, i, self._n)
411 return abort
413 def finish(self):
414 if self._bar:
415 self._bar.finish()
416 self._bar = None
418 if not filenames:
419 logger.warning('No files to load from')
420 return
422 regex = None
423 if filename_attributes:
424 regex = re.compile(filename_attributes)
426 try:
427 progress = Progress('Looking at files', len(filenames))
429 failures = []
430 to_load = []
431 for i, filename in enumerate(filenames):
432 try:
433 abspath = os.path.abspath(filename)
435 substitutions = None
436 if regex:
437 m = regex.search(filename)
438 if not m:
439 raise FilenameAttributeError(
440 "Cannot get attributes with pattern '%s' "
441 "from path '%s'" % (filename_attributes, filename))
443 substitutions = {}
444 for k in m.groupdict():
445 if k in ('network', 'station', 'location', 'channel'):
446 substitutions[k] = m.groupdict()[k]
448 mtime = os.stat(filename)[8]
449 tfile = None
450 if cache:
451 tfile = cache.get(abspath)
453 mustload = (
454 not tfile or
455 (tfile.format != fileformat and fileformat != 'detect') or
456 tfile.mtime != mtime or
457 substitutions is not None)
459 to_load.append(
460 (mustload, mtime, abspath, substitutions, tfile))
462 except (OSError, FilenameAttributeError) as xerror:
463 failures.append(abspath)
464 logger.warning(xerror)
466 abort = progress.update(i+1)
467 if abort:
468 progress.update(len(filenames))
469 return
471 progress.update(len(filenames))
473 to_load.sort(key=lambda x: x[2])
475 nload = len([1 for x in to_load if x[0]])
476 iload = 0
478 count_all = False
479 if nload < 0.01*len(to_load):
480 nload = len(to_load)
481 count_all = True
483 if to_load:
484 progress = Progress('Scanning files', nload)
486 for (mustload, mtime, abspath, substitutions, tfile) in to_load:
487 try:
488 if mustload:
489 tfile = TracesFile(
490 None, abspath, fileformat,
491 substitutions=substitutions, mtime=mtime)
493 if cache and not substitutions:
494 cache.put(abspath, tfile)
496 if not count_all:
497 iload += 1
499 if count_all:
500 iload += 1
502 except (io.FileLoadError, OSError) as xerror:
503 failures.append(abspath)
504 logger.warning(xerror)
505 else:
506 yield tfile
508 abort = progress.update(iload+1)
509 if abort:
510 break
512 progress.update(nload)
514 if failures:
515 logger.warning(
516 'The following file%s caused problems and will be ignored:\n' %
517 util.plural_s(len(failures)) + '\n'.join(failures))
519 if cache:
520 cache.dump_modified()
521 finally:
522 progress.finish()
525def tlen(x):
526 return x.tmax-x.tmin
529class TracesGroup(object):
531 '''
532 Trace container base class.
534 Base class for Pile, SubPile, and TracesFile, i.e. anything containing
535 a collection of several traces. A TracesGroup object maintains lookup sets
536 of some of the traces meta-information, as well as a combined time-range
537 of its contents.
538 '''
540 def __init__(self, parent):
541 self.parent = parent
542 self.empty()
543 self.nupdates = 0
544 self.abspath = None
546 def set_parent(self, parent):
547 self.parent = parent
549 def get_parent(self):
550 return self.parent
552 def empty(self):
553 self.networks, self.stations, self.locations, self.channels, \
554 self.nslc_ids, self.deltats = [Counter() for x in range(6)]
555 self.by_tmin = Sorted([], 'tmin')
556 self.by_tmax = Sorted([], 'tmax')
557 self.by_tlen = Sorted([], tlen)
558 self.by_mtime = Sorted([], 'mtime')
559 self.tmin, self.tmax = None, None
560 self.deltatmin, self.deltatmax = None, None
562 def trees_from_content(self, content):
563 self.by_tmin = Sorted(content, 'tmin')
564 self.by_tmax = Sorted(content, 'tmax')
565 self.by_tlen = Sorted(content, tlen)
566 self.by_mtime = Sorted(content, 'mtime')
567 self.adjust_minmax()
569 def fix_unicode_codes(self):
570 for net in self.networks:
571 if isinstance(net, str):
572 return
574 self.networks = fix_unicode_copy(self.networks, str)
575 self.stations = fix_unicode_copy(self.stations, str)
576 self.locations = fix_unicode_copy(self.locations, str)
577 self.channels = fix_unicode_copy(self.channels, str)
578 self.nslc_ids = fix_unicode_copy(
579 self.nslc_ids, lambda k: tuple(str(x) for x in k))
581 def add(self, content):
582 '''
583 Add content to traces group and update indices.
585 Accepts :py:class:`pyrocko.trace.Trace` objects and
586 :py:class:`pyrocko.pile.TracesGroup` objects.
587 '''
589 if isinstance(content, (trace.Trace, TracesGroup)):
590 content = [content]
592 for c in content:
594 if isinstance(c, TracesGroup):
595 self.networks.update(c.networks)
596 self.stations.update(c.stations)
597 self.locations.update(c.locations)
598 self.channels.update(c.channels)
599 self.nslc_ids.update(c.nslc_ids)
600 self.deltats.update(c.deltats)
602 self.by_tmin.insert_many(c.by_tmin)
603 self.by_tmax.insert_many(c.by_tmax)
604 self.by_tlen.insert_many(c.by_tlen)
605 self.by_mtime.insert_many(c.by_mtime)
607 elif isinstance(c, trace.Trace):
608 self.networks[c.network] += 1
609 self.stations[c.station] += 1
610 self.locations[c.location] += 1
611 self.channels[c.channel] += 1
612 self.nslc_ids[c.nslc_id] += 1
613 self.deltats[c.deltat] += 1
615 self.by_tmin.insert(c)
616 self.by_tmax.insert(c)
617 self.by_tlen.insert(c)
618 self.by_mtime.insert(c)
620 self.adjust_minmax()
622 self.nupdates += 1
623 self.notify_listeners('add', content)
625 if self.parent is not None:
626 self.parent.add(content)
628 def remove(self, content):
629 '''
630 Remove content to traces group and update indices.
631 '''
632 if isinstance(content, (trace.Trace, TracesGroup)):
633 content = [content]
635 for c in content:
637 if isinstance(c, TracesGroup):
638 self.networks.subtract(c.networks)
639 self.stations.subtract(c.stations)
640 self.locations.subtract(c.locations)
641 self.channels.subtract(c.channels)
642 self.nslc_ids.subtract(c.nslc_ids)
643 self.deltats.subtract(c.deltats)
645 self.by_tmin.remove_many(c.by_tmin)
646 self.by_tmax.remove_many(c.by_tmax)
647 self.by_tlen.remove_many(c.by_tlen)
648 self.by_mtime.remove_many(c.by_mtime)
650 elif isinstance(c, trace.Trace):
651 self.networks.subtract1(c.network)
652 self.stations.subtract1(c.station)
653 self.locations.subtract1(c.location)
654 self.channels.subtract1(c.channel)
655 self.nslc_ids.subtract1(c.nslc_id)
656 self.deltats.subtract1(c.deltat)
658 self.by_tmin.remove(c)
659 self.by_tmax.remove(c)
660 self.by_tlen.remove(c)
661 self.by_mtime.remove(c)
663 self.adjust_minmax()
665 self.nupdates += 1
666 self.notify_listeners('remove', content)
668 if self.parent is not None:
669 self.parent.remove(content)
671 def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
672 '''
673 Return list of :py:class:`pyrocko.trace.Trace` objects where given
674 arguments ``tmin`` and ``tmax`` match.
676 :param tmin: start time
677 :param tmax: end time
678 :param group_selector: lambda expression taking group dict of regex
679 match object as a single argument and which returns true or false
680 to keep or reject a file (default: ``None``)
681 :param trace_selector: lambda expression taking group dict of regex
682 match object as a single argument and which returns true or false
683 to keep or reject a file (default: ``None``)
684 '''
686 if not self.by_tmin or not self.is_relevant(
687 tmin, tmax, group_selector):
689 return []
691 return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
692 if tr.is_relevant(tmin, tmax, trace_selector)]
694 def adjust_minmax(self):
695 if self.by_tmin:
696 self.tmin = self.by_tmin.min().tmin
697 self.tmax = self.by_tmax.max().tmax
698 t = self.by_tlen.max()
699 self.tlenmax = t.tmax - t.tmin
700 self.mtime = self.by_mtime.max().mtime
701 deltats = list(self.deltats.keys())
702 self.deltatmin = min(deltats)
703 self.deltatmax = max(deltats)
704 else:
705 self.tmin = None
706 self.tmax = None
707 self.tlenmax = None
708 self.mtime = None
709 self.deltatmin = None
710 self.deltatmax = None
712 def notify_listeners(self, what, content):
713 pass
715 def get_update_count(self):
716 return self.nupdates
718 def overlaps(self, tmin, tmax):
719 return self.tmin is not None \
720 and tmax >= self.tmin and self.tmax >= tmin
722 def is_relevant(self, tmin, tmax, group_selector=None):
723 if self.tmin is None or self.tmax is None:
724 return False
725 return tmax >= self.tmin and self.tmax >= tmin and (
726 group_selector is None or group_selector(self))
729class MemTracesFile(TracesGroup):
731 '''
732 This is needed to make traces without an actual disc file to be inserted
733 into a Pile.
734 '''
736 def __init__(self, parent, traces):
737 TracesGroup.__init__(self, parent)
738 self.add(traces)
739 self.mtime = time.time()
741 def add(self, traces):
742 if isinstance(traces, trace.Trace):
743 traces = [traces]
745 for tr in traces:
746 tr.file = self
748 TracesGroup.add(self, traces)
750 def load_headers(self, mtime=None):
751 pass
753 def load_data(self):
754 pass
756 def use_data(self):
757 pass
759 def drop_data(self):
760 pass
762 def reload_if_modified(self):
763 return False
765 def iter_traces(self):
766 for tr in self.by_tmin:
767 yield tr
769 def get_traces(self):
770 return list(self.by_tmin)
772 def gather_keys(self, gather, selector=None):
773 keys = set()
774 for tr in self.by_tmin:
775 if selector is None or selector(tr):
776 keys.add(gather(tr))
778 return keys
780 def __str__(self):
782 s = 'MemTracesFile\n'
783 s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
784 s += 'number of traces: %i\n' % len(self.by_tmin)
785 s += 'timerange: %s - %s\n' % (
786 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
787 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
788 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
789 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
790 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
791 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
792 return s
795class TracesFile(TracesGroup):
796 def __init__(
797 self, parent, abspath, format,
798 substitutions=None, mtime=None):
800 TracesGroup.__init__(self, parent)
801 self.abspath = abspath
802 self.format = format
803 self.traces = []
804 self.data_loaded = False
805 self.data_use_count = 0
806 self.substitutions = substitutions
807 self.load_headers(mtime=mtime)
808 self.mtime = mtime
810 def load_headers(self, mtime=None):
811 logger.debug('loading headers from file: %s' % self.abspath)
812 if mtime is None:
813 self.mtime = os.stat(self.abspath)[8]
815 def kgen(tr):
816 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id
818 self.remove(self.traces)
819 ks = set()
820 for tr in io.load(self.abspath,
821 format=self.format,
822 getdata=False,
823 substitutions=self.substitutions):
825 k = kgen(tr)
826 if k not in ks:
827 ks.add(k)
828 self.traces.append(tr)
829 tr.file = self
831 self.add(self.traces)
833 self.data_loaded = False
834 self.data_use_count = 0
836 def load_data(self, force=False):
837 file_changed = False
838 if not self.data_loaded or force:
839 logger.debug('loading data from file: %s' % self.abspath)
841 def kgen(tr):
842 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id
844 traces_ = io.load(self.abspath, format=self.format, getdata=True,
845 substitutions=self.substitutions)
847 # prevent adding duplicate snippets from corrupt mseed files
848 k_loaded = set()
849 traces = []
850 for tr in traces_:
851 k = kgen(tr)
852 if k not in k_loaded:
853 k_loaded.add(k)
854 traces.append(tr)
856 k_current_d = dict((kgen(tr), tr) for tr in self.traces)
857 k_current = set(k_current_d)
858 k_new = k_loaded - k_current
859 k_delete = k_current - k_loaded
860 k_unchanged = k_current & k_loaded
862 for tr in self.traces[:]:
863 if kgen(tr) in k_delete:
864 self.remove(tr)
865 self.traces.remove(tr)
866 tr.file = None
867 file_changed = True
869 for tr in traces:
870 if kgen(tr) in k_new:
871 tr.file = self
872 self.traces.append(tr)
873 self.add(tr)
874 file_changed = True
876 for tr in traces:
877 if kgen(tr) in k_unchanged:
878 ctr = k_current_d[kgen(tr)]
879 ctr.ydata = tr.ydata
881 self.data_loaded = True
883 if file_changed:
884 logger.debug('reloaded (file may have changed): %s' % self.abspath)
886 return file_changed
888 def use_data(self):
889 if not self.data_loaded:
890 raise Exception('Data not loaded')
891 self.data_use_count += 1
893 def drop_data(self):
894 if self.data_loaded:
895 if self.data_use_count == 1:
896 logger.debug('forgetting data of file: %s' % self.abspath)
897 for tr in self.traces:
898 tr.drop_data()
900 self.data_loaded = False
902 self.data_use_count -= 1
903 else:
904 self.data_use_count = 0
906 def reload_if_modified(self):
907 mtime = os.stat(self.abspath)[8]
908 if mtime != self.mtime:
909 logger.debug(
910 'mtime=%i, reloading file: %s' % (mtime, self.abspath))
912 self.mtime = mtime
913 if self.data_loaded:
914 self.load_data(force=True)
915 else:
916 self.load_headers()
918 return True
920 return False
922 def iter_traces(self):
923 for tr in self.traces:
924 yield tr
926 def gather_keys(self, gather, selector=None):
927 keys = set()
928 for tr in self.by_tmin:
929 if selector is None or selector(tr):
930 keys.add(gather(tr))
932 return keys
934 def __str__(self):
935 s = 'TracesFile\n'
936 s += 'abspath: %s\n' % self.abspath
937 s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
938 s += 'number of traces: %i\n' % len(self.traces)
939 s += 'timerange: %s - %s\n' % (
940 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
941 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
942 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
943 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
944 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
945 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
946 return s
949class FilenameAttributeError(Exception):
950 pass
953class SubPile(TracesGroup):
954 def __init__(self, parent):
955 TracesGroup.__init__(self, parent)
956 self.files = []
957 self.empty()
959 def add_file(self, file):
960 self.files.append(file)
961 file.set_parent(self)
962 self.add(file)
964 def remove_file(self, file):
965 self.files.remove(file)
966 file.set_parent(None)
967 self.remove(file)
969 def remove_files(self, files):
970 for file in files:
971 self.files.remove(file)
972 file.set_parent(None)
973 self.remove(files)
975 def gather_keys(self, gather, selector=None):
976 keys = set()
977 for file in self.files:
978 keys |= file.gather_keys(gather, selector)
980 return keys
982 def iter_traces(
983 self,
984 load_data=False,
985 return_abspath=False,
986 group_selector=None,
987 trace_selector=None):
989 for file in self.files:
991 if group_selector and not group_selector(file):
992 continue
994 must_drop = False
995 if load_data:
996 file.load_data()
997 file.use_data()
998 must_drop = True
1000 for tr in file.iter_traces():
1001 if trace_selector and not trace_selector(tr):
1002 continue
1004 if return_abspath:
1005 yield file.abspath, tr
1006 else:
1007 yield tr
1009 if must_drop:
1010 file.drop_data()
1012 def iter_files(self):
1013 for file in self.files:
1014 yield file
1016 def reload_modified(self):
1017 modified = False
1018 for file in self.files:
1019 modified |= file.reload_if_modified()
1021 return modified
1023 def __str__(self):
1024 s = 'SubPile\n'
1025 s += 'number of files: %i\n' % len(self.files)
1026 s += 'timerange: %s - %s\n' % (
1027 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
1028 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1029 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1030 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1031 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1032 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1033 return s
1036class Batch(object):
1037 '''
1038 Batch of waveforms from window wise data extraction.
1040 Encapsulates state and results yielded for each window in window wise
1041 waveform extraction with the :py:meth:`Pile.chopper` method (when the
1042 `style='batch'` keyword argument set).
1044 *Attributes:*
1046 .. py:attribute:: tmin
1048 Start of this time window.
1050 .. py:attribute:: tmax
1052 End of this time window.
1054 .. py:attribute:: i
1056 Index of this time window in sequence.
1058 .. py:attribute:: n
1060 Total number of time windows in sequence.
1062 .. py:attribute:: traces
1064 Extracted waveforms for this time window.
1065 '''
1067 def __init__(self, tmin, tmax, i, n, traces):
1068 self.tmin = tmin
1069 self.tmax = tmax
1070 self.i = i
1071 self.n = n
1072 self.traces = traces
1075class Pile(TracesGroup):
1076 '''
1077 Waveform archive lookup, data loading and caching infrastructure.
1078 '''
1080 def __init__(self):
1081 TracesGroup.__init__(self, None)
1082 self.subpiles = {}
1083 self.open_files = {}
1084 self.listeners = []
1085 self.abspaths = set()
1087 def add_listener(self, obj):
1088 self.listeners.append(util.smart_weakref(obj))
1090 def notify_listeners(self, what, content):
1091 for ref in self.listeners:
1092 obj = ref()
1093 if obj:
1094 obj(what, content)
1096 def load_files(
1097 self, filenames,
1098 filename_attributes=None,
1099 fileformat='mseed',
1100 cache=None,
1101 show_progress=True,
1102 update_progress=None):
1104 load = loader(
1105 filenames, fileformat, cache, filename_attributes,
1106 show_progress=show_progress,
1107 update_progress=update_progress)
1109 self.add_files(load)
1111 def add_files(self, files):
1112 for file in files:
1113 self.add_file(file)
1115 def add_file(self, file):
1116 if file.abspath is not None and file.abspath in self.abspaths:
1117 logger.warning('File already in pile: %s' % file.abspath)
1118 return
1120 if file.deltatmin is None:
1121 logger.warning('Sampling rate of all traces are zero in file: %s' %
1122 file.abspath)
1123 return
1125 subpile = self.dispatch(file)
1126 subpile.add_file(file)
1127 if file.abspath is not None:
1128 self.abspaths.add(file.abspath)
1130 def remove_file(self, file):
1131 subpile = file.get_parent()
1132 if subpile is not None:
1133 subpile.remove_file(file)
1134 if file.abspath is not None:
1135 self.abspaths.remove(file.abspath)
1137 def remove_files(self, files):
1138 subpile_files = {}
1139 for file in files:
1140 subpile = file.get_parent()
1141 if subpile not in subpile_files:
1142 subpile_files[subpile] = []
1144 subpile_files[subpile].append(file)
1146 for subpile, files in subpile_files.items():
1147 subpile.remove_files(files)
1148 for file in files:
1149 if file.abspath is not None:
1150 self.abspaths.remove(file.abspath)
1152 def dispatch_key(self, file):
1153 dt = int(math.floor(math.log(file.deltatmin)))
1154 return dt
1156 def dispatch(self, file):
1157 k = self.dispatch_key(file)
1158 if k not in self.subpiles:
1159 self.subpiles[k] = SubPile(self)
1161 return self.subpiles[k]
1163 def get_deltats(self):
1164 return list(self.deltats.keys())
1166 def chop(
1167 self, tmin, tmax,
1168 group_selector=None,
1169 trace_selector=None,
1170 snap=(round, round),
1171 include_last=False,
1172 load_data=True):
1174 chopped = []
1175 used_files = set()
1177 traces = self.relevant(tmin, tmax, group_selector, trace_selector)
1178 if load_data:
1179 files_changed = False
1180 for tr in traces:
1181 if tr.file and tr.file not in used_files:
1182 if tr.file.load_data():
1183 files_changed = True
1185 if tr.file is not None:
1186 used_files.add(tr.file)
1188 if files_changed:
1189 traces = self.relevant(
1190 tmin, tmax, group_selector, trace_selector)
1192 for tr in traces:
1193 if not load_data and tr.ydata is not None:
1194 tr = tr.copy(data=False)
1195 tr.ydata = None
1197 try:
1198 chopped.append(tr.chop(
1199 tmin, tmax,
1200 inplace=False,
1201 snap=snap,
1202 include_last=include_last))
1204 except trace.NoData:
1205 pass
1207 return chopped, used_files
1209 def _process_chopped(
1210 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
1211 tpad):
1213 chopped.sort(key=lambda a: a.full_id)
1214 if degap:
1215 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)
1217 if not want_incomplete:
1218 chopped_weeded = []
1219 for tr in chopped:
1220 emin = tr.tmin - (wmin-tpad)
1221 emax = tr.tmax + tr.deltat - (wmax+tpad)
1222 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
1223 chopped_weeded.append(tr)
1225 elif degap:
1226 if (0. < emin <= 5. * tr.deltat and
1227 -5. * tr.deltat <= emax < 0.):
1229 tr.extend(
1230 wmin-tpad,
1231 wmax+tpad-tr.deltat,
1232 fillmethod='repeat')
1234 chopped_weeded.append(tr)
1236 chopped = chopped_weeded
1238 for tr in chopped:
1239 tr.wmin = wmin
1240 tr.wmax = wmax
1242 return chopped
1244 def chopper(
1245 self,
1246 tmin=None, tmax=None, tinc=None, tpad=0.,
1247 group_selector=None, trace_selector=None,
1248 want_incomplete=True, degap=True, maxgap=5, maxlap=None,
1249 keep_current_files_open=False, accessor_id=None,
1250 snap=(round, round), include_last=False, load_data=True,
1251 style=None):
1253 '''
1254 Get iterator for shifting window wise data extraction from waveform
1255 archive.
1257 :param tmin: start time (default uses start time of available data)
1258 :param tmax: end time (default uses end time of available data)
1259 :param tinc: time increment (window shift time) (default uses
1260 ``tmax-tmin``)
1261 :param tpad: padding time appended on either side of the data windows
1262 (window overlap is ``2*tpad``)
1263 :param group_selector: filter callback taking :py:class:`TracesGroup`
1264 objects
1265 :param trace_selector: filter callback taking
1266 :py:class:`pyrocko.trace.Trace` objects
1267 :param want_incomplete: if set to ``False``, gappy/incomplete traces
1268 are discarded from the results
1269 :param degap: whether to try to connect traces and to remove gaps and
1270 overlaps
1271 :param maxgap: maximum gap size in samples which is filled with
1272 interpolated samples when ``degap`` is ``True``
1273 :param maxlap: maximum overlap size in samples which is removed when
1274 ``degap`` is ``True``
1275 :param keep_current_files_open: whether to keep cached trace data in
1276 memory after the iterator has ended
1277 :param accessor_id: if given, used as a key to identify different
1278 points of extraction for the decision of when to release cached
1279 trace data (should be used when data is alternately extracted from
1280 more than one region / selection)
1281 :param snap: replaces Python's :py:func:`round` function which is used
1282 to determine indices where to start and end the trace data array
1283 :param include_last: whether to include last sample
1284 :param load_data: whether to load the waveform data. If set to
1285 ``False``, traces with no data samples, but with correct
1286 meta-information are returned
1287 :param style: set to ``'batch'`` to yield waveforms and information
1288 about the chopper state as :py:class:`Batch` objects. By default
1289 lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
1290 :returns: iterator providing extracted waveforms for each extracted
1291 window. See ``style`` argument for details.
1292 '''
1293 if tmin is None:
1294 if self.tmin is None:
1295 logger.warning("Pile's tmin is not set - pile may be empty.")
1296 return
1297 tmin = self.tmin + tpad
1299 if tmax is None:
1300 if self.tmax is None:
1301 logger.warning("Pile's tmax is not set - pile may be empty.")
1302 return
1303 tmax = self.tmax - tpad
1305 if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
1306 return
1308 if accessor_id not in self.open_files:
1309 self.open_files[accessor_id] = set()
1311 open_files = self.open_files[accessor_id]
1313 if tinc is None:
1314 tinc = tmax - tmin
1315 nwin = 1
1316 else:
1317 eps = tinc * 1e-6
1318 if tinc != 0.0:
1319 nwin = int(((tmax - eps) - tmin) / tinc) + 1
1320 else:
1321 nwin = 1
1323 for iwin in range(nwin):
1324 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
1326 chopped, used_files = self.chop(
1327 wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
1328 include_last, load_data)
1330 for file in used_files - open_files:
1331 # increment datause counter on newly opened files
1332 file.use_data()
1334 open_files.update(used_files)
1336 processed = self._process_chopped(
1337 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
1338 tpad)
1340 if style == 'batch':
1341 yield Batch(
1342 tmin=wmin,
1343 tmax=wmax,
1344 i=iwin,
1345 n=nwin,
1346 traces=processed)
1348 else:
1349 yield processed
1351 unused_files = open_files - used_files
1353 while unused_files:
1354 file = unused_files.pop()
1355 file.drop_data()
1356 open_files.remove(file)
1358 if not keep_current_files_open:
1359 while open_files:
1360 file = open_files.pop()
1361 file.drop_data()
1363 def all(self, *args, **kwargs):
1364 '''
1365 Shortcut to aggregate :py:meth:`chopper` output into a single list.
1366 '''
1368 alltraces = []
1369 for traces in self.chopper(*args, **kwargs):
1370 alltraces.extend(traces)
1372 return alltraces
1374 def iter_all(self, *args, **kwargs):
1375 for traces in self.chopper(*args, **kwargs):
1376 for tr in traces:
1377 yield tr
1379 def chopper_grouped(self, gather, progress=None, *args, **kwargs):
1380 keys = self.gather_keys(gather)
1381 if len(keys) == 0:
1382 return
1384 outer_group_selector = None
1385 if 'group_selector' in kwargs:
1386 outer_group_selector = kwargs['group_selector']
1388 outer_trace_selector = None
1389 if 'trace_selector' in kwargs:
1390 outer_trace_selector = kwargs['trace_selector']
1392 # the use of this gather-cache makes it impossible to modify the pile
1393 # during chopping
1394 gather_cache = {}
1395 pbar = None
1396 try:
1397 if progress is not None:
1398 pbar = util.progressbar(progress, len(keys))
1400 for ikey, key in enumerate(keys):
1401 def tsel(tr):
1402 return gather(tr) == key and (
1403 outer_trace_selector is None
1404 or outer_trace_selector(tr))
1406 def gsel(gr):
1407 if gr not in gather_cache:
1408 gather_cache[gr] = gr.gather_keys(gather)
1410 return key in gather_cache[gr] and (
1411 outer_group_selector is None
1412 or outer_group_selector(gr))
1414 kwargs['trace_selector'] = tsel
1415 kwargs['group_selector'] = gsel
1417 for traces in self.chopper(*args, **kwargs):
1418 yield traces
1420 if pbar:
1421 pbar.update(ikey+1)
1423 finally:
1424 if pbar:
1425 pbar.finish()
1427 def gather_keys(self, gather, selector=None):
1428 keys = set()
1429 for subpile in self.subpiles.values():
1430 keys |= subpile.gather_keys(gather, selector)
1432 return sorted(keys)
1434 def iter_traces(
1435 self,
1436 load_data=False,
1437 return_abspath=False,
1438 group_selector=None,
1439 trace_selector=None):
1441 '''
1442 Iterate over all traces in pile.
1444 :param load_data: whether to load the waveform data, by default empty
1445 traces are yielded
1446 :param return_abspath: if ``True`` yield tuples containing absolute
1447 file path and :py:class:`pyrocko.trace.Trace` objects
1448 :param group_selector: filter callback taking :py:class:`TracesGroup`
1449 objects
1450 :param trace_selector: filter callback taking
1451 :py:class:`pyrocko.trace.Trace` objects
1453 Example; yields only traces, where the station code is 'HH1'::
1455 test_pile = pile.make_pile('/local/test_trace_directory')
1456 for t in test_pile.iter_traces(
1457 trace_selector=lambda tr: tr.station=='HH1'):
1459 print t
1460 '''
1462 for subpile in self.subpiles.values():
1463 if not group_selector or group_selector(subpile):
1464 for tr in subpile.iter_traces(load_data, return_abspath,
1465 group_selector, trace_selector):
1466 yield tr
1468 def iter_files(self):
1469 for subpile in self.subpiles.values():
1470 for file in subpile.iter_files():
1471 yield file
1473 def reload_modified(self):
1474 modified = False
1475 for subpile in self.subpiles.values():
1476 modified |= subpile.reload_modified()
1478 return modified
1480 def get_tmin(self):
1481 return self.tmin
1483 def get_tmax(self):
1484 return self.tmax
1486 def get_deltatmin(self):
1487 return self.deltatmin
1489 def get_deltatmax(self):
1490 return self.deltatmax
1492 def is_empty(self):
1493 return self.tmin is None and self.tmax is None
1495 def __str__(self):
1496 if self.tmin is not None and self.tmax is not None:
1497 tmin = util.time_to_str(self.tmin)
1498 tmax = util.time_to_str(self.tmax)
1499 s = 'Pile\n'
1500 s += 'number of subpiles: %i\n' % len(self.subpiles)
1501 s += 'timerange: %s - %s\n' % (tmin, tmax)
1502 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1503 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1504 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1505 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1506 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1508 else:
1509 s = 'empty Pile'
1511 return s
1513 def snuffle(self, **kwargs):
1514 '''
1515 Visualize it.
1517 :param stations: list of :py:class:`pyrocko.model.station.Station`
1518 objects or ``None``
1519 :param events: list of :py:class:`pyrocko.model.event.Event` objects or
1520 ``None``
1521 :param markers: list of :py:class:`pyrocko.gui.snuffler.marker.Marker`
1522 objects or ``None``
1523 :param ntracks: float, number of tracks to be shown initially
1524 (default: 12)
1525 :param follow: time interval (in seconds) for real time follow mode or
1526 ``None``
1527 :param controls: bool, whether to show the main controls (default:
1528 ``True``)
1529 :param opengl: bool, whether to use opengl (default: ``False``)
1530 '''
1532 from pyrocko.gui.snuffler.snuffler import snuffle
1533 snuffle(self, **kwargs)
1536def make_pile(
1537 paths=None, selector=None, regex=None,
1538 fileformat='mseed',
1539 cachedirname=None, show_progress=True):
1541 '''
1542 Create pile from given file and directory names.
1544 :param paths: filenames and/or directories to look for traces. If paths is
1545 ``None`` ``sys.argv[1:]`` is used.
1546 :param selector: lambda expression taking group dict of regex match object
1547 as a single argument and which returns true or false to keep or reject
1548 a file
1549 :param regex: regular expression which filenames have to match
1550 :param fileformat: format of the files ('mseed', 'sac', 'kan',
1551 'from_extension', 'detect')
1552 :param cachedirname: loader cache is stored under this directory. It is
1553 created as neccessary.
1554 :param show_progress: show progress bar and other progress information
1555 '''
1557 if show_progress_force_off:
1558 show_progress = False
1560 if isinstance(paths, str):
1561 paths = [paths]
1563 if paths is None:
1564 paths = sys.argv[1:]
1566 if cachedirname is None:
1567 cachedirname = config.config().cache_dir
1569 fns = util.select_files(
1570 paths, include=regex, selector=selector, show_progress=show_progress)
1572 cache = get_cache(cachedirname)
1573 p = Pile()
1574 p.load_files(
1575 sorted(fns),
1576 cache=cache,
1577 fileformat=fileformat,
1578 show_progress=show_progress)
1580 return p
1583class Injector(trace.States):
1585 def __init__(
1586 self, pile,
1587 fixation_length=None,
1588 path=None,
1589 format='from_extension',
1590 forget_fixed=False):
1592 trace.States.__init__(self)
1593 self._pile = pile
1594 self._fixation_length = fixation_length
1595 self._format = format
1596 self._path = path
1597 self._forget_fixed = forget_fixed
1599 def set_fixation_length(self, length):
1600 '''
1601 Set length after which the fixation method is called on buffer traces.
1603 The length should be given in seconds. Give None to disable.
1604 '''
1605 self.fixate_all()
1606 self._fixation_length = length # in seconds
1608 def set_save_path(
1609 self,
1610 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
1611 '%(tmin)s_%(tmax)s.mseed'):
1613 self.fixate_all()
1614 self._path = path
1616 def inject(self, trace):
1617 logger.debug('Received a trace: %s' % trace)
1619 buf = self.get(trace)
1620 if buf is None:
1621 trbuf = trace.copy()
1622 buf = MemTracesFile(None, [trbuf])
1623 self._pile.add_file(buf)
1624 self.set(trace, buf)
1626 else:
1627 self._pile.remove_file(buf)
1628 trbuf = buf.get_traces()[0]
1629 buf.remove(trbuf)
1630 trbuf.append(trace.ydata)
1631 buf.add(trbuf)
1632 self._pile.add_file(buf)
1633 self.set(trace, buf)
1635 trbuf = buf.get_traces()[0]
1636 if self._fixation_length is not None:
1637 if trbuf.tmax - trbuf.tmin > self._fixation_length:
1638 self._fixate(buf, complete=False)
1640 def fixate_all(self):
1641 for state in list(self._states.values()):
1642 self._fixate(state[-1])
1644 self._states = {}
1646 def free(self, buf):
1647 self._fixate(buf)
1649 def _fixate(self, buf, complete=True):
1650 trbuf = buf.get_traces()[0]
1651 del_state = True
1652 if self._path:
1653 if self._fixation_length is not None:
1654 ttmin = trbuf.tmin
1655 ytmin = util.year_start(ttmin)
1656 n = int(math.floor((ttmin - ytmin) / self._fixation_length))
1657 tmin = ytmin + n*self._fixation_length
1658 traces = []
1659 t = tmin
1660 while t <= trbuf.tmax:
1661 try:
1662 traces.append(
1663 trbuf.chop(
1664 t,
1665 t+self._fixation_length,
1666 inplace=False,
1667 snap=(math.ceil, math.ceil)))
1669 except trace.NoData:
1670 pass
1671 t += self._fixation_length
1673 if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
1674 trbuf.deltat/100. or complete:
1676 self._pile.remove_file(buf)
1678 else: # reinsert incomplete last part
1679 new_trbuf = traces.pop()
1680 self._pile.remove_file(buf)
1681 buf.remove(trbuf)
1682 buf.add(new_trbuf)
1683 self._pile.add_file(buf)
1684 del_state = False
1686 else:
1687 traces = [trbuf]
1688 self._pile.remove_file(buf)
1690 fns = io.save(traces, self._path, format=self._format)
1692 if not self._forget_fixed:
1693 self._pile.load_files(
1694 fns, show_progress=False, fileformat=self._format)
1696 if del_state:
1697 del self._states[trbuf.nslc_id]
1699 def __del__(self):
1700 self.fixate_all()