1# https://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import os
7import logging
8import time
9import copy
10import re
11import sys
12import operator
13import math
14import hashlib
15try:
16 import cPickle as pickle
17except ImportError:
18 import pickle
21from . import avl
22from . import trace, io, util
23from . import config
24from .trace import degapper
27is_windows = sys.platform.startswith('win')
28show_progress_force_off = False
29version_salt = 'v1-'
32def ehash(s):
33 return hashlib.sha1((version_salt + s).encode('utf8')).hexdigest()
36def cmp(a, b):
37 return int(a > b) - int(a < b)
40def sl(s):
41 return [str(x) for x in sorted(s)]
44class Counter(dict):
46 def __missing__(self, k):
47 return 0
49 def update(self, other):
50 for k, v in other.items():
51 self[k] += v
53 def subtract(self, other):
54 for k, v in other.items():
55 self[k] -= v
56 if self[k] <= 0:
57 del self[k]
59 def subtract1(self, k):
60 self[k] -= 1
61 if self[k] <= 0:
62 del self[k]
65def fix_unicode_copy(counter, func):
66 counter_new = Counter()
67 for k in counter:
68 counter_new[func(k)] = counter[k]
69 return counter_new
72pjoin = os.path.join
73logger = logging.getLogger('pyrocko.pile')
76def avl_remove_exact(avltree, element):
77 ilo, ihi = avltree.span(element)
78 for i in range(ilo, ihi):
79 if avltree[i] is element:
80 avltree.remove_at(i)
81 return i
83 raise ValueError(
84 'avl_remove_exact(avltree, element): element not in avltree')
87def cmpfunc(key):
88 if isinstance(key, str):
89 # special cases; these run about 50% faster than the generic one on
90 # Python 2.5
91 if key == 'tmin':
92 return lambda a, b: cmp(a.tmin, b.tmin)
93 if key == 'tmax':
94 return lambda a, b: cmp(a.tmax, b.tmax)
96 key = operator.attrgetter(key)
98 return lambda a, b: cmp(key(a), key(b))
101g_dummys = {}
104def get_dummy(key):
105 if key not in g_dummys:
106 class Dummy(object):
107 def __init__(self, k):
108 setattr(self, key, k)
110 g_dummys[key] = Dummy
112 return g_dummys[key]
115class TooMany(Exception):
116 def __init__(self, n):
117 Exception.__init__(self)
118 self.n = n
121class Sorted(object):
122 def __init__(self, values=[], key=None):
123 self._set_key(key)
124 self._avl = avl.new(values, self._cmp)
126 def _set_key(self, key):
127 self._key = key
128 self._cmp = cmpfunc(key)
129 if isinstance(key, str):
130 self._dummy = get_dummy(key)
132 def __getstate__(self):
133 state = list(self._avl.iter()), self._key
134 return state
136 def __setstate__(self, state):
137 it, key = state
138 self._set_key(key)
139 self._avl = avl.from_iter(iter(it), len(it))
141 def insert(self, value):
142 self._avl.insert(value)
144 def remove(self, value):
145 return avl_remove_exact(self._avl, value)
147 def remove_at(self, i):
148 return self._avl.remove_at(i)
150 def insert_many(self, values):
151 for value in values:
152 self._avl.insert(value)
154 def remove_many(self, values):
155 for value in values:
156 avl_remove_exact(self._avl, value)
158 def __iter__(self):
159 return iter(self._avl)
161 def with_key_in(self, kmin, kmax):
162 omin, omax = self._dummy(kmin), self._dummy(kmax)
163 ilo, ihi = self._avl.span(omin, omax)
164 return self._avl[ilo:ihi]
166 def with_key_in_limited(self, kmin, kmax, nmax):
167 omin, omax = self._dummy(kmin), self._dummy(kmax)
168 ilo, ihi = self._avl.span(omin, omax)
169 if ihi - ilo > nmax:
170 raise TooMany(ihi - ilo)
172 return self._avl[ilo:ihi]
174 def index(self, value):
175 ilo, ihi = self._avl.span(value)
176 for i in range(ilo, ihi):
177 if self._avl[i] is value:
178 return i
180 raise ValueError('element is not in avl tree')
182 def min(self):
183 return self._avl.min()
185 def max(self):
186 return self._avl.max()
188 def __len__(self):
189 return len(self._avl)
191 def __getitem__(self, i):
192 return self._avl[i]
195class TracesFileCache(object):
196 '''
197 Manages trace metainformation cache.
199 For each directory with files containing traces, one cache file is
200 maintained to hold the trace metainformation of all files which are
201 contained in the directory.
202 '''
204 caches = {}
206 def __init__(self, cachedir):
207 '''
208 Create new cache.
210 :param cachedir: directory to hold the cache files.
211 '''
213 self.cachedir = cachedir
214 self.dircaches = {}
215 self.modified = set()
216 util.ensuredir(self.cachedir)
218 def get(self, abspath):
219 '''
220 Try to get an item from the cache.
222 :param abspath: absolute path of the object to retrieve
224 :returns: a stored object is returned or None if nothing could be
225 found.
226 '''
228 dircache = self._get_dircache_for(abspath)
229 if abspath in dircache:
230 return dircache[abspath]
231 return None
233 def put(self, abspath, tfile):
234 '''
235 Put an item into the cache.
237 :param abspath: absolute path of the object to be stored
238 :param tfile: object to be stored
239 '''
241 cachepath = self._dircachepath(abspath)
242 # get lock on cachepath here
243 dircache = self._get_dircache(cachepath)
244 dircache[abspath] = tfile
245 self.modified.add(cachepath)
247 def dump_modified(self):
248 '''
249 Save any modifications to disk.
250 '''
252 for cachepath in self.modified:
253 self._dump_dircache(self.dircaches[cachepath], cachepath)
254 # unlock
256 self.modified = set()
258 def clean(self):
259 '''
260 Weed out missing files from the disk caches.
261 '''
263 self.dump_modified()
265 for fn in os.listdir(self.cachedir):
266 if len(fn) == 40:
267 cache = self._load_dircache(pjoin(self.cachedir, fn))
268 self._dump_dircache(cache, pjoin(self.cachedir, fn))
270 def _get_dircache_for(self, abspath):
271 return self._get_dircache(self._dircachepath(abspath))
273 def _get_dircache(self, cachepath):
274 if cachepath not in self.dircaches:
275 if os.path.isfile(cachepath):
276 self.dircaches[cachepath] = self._load_dircache(cachepath)
277 else:
278 self.dircaches[cachepath] = {}
280 return self.dircaches[cachepath]
282 def _dircachepath(self, abspath):
283 cachefn = ehash(os.path.dirname(abspath))
284 return pjoin(self.cachedir, cachefn)
286 def _load_dircache(self, cachefilename):
288 with open(cachefilename, 'rb') as f:
289 cache = pickle.load(f)
291 # weed out files which no longer exist
292 for fn in list(cache.keys()):
293 if not os.path.isfile(fn):
294 del cache[fn]
296 time_float = util.get_time_float()
298 for v in cache.values():
299 v.trees_from_content(v.traces)
300 for tr in v.traces:
301 tr.file = v
302 # fix Py2 codes to not include unicode when the cache file
303 # was created with Py3
304 if not isinstance(tr.station, str):
305 tr.prune_from_reuse_cache()
306 tr.set_codes(
307 str(tr.network),
308 str(tr.station),
309 str(tr.location),
310 str(tr.channel))
312 tr.tmin = time_float(tr.tmin)
313 tr.tmax = time_float(tr.tmax)
315 v.data_use_count = 0
316 v.data_loaded = False
317 v.fix_unicode_codes()
319 return cache
321 def _dump_dircache(self, cache, cachefilename):
323 if not cache:
324 if os.path.exists(cachefilename):
325 os.remove(cachefilename)
326 return
328 # make a copy without the parents and the binsearch trees
329 cache_copy = {}
330 for fn in cache.keys():
331 trf = copy.copy(cache[fn])
332 trf.parent = None
333 trf.by_tmin = None
334 trf.by_tmax = None
335 trf.by_tlen = None
336 trf.by_mtime = None
337 trf.data_use_count = 0
338 trf.data_loaded = False
339 traces = []
340 for tr in trf.traces:
341 tr = tr.copy(data=False)
342 tr.ydata = None
343 tr.meta = None
344 tr.file = trf
345 traces.append(tr)
347 trf.traces = traces
349 cache_copy[fn] = trf
351 tmpfn = cachefilename+'.%i.tmp' % os.getpid()
352 with open(tmpfn, 'wb') as f:
353 pickle.dump(cache_copy, f, protocol=2)
355 if is_windows and os.path.exists(cachefilename):
356 # windows doesn't allow to rename over existing file
357 os.unlink(cachefilename)
359 os.rename(tmpfn, cachefilename)
362def get_cache(cachedir):
363 '''
364 Get global TracesFileCache object for given directory.
365 '''
366 if cachedir not in TracesFileCache.caches:
367 TracesFileCache.caches[cachedir] = TracesFileCache(cachedir)
369 return TracesFileCache.caches[cachedir]
372def loader(
373 filenames, fileformat, cache, filename_attributes,
374 show_progress=True, update_progress=None):
376 if show_progress_force_off:
377 show_progress = False
379 class Progress(object):
380 def __init__(self, label, n):
381 self._label = label
382 self._n = n
383 self._bar = None
384 if show_progress:
385 self._bar = util.progressbar(label, self._n)
387 if update_progress:
388 update_progress(label, 0, self._n)
390 def update(self, i):
391 if self._bar:
392 if i < self._n-1:
393 self._bar.update(i)
394 else:
395 self._bar.finish()
396 self._bar = None
398 abort = False
399 if update_progress:
400 abort = update_progress(self._label, i, self._n)
402 return abort
404 def finish(self):
405 if self._bar:
406 self._bar.finish()
407 self._bar = None
409 if not filenames:
410 logger.warning('No files to load from')
411 return
413 regex = None
414 if filename_attributes:
415 regex = re.compile(filename_attributes)
417 try:
418 progress = Progress('Looking at files', len(filenames))
420 failures = []
421 to_load = []
422 for i, filename in enumerate(filenames):
423 try:
424 abspath = os.path.abspath(filename)
426 substitutions = None
427 if regex:
428 m = regex.search(filename)
429 if not m:
430 raise FilenameAttributeError(
431 "Cannot get attributes with pattern '%s' "
432 "from path '%s'" % (filename_attributes, filename))
434 substitutions = {}
435 for k in m.groupdict():
436 if k in ('network', 'station', 'location', 'channel'):
437 substitutions[k] = m.groupdict()[k]
439 mtime = os.stat(filename)[8]
440 tfile = None
441 if cache:
442 tfile = cache.get(abspath)
444 mustload = (
445 not tfile or
446 (tfile.format != fileformat and fileformat != 'detect') or
447 tfile.mtime != mtime or
448 substitutions is not None)
450 to_load.append(
451 (mustload, mtime, abspath, substitutions, tfile))
453 except (OSError, FilenameAttributeError) as xerror:
454 failures.append(abspath)
455 logger.warning(xerror)
457 abort = progress.update(i+1)
458 if abort:
459 progress.update(len(filenames))
460 return
462 progress.update(len(filenames))
464 to_load.sort(key=lambda x: x[2])
466 nload = len([1 for x in to_load if x[0]])
467 iload = 0
469 count_all = False
470 if nload < 0.01*len(to_load):
471 nload = len(to_load)
472 count_all = True
474 if to_load:
475 progress = Progress('Scanning files', nload)
477 for (mustload, mtime, abspath, substitutions, tfile) in to_load:
478 try:
479 if mustload:
480 tfile = TracesFile(
481 None, abspath, fileformat,
482 substitutions=substitutions, mtime=mtime)
484 if cache and not substitutions:
485 cache.put(abspath, tfile)
487 if not count_all:
488 iload += 1
490 if count_all:
491 iload += 1
493 except (io.FileLoadError, OSError) as xerror:
494 failures.append(abspath)
495 logger.warning(xerror)
496 else:
497 yield tfile
499 abort = progress.update(iload+1)
500 if abort:
501 break
503 progress.update(nload)
505 if failures:
506 logger.warning(
507 'The following file%s caused problems and will be ignored:\n' %
508 util.plural_s(len(failures)) + '\n'.join(failures))
510 if cache:
511 cache.dump_modified()
512 finally:
513 progress.finish()
516def tlen(x):
517 return x.tmax-x.tmin
520class TracesGroup(object):
522 '''
523 Trace container base class.
525 Base class for Pile, SubPile, and TracesFile, i.e. anything containing
526 a collection of several traces. A TracesGroup object maintains lookup sets
527 of some of the traces meta-information, as well as a combined time-range
528 of its contents.
529 '''
531 def __init__(self, parent):
532 self.parent = parent
533 self.empty()
534 self.nupdates = 0
535 self.abspath = None
537 def set_parent(self, parent):
538 self.parent = parent
540 def get_parent(self):
541 return self.parent
543 def empty(self):
544 self.networks, self.stations, self.locations, self.channels, \
545 self.nslc_ids, self.deltats = [Counter() for x in range(6)]
546 self.by_tmin = Sorted([], 'tmin')
547 self.by_tmax = Sorted([], 'tmax')
548 self.by_tlen = Sorted([], tlen)
549 self.by_mtime = Sorted([], 'mtime')
550 self.tmin, self.tmax = None, None
551 self.deltatmin, self.deltatmax = None, None
553 def trees_from_content(self, content):
554 self.by_tmin = Sorted(content, 'tmin')
555 self.by_tmax = Sorted(content, 'tmax')
556 self.by_tlen = Sorted(content, tlen)
557 self.by_mtime = Sorted(content, 'mtime')
558 self.adjust_minmax()
560 def fix_unicode_codes(self):
561 for net in self.networks:
562 if isinstance(net, str):
563 return
565 self.networks = fix_unicode_copy(self.networks, str)
566 self.stations = fix_unicode_copy(self.stations, str)
567 self.locations = fix_unicode_copy(self.locations, str)
568 self.channels = fix_unicode_copy(self.channels, str)
569 self.nslc_ids = fix_unicode_copy(
570 self.nslc_ids, lambda k: tuple(str(x) for x in k))
572 def add(self, content):
573 '''
574 Add content to traces group and update indices.
576 Accepts :py:class:`pyrocko.trace.Trace` objects and
577 :py:class:`pyrocko.pile.TracesGroup` objects.
578 '''
580 if isinstance(content, (trace.Trace, TracesGroup)):
581 content = [content]
583 for c in content:
585 if isinstance(c, TracesGroup):
586 self.networks.update(c.networks)
587 self.stations.update(c.stations)
588 self.locations.update(c.locations)
589 self.channels.update(c.channels)
590 self.nslc_ids.update(c.nslc_ids)
591 self.deltats.update(c.deltats)
593 self.by_tmin.insert_many(c.by_tmin)
594 self.by_tmax.insert_many(c.by_tmax)
595 self.by_tlen.insert_many(c.by_tlen)
596 self.by_mtime.insert_many(c.by_mtime)
598 elif isinstance(c, trace.Trace):
599 self.networks[c.network] += 1
600 self.stations[c.station] += 1
601 self.locations[c.location] += 1
602 self.channels[c.channel] += 1
603 self.nslc_ids[c.nslc_id] += 1
604 self.deltats[c.deltat] += 1
606 self.by_tmin.insert(c)
607 self.by_tmax.insert(c)
608 self.by_tlen.insert(c)
609 self.by_mtime.insert(c)
611 self.adjust_minmax()
613 self.nupdates += 1
614 self.notify_listeners('add', content)
616 if self.parent is not None:
617 self.parent.add(content)
619 def remove(self, content):
620 '''
621 Remove content to traces group and update indices.
622 '''
623 if isinstance(content, (trace.Trace, TracesGroup)):
624 content = [content]
626 for c in content:
628 if isinstance(c, TracesGroup):
629 self.networks.subtract(c.networks)
630 self.stations.subtract(c.stations)
631 self.locations.subtract(c.locations)
632 self.channels.subtract(c.channels)
633 self.nslc_ids.subtract(c.nslc_ids)
634 self.deltats.subtract(c.deltats)
636 self.by_tmin.remove_many(c.by_tmin)
637 self.by_tmax.remove_many(c.by_tmax)
638 self.by_tlen.remove_many(c.by_tlen)
639 self.by_mtime.remove_many(c.by_mtime)
641 elif isinstance(c, trace.Trace):
642 self.networks.subtract1(c.network)
643 self.stations.subtract1(c.station)
644 self.locations.subtract1(c.location)
645 self.channels.subtract1(c.channel)
646 self.nslc_ids.subtract1(c.nslc_id)
647 self.deltats.subtract1(c.deltat)
649 self.by_tmin.remove(c)
650 self.by_tmax.remove(c)
651 self.by_tlen.remove(c)
652 self.by_mtime.remove(c)
654 self.adjust_minmax()
656 self.nupdates += 1
657 self.notify_listeners('remove', content)
659 if self.parent is not None:
660 self.parent.remove(content)
662 def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
663 '''
664 Return list of :py:class:`pyrocko.trace.Trace` objects where given
665 arguments ``tmin`` and ``tmax`` match.
667 :param tmin: start time
668 :param tmax: end time
669 :param group_selector: lambda expression taking group dict of regex
670 match object as a single argument and which returns true or false
671 to keep or reject a file (default: ``None``)
672 :param trace_selector: lambda expression taking group dict of regex
673 match object as a single argument and which returns true or false
674 to keep or reject a file (default: ``None``)
675 '''
677 if not self.by_tmin or not self.is_relevant(
678 tmin, tmax, group_selector):
680 return []
682 return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
683 if tr.is_relevant(tmin, tmax, trace_selector)]
685 def adjust_minmax(self):
686 if self.by_tmin:
687 self.tmin = self.by_tmin.min().tmin
688 self.tmax = self.by_tmax.max().tmax
689 t = self.by_tlen.max()
690 self.tlenmax = t.tmax - t.tmin
691 self.mtime = self.by_mtime.max().mtime
692 deltats = list(self.deltats.keys())
693 self.deltatmin = min(deltats)
694 self.deltatmax = max(deltats)
695 else:
696 self.tmin = None
697 self.tmax = None
698 self.tlenmax = None
699 self.mtime = None
700 self.deltatmin = None
701 self.deltatmax = None
703 def notify_listeners(self, what, content):
704 pass
706 def get_update_count(self):
707 return self.nupdates
709 def overlaps(self, tmin, tmax):
710 return self.tmin is not None \
711 and tmax >= self.tmin and self.tmax >= tmin
713 def is_relevant(self, tmin, tmax, group_selector=None):
714 if self.tmin is None or self.tmax is None:
715 return False
716 return tmax >= self.tmin and self.tmax >= tmin and (
717 group_selector is None or group_selector(self))
720class MemTracesFile(TracesGroup):
722 '''
723 This is needed to make traces without an actual disc file to be inserted
724 into a Pile.
725 '''
727 def __init__(self, parent, traces):
728 TracesGroup.__init__(self, parent)
729 self.add(traces)
730 self.mtime = time.time()
732 def add(self, traces):
733 if isinstance(traces, trace.Trace):
734 traces = [traces]
736 for tr in traces:
737 tr.file = self
739 TracesGroup.add(self, traces)
741 def load_headers(self, mtime=None):
742 pass
744 def load_data(self):
745 pass
747 def use_data(self):
748 pass
750 def drop_data(self):
751 pass
753 def reload_if_modified(self):
754 return False
756 def iter_traces(self):
757 for tr in self.by_tmin:
758 yield tr
760 def get_traces(self):
761 return list(self.by_tmin)
763 def gather_keys(self, gather, selector=None):
764 keys = set()
765 for tr in self.by_tmin:
766 if selector is None or selector(tr):
767 keys.add(gather(tr))
769 return keys
771 def __str__(self):
773 s = 'MemTracesFile\n'
774 s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
775 s += 'number of traces: %i\n' % len(self.by_tmin)
776 s += 'timerange: %s - %s\n' % (
777 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
778 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
779 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
780 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
781 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
782 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
783 return s
786class TracesFile(TracesGroup):
787 def __init__(
788 self, parent, abspath, format,
789 substitutions=None, mtime=None):
791 TracesGroup.__init__(self, parent)
792 self.abspath = abspath
793 self.format = format
794 self.traces = []
795 self.data_loaded = False
796 self.data_use_count = 0
797 self.substitutions = substitutions
798 self.load_headers(mtime=mtime)
799 self.mtime = mtime
801 def load_headers(self, mtime=None):
802 logger.debug('loading headers from file: %s' % self.abspath)
803 if mtime is None:
804 self.mtime = os.stat(self.abspath)[8]
806 def kgen(tr):
807 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id
809 self.remove(self.traces)
810 ks = set()
811 for tr in io.load(self.abspath,
812 format=self.format,
813 getdata=False,
814 substitutions=self.substitutions):
816 k = kgen(tr)
817 if k not in ks:
818 ks.add(k)
819 self.traces.append(tr)
820 tr.file = self
822 self.add(self.traces)
824 self.data_loaded = False
825 self.data_use_count = 0
827 def load_data(self, force=False):
828 file_changed = False
829 if not self.data_loaded or force:
830 logger.debug('loading data from file: %s' % self.abspath)
832 def kgen(tr):
833 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id
835 traces_ = io.load(self.abspath, format=self.format, getdata=True,
836 substitutions=self.substitutions)
838 # prevent adding duplicate snippets from corrupt mseed files
839 k_loaded = set()
840 traces = []
841 for tr in traces_:
842 k = kgen(tr)
843 if k not in k_loaded:
844 k_loaded.add(k)
845 traces.append(tr)
847 k_current_d = dict((kgen(tr), tr) for tr in self.traces)
848 k_current = set(k_current_d)
849 k_new = k_loaded - k_current
850 k_delete = k_current - k_loaded
851 k_unchanged = k_current & k_loaded
853 for tr in self.traces[:]:
854 if kgen(tr) in k_delete:
855 self.remove(tr)
856 self.traces.remove(tr)
857 tr.file = None
858 file_changed = True
860 for tr in traces:
861 if kgen(tr) in k_new:
862 tr.file = self
863 self.traces.append(tr)
864 self.add(tr)
865 file_changed = True
867 for tr in traces:
868 if kgen(tr) in k_unchanged:
869 ctr = k_current_d[kgen(tr)]
870 ctr.ydata = tr.ydata
872 self.data_loaded = True
874 if file_changed:
875 logger.debug('reloaded (file may have changed): %s' % self.abspath)
877 return file_changed
879 def use_data(self):
880 if not self.data_loaded:
881 raise Exception('Data not loaded')
882 self.data_use_count += 1
884 def drop_data(self):
885 if self.data_loaded:
886 if self.data_use_count == 1:
887 logger.debug('forgetting data of file: %s' % self.abspath)
888 for tr in self.traces:
889 tr.drop_data()
891 self.data_loaded = False
893 self.data_use_count -= 1
894 else:
895 self.data_use_count = 0
897 def reload_if_modified(self):
898 mtime = os.stat(self.abspath)[8]
899 if mtime != self.mtime:
900 logger.debug(
901 'mtime=%i, reloading file: %s' % (mtime, self.abspath))
903 self.mtime = mtime
904 if self.data_loaded:
905 self.load_data(force=True)
906 else:
907 self.load_headers()
909 return True
911 return False
913 def iter_traces(self):
914 for tr in self.traces:
915 yield tr
917 def gather_keys(self, gather, selector=None):
918 keys = set()
919 for tr in self.by_tmin:
920 if selector is None or selector(tr):
921 keys.add(gather(tr))
923 return keys
925 def __str__(self):
926 s = 'TracesFile\n'
927 s += 'abspath: %s\n' % self.abspath
928 s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
929 s += 'number of traces: %i\n' % len(self.traces)
930 s += 'timerange: %s - %s\n' % (
931 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
932 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
933 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
934 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
935 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
936 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
937 return s
940class FilenameAttributeError(Exception):
941 pass
944class SubPile(TracesGroup):
945 def __init__(self, parent):
946 TracesGroup.__init__(self, parent)
947 self.files = []
948 self.empty()
950 def add_file(self, file):
951 self.files.append(file)
952 file.set_parent(self)
953 self.add(file)
955 def remove_file(self, file):
956 self.files.remove(file)
957 file.set_parent(None)
958 self.remove(file)
960 def remove_files(self, files):
961 for file in files:
962 self.files.remove(file)
963 file.set_parent(None)
964 self.remove(files)
966 def gather_keys(self, gather, selector=None):
967 keys = set()
968 for file in self.files:
969 keys |= file.gather_keys(gather, selector)
971 return keys
973 def iter_traces(
974 self,
975 load_data=False,
976 return_abspath=False,
977 group_selector=None,
978 trace_selector=None):
980 for file in self.files:
982 if group_selector and not group_selector(file):
983 continue
985 must_drop = False
986 if load_data:
987 file.load_data()
988 file.use_data()
989 must_drop = True
991 for tr in file.iter_traces():
992 if trace_selector and not trace_selector(tr):
993 continue
995 if return_abspath:
996 yield file.abspath, tr
997 else:
998 yield tr
1000 if must_drop:
1001 file.drop_data()
1003 def iter_files(self):
1004 for file in self.files:
1005 yield file
1007 def reload_modified(self):
1008 modified = False
1009 for file in self.files:
1010 modified |= file.reload_if_modified()
1012 return modified
1014 def __str__(self):
1015 s = 'SubPile\n'
1016 s += 'number of files: %i\n' % len(self.files)
1017 s += 'timerange: %s - %s\n' % (
1018 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
1019 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1020 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1021 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1022 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1023 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1024 return s
1027class Batch(object):
1028 '''
1029 Batch of waveforms from window wise data extraction.
1031 Encapsulates state and results yielded for each window in window wise
1032 waveform extraction with the :py:meth:`Pile.chopper` method (when the
1033 `style='batch'` keyword argument set).
1035 *Attributes:*
1037 .. py:attribute:: tmin
1039 Start of this time window.
1041 .. py:attribute:: tmax
1043 End of this time window.
1045 .. py:attribute:: i
1047 Index of this time window in sequence.
1049 .. py:attribute:: n
1051 Total number of time windows in sequence.
1053 .. py:attribute:: traces
1055 Extracted waveforms for this time window.
1056 '''
1058 def __init__(self, tmin, tmax, i, n, traces):
1059 self.tmin = tmin
1060 self.tmax = tmax
1061 self.i = i
1062 self.n = n
1063 self.traces = traces
1066class Pile(TracesGroup):
1067 '''
1068 Waveform archive lookup, data loading and caching infrastructure.
1069 '''
1071 def __init__(self):
1072 TracesGroup.__init__(self, None)
1073 self.subpiles = {}
1074 self.open_files = {}
1075 self.listeners = []
1076 self.abspaths = set()
1078 def add_listener(self, obj):
1079 self.listeners.append(util.smart_weakref(obj))
1081 def notify_listeners(self, what, content):
1082 for ref in self.listeners:
1083 obj = ref()
1084 if obj:
1085 obj(what, content)
1087 def load_files(
1088 self, filenames,
1089 filename_attributes=None,
1090 fileformat='mseed',
1091 cache=None,
1092 show_progress=True,
1093 update_progress=None):
1095 load = loader(
1096 filenames, fileformat, cache, filename_attributes,
1097 show_progress=show_progress,
1098 update_progress=update_progress)
1100 self.add_files(load)
1102 def add_files(self, files):
1103 for file in files:
1104 self.add_file(file)
1106 def add_file(self, file):
1107 if file.abspath is not None and file.abspath in self.abspaths:
1108 logger.warning('File already in pile: %s' % file.abspath)
1109 return
1111 if file.deltatmin is None:
1112 logger.warning('Sampling rate of all traces are zero in file: %s' %
1113 file.abspath)
1114 return
1116 subpile = self.dispatch(file)
1117 subpile.add_file(file)
1118 if file.abspath is not None:
1119 self.abspaths.add(file.abspath)
1121 def remove_file(self, file):
1122 subpile = file.get_parent()
1123 if subpile is not None:
1124 subpile.remove_file(file)
1125 if file.abspath is not None:
1126 self.abspaths.remove(file.abspath)
1128 def remove_files(self, files):
1129 subpile_files = {}
1130 for file in files:
1131 subpile = file.get_parent()
1132 if subpile not in subpile_files:
1133 subpile_files[subpile] = []
1135 subpile_files[subpile].append(file)
1137 for subpile, files in subpile_files.items():
1138 subpile.remove_files(files)
1139 for file in files:
1140 if file.abspath is not None:
1141 self.abspaths.remove(file.abspath)
1143 def dispatch_key(self, file):
1144 dt = int(math.floor(math.log(file.deltatmin)))
1145 return dt
1147 def dispatch(self, file):
1148 k = self.dispatch_key(file)
1149 if k not in self.subpiles:
1150 self.subpiles[k] = SubPile(self)
1152 return self.subpiles[k]
1154 def get_deltats(self):
1155 return list(self.deltats.keys())
1157 def chop(
1158 self, tmin, tmax,
1159 group_selector=None,
1160 trace_selector=None,
1161 snap=(round, round),
1162 include_last=False,
1163 load_data=True):
1165 chopped = []
1166 used_files = set()
1168 traces = self.relevant(tmin, tmax, group_selector, trace_selector)
1169 if load_data:
1170 files_changed = False
1171 for tr in traces:
1172 if tr.file and tr.file not in used_files:
1173 if tr.file.load_data():
1174 files_changed = True
1176 if tr.file is not None:
1177 used_files.add(tr.file)
1179 if files_changed:
1180 traces = self.relevant(
1181 tmin, tmax, group_selector, trace_selector)
1183 for tr in traces:
1184 if not load_data and tr.ydata is not None:
1185 tr = tr.copy(data=False)
1186 tr.ydata = None
1188 try:
1189 chopped.append(tr.chop(
1190 tmin, tmax,
1191 inplace=False,
1192 snap=snap,
1193 include_last=include_last))
1195 except trace.NoData:
1196 pass
1198 return chopped, used_files
1200 def _process_chopped(
1201 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
1202 tpad):
1204 chopped.sort(key=lambda a: a.full_id)
1205 if degap:
1206 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)
1208 if not want_incomplete:
1209 chopped_weeded = []
1210 for tr in chopped:
1211 emin = tr.tmin - (wmin-tpad)
1212 emax = tr.tmax + tr.deltat - (wmax+tpad)
1213 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
1214 chopped_weeded.append(tr)
1216 elif degap:
1217 if (0. < emin <= 5. * tr.deltat and
1218 -5. * tr.deltat <= emax < 0.):
1220 tr.extend(
1221 wmin-tpad,
1222 wmax+tpad-tr.deltat,
1223 fillmethod='repeat')
1225 chopped_weeded.append(tr)
1227 chopped = chopped_weeded
1229 for tr in chopped:
1230 tr.wmin = wmin
1231 tr.wmax = wmax
1233 return chopped
1235 def chopper(
1236 self,
1237 tmin=None, tmax=None, tinc=None, tpad=0.,
1238 group_selector=None, trace_selector=None,
1239 want_incomplete=True, degap=True, maxgap=5, maxlap=None,
1240 keep_current_files_open=False, accessor_id=None,
1241 snap=(round, round), include_last=False, load_data=True,
1242 style=None):
1244 '''
1245 Get iterator for shifting window wise data extraction from waveform
1246 archive.
1248 :param tmin: start time (default uses start time of available data)
1249 :param tmax: end time (default uses end time of available data)
1250 :param tinc: time increment (window shift time) (default uses
1251 ``tmax-tmin``)
1252 :param tpad: padding time appended on either side of the data windows
1253 (window overlap is ``2*tpad``)
1254 :param group_selector: filter callback taking :py:class:`TracesGroup`
1255 objects
1256 :param trace_selector: filter callback taking
1257 :py:class:`pyrocko.trace.Trace` objects
1258 :param want_incomplete: if set to ``False``, gappy/incomplete traces
1259 are discarded from the results
1260 :param degap: whether to try to connect traces and to remove gaps and
1261 overlaps
1262 :param maxgap: maximum gap size in samples which is filled with
1263 interpolated samples when ``degap`` is ``True``
1264 :param maxlap: maximum overlap size in samples which is removed when
1265 ``degap`` is ``True``
1266 :param keep_current_files_open: whether to keep cached trace data in
1267 memory after the iterator has ended
1268 :param accessor_id: if given, used as a key to identify different
1269 points of extraction for the decision of when to release cached
1270 trace data (should be used when data is alternately extracted from
1271 more than one region / selection)
1272 :param snap: replaces Python's :py:func:`round` function which is used
1273 to determine indices where to start and end the trace data array
1274 :param include_last: whether to include last sample
1275 :param load_data: whether to load the waveform data. If set to
1276 ``False``, traces with no data samples, but with correct
1277 meta-information are returned
1278 :param style: set to ``'batch'`` to yield waveforms and information
1279 about the chopper state as :py:class:`Batch` objects. By default
1280 lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
1281 :returns: iterator providing extracted waveforms for each extracted
1282 window. See ``style`` argument for details.
1283 '''
1284 if tmin is None:
1285 if self.tmin is None:
1286 logger.warning("Pile's tmin is not set - pile may be empty.")
1287 return
1288 tmin = self.tmin + tpad
1290 if tmax is None:
1291 if self.tmax is None:
1292 logger.warning("Pile's tmax is not set - pile may be empty.")
1293 return
1294 tmax = self.tmax - tpad
1296 if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
1297 return
1299 if accessor_id not in self.open_files:
1300 self.open_files[accessor_id] = set()
1302 open_files = self.open_files[accessor_id]
1304 if tinc is None:
1305 tinc = tmax - tmin
1306 nwin = 1
1307 else:
1308 eps = tinc * 1e-6
1309 if tinc != 0.0:
1310 nwin = int(((tmax - eps) - tmin) / tinc) + 1
1311 else:
1312 nwin = 1
1314 for iwin in range(nwin):
1315 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
1317 chopped, used_files = self.chop(
1318 wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
1319 include_last, load_data)
1321 for file in used_files - open_files:
1322 # increment datause counter on newly opened files
1323 file.use_data()
1325 open_files.update(used_files)
1327 processed = self._process_chopped(
1328 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
1329 tpad)
1331 if style == 'batch':
1332 yield Batch(
1333 tmin=wmin,
1334 tmax=wmax,
1335 i=iwin,
1336 n=nwin,
1337 traces=processed)
1339 else:
1340 yield processed
1342 unused_files = open_files - used_files
1344 while unused_files:
1345 file = unused_files.pop()
1346 file.drop_data()
1347 open_files.remove(file)
1349 if not keep_current_files_open:
1350 while open_files:
1351 file = open_files.pop()
1352 file.drop_data()
1354 def all(self, *args, **kwargs):
1355 '''
1356 Shortcut to aggregate :py:meth:`chopper` output into a single list.
1357 '''
1359 alltraces = []
1360 for traces in self.chopper(*args, **kwargs):
1361 alltraces.extend(traces)
1363 return alltraces
1365 def iter_all(self, *args, **kwargs):
1366 for traces in self.chopper(*args, **kwargs):
1367 for tr in traces:
1368 yield tr
1370 def chopper_grouped(self, gather, progress=None, *args, **kwargs):
1371 keys = self.gather_keys(gather)
1372 if len(keys) == 0:
1373 return
1375 outer_group_selector = None
1376 if 'group_selector' in kwargs:
1377 outer_group_selector = kwargs['group_selector']
1379 outer_trace_selector = None
1380 if 'trace_selector' in kwargs:
1381 outer_trace_selector = kwargs['trace_selector']
1383 # the use of this gather-cache makes it impossible to modify the pile
1384 # during chopping
1385 gather_cache = {}
1386 pbar = None
1387 try:
1388 if progress is not None:
1389 pbar = util.progressbar(progress, len(keys))
1391 for ikey, key in enumerate(keys):
1392 def tsel(tr):
1393 return gather(tr) == key and (
1394 outer_trace_selector is None
1395 or outer_trace_selector(tr))
1397 def gsel(gr):
1398 if gr not in gather_cache:
1399 gather_cache[gr] = gr.gather_keys(gather)
1401 return key in gather_cache[gr] and (
1402 outer_group_selector is None
1403 or outer_group_selector(gr))
1405 kwargs['trace_selector'] = tsel
1406 kwargs['group_selector'] = gsel
1408 for traces in self.chopper(*args, **kwargs):
1409 yield traces
1411 if pbar:
1412 pbar.update(ikey+1)
1414 finally:
1415 if pbar:
1416 pbar.finish()
1418 def gather_keys(self, gather, selector=None):
1419 keys = set()
1420 for subpile in self.subpiles.values():
1421 keys |= subpile.gather_keys(gather, selector)
1423 return sorted(keys)
1425 def iter_traces(
1426 self,
1427 load_data=False,
1428 return_abspath=False,
1429 group_selector=None,
1430 trace_selector=None):
1432 '''
1433 Iterate over all traces in pile.
1435 :param load_data: whether to load the waveform data, by default empty
1436 traces are yielded
1437 :param return_abspath: if ``True`` yield tuples containing absolute
1438 file path and :py:class:`pyrocko.trace.Trace` objects
1439 :param group_selector: filter callback taking :py:class:`TracesGroup`
1440 objects
1441 :param trace_selector: filter callback taking
1442 :py:class:`pyrocko.trace.Trace` objects
1444 Example; yields only traces, where the station code is 'HH1'::
1446 test_pile = pile.make_pile('/local/test_trace_directory')
1447 for t in test_pile.iter_traces(
1448 trace_selector=lambda tr: tr.station=='HH1'):
1450 print t
1451 '''
1453 for subpile in self.subpiles.values():
1454 if not group_selector or group_selector(subpile):
1455 for tr in subpile.iter_traces(load_data, return_abspath,
1456 group_selector, trace_selector):
1457 yield tr
1459 def iter_files(self):
1460 for subpile in self.subpiles.values():
1461 for file in subpile.iter_files():
1462 yield file
1464 def reload_modified(self):
1465 modified = False
1466 for subpile in self.subpiles.values():
1467 modified |= subpile.reload_modified()
1469 return modified
1471 def get_tmin(self):
1472 return self.tmin
1474 def get_tmax(self):
1475 return self.tmax
1477 def get_deltatmin(self):
1478 return self.deltatmin
1480 def get_deltatmax(self):
1481 return self.deltatmax
1483 def is_empty(self):
1484 return self.tmin is None and self.tmax is None
1486 def __str__(self):
1487 if self.tmin is not None and self.tmax is not None:
1488 tmin = util.time_to_str(self.tmin)
1489 tmax = util.time_to_str(self.tmax)
1490 s = 'Pile\n'
1491 s += 'number of subpiles: %i\n' % len(self.subpiles)
1492 s += 'timerange: %s - %s\n' % (tmin, tmax)
1493 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1494 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1495 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1496 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1497 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1499 else:
1500 s = 'empty Pile'
1502 return s
1504 def snuffle(self, **kwargs):
1505 '''
1506 Visualize it.
1508 :param stations: list of :py:class:`pyrocko.model.Station` objects or
1509 ``None``
1510 :param events: list of :py:class:`pyrocko.model.Event` objects or
1511 ``None``
1512 :param markers: list of :py:class:`pyrocko.gui.snuffler.marker.Marker`
1513 objects or ``None``
1514 :param ntracks: float, number of tracks to be shown initially
1515 (default: 12)
1516 :param follow: time interval (in seconds) for real time follow mode or
1517 ``None``
1518 :param controls: bool, whether to show the main controls (default:
1519 ``True``)
1520 :param opengl: bool, whether to use opengl (default: ``False``)
1521 '''
1523 from pyrocko.gui.snuffler.snuffler import snuffle
1524 snuffle(self, **kwargs)
1527def make_pile(
1528 paths=None, selector=None, regex=None,
1529 fileformat='mseed',
1530 cachedirname=None, show_progress=True):
1532 '''
1533 Create pile from given file and directory names.
1535 :param paths: filenames and/or directories to look for traces. If paths is
1536 ``None`` ``sys.argv[1:]`` is used.
1537 :param selector: lambda expression taking group dict of regex match object
1538 as a single argument and which returns true or false to keep or reject
1539 a file
1540 :param regex: regular expression which filenames have to match
1541 :param fileformat: format of the files ('mseed', 'sac', 'kan',
1542 'from_extension', 'detect')
1543 :param cachedirname: loader cache is stored under this directory. It is
1544 created as neccessary.
1545 :param show_progress: show progress bar and other progress information
1546 '''
1548 if show_progress_force_off:
1549 show_progress = False
1551 if isinstance(paths, str):
1552 paths = [paths]
1554 if paths is None:
1555 paths = sys.argv[1:]
1557 if cachedirname is None:
1558 cachedirname = config.config().cache_dir
1560 fns = util.select_files(
1561 paths, include=regex, selector=selector, show_progress=show_progress)
1563 cache = get_cache(cachedirname)
1564 p = Pile()
1565 p.load_files(
1566 sorted(fns),
1567 cache=cache,
1568 fileformat=fileformat,
1569 show_progress=show_progress)
1571 return p
1574class Injector(trace.States):
1576 def __init__(
1577 self, pile,
1578 fixation_length=None,
1579 path=None,
1580 format='from_extension',
1581 forget_fixed=False):
1583 trace.States.__init__(self)
1584 self._pile = pile
1585 self._fixation_length = fixation_length
1586 self._format = format
1587 self._path = path
1588 self._forget_fixed = forget_fixed
1590 def set_fixation_length(self, length):
1591 '''
1592 Set length after which the fixation method is called on buffer traces.
1594 The length should be given in seconds. Give None to disable.
1595 '''
1596 self.fixate_all()
1597 self._fixation_length = length # in seconds
1599 def set_save_path(
1600 self,
1601 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
1602 '%(tmin)s_%(tmax)s.mseed'):
1604 self.fixate_all()
1605 self._path = path
1607 def inject(self, trace):
1608 logger.debug('Received a trace: %s' % trace)
1610 buf = self.get(trace)
1611 if buf is None:
1612 trbuf = trace.copy()
1613 buf = MemTracesFile(None, [trbuf])
1614 self._pile.add_file(buf)
1615 self.set(trace, buf)
1617 else:
1618 self._pile.remove_file(buf)
1619 trbuf = buf.get_traces()[0]
1620 buf.remove(trbuf)
1621 trbuf.append(trace.ydata)
1622 buf.add(trbuf)
1623 self._pile.add_file(buf)
1624 self.set(trace, buf)
1626 trbuf = buf.get_traces()[0]
1627 if self._fixation_length is not None:
1628 if trbuf.tmax - trbuf.tmin > self._fixation_length:
1629 self._fixate(buf, complete=False)
1631 def fixate_all(self):
1632 for state in list(self._states.values()):
1633 self._fixate(state[-1])
1635 self._states = {}
1637 def free(self, buf):
1638 self._fixate(buf)
1640 def _fixate(self, buf, complete=True):
1641 trbuf = buf.get_traces()[0]
1642 del_state = True
1643 if self._path:
1644 if self._fixation_length is not None:
1645 ttmin = trbuf.tmin
1646 ytmin = util.year_start(ttmin)
1647 n = int(math.floor((ttmin - ytmin) / self._fixation_length))
1648 tmin = ytmin + n*self._fixation_length
1649 traces = []
1650 t = tmin
1651 while t <= trbuf.tmax:
1652 try:
1653 traces.append(
1654 trbuf.chop(
1655 t,
1656 t+self._fixation_length,
1657 inplace=False,
1658 snap=(math.ceil, math.ceil)))
1660 except trace.NoData:
1661 pass
1662 t += self._fixation_length
1664 if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
1665 trbuf.deltat/100. or complete:
1667 self._pile.remove_file(buf)
1669 else: # reinsert incomplete last part
1670 new_trbuf = traces.pop()
1671 self._pile.remove_file(buf)
1672 buf.remove(trbuf)
1673 buf.add(new_trbuf)
1674 self._pile.add_file(buf)
1675 del_state = False
1677 else:
1678 traces = [trbuf]
1679 self._pile.remove_file(buf)
1681 fns = io.save(traces, self._path, format=self._format)
1683 if not self._forget_fixed:
1684 self._pile.load_files(
1685 fns, show_progress=False, fileformat=self._format)
1687 if del_state:
1688 del self._states[trbuf.nslc_id]
1690 def __del__(self):
1691 self.fixate_all()