1# https://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function, division
8import os
9import logging
10import time
11import weakref
12import copy
13import re
14import sys
15import operator
16import math
17import hashlib
18try:
19 import cPickle as pickle
20except ImportError:
21 import pickle
24from . import avl
25from . import trace, io, util
26from . import config
27from .trace import degapper
# True on Windows, where os.rename() cannot replace an existing file
# (special-cased in TracesFileCache._dump_dircache).
is_windows = sys.platform.startswith('win')

# Global switch to force-disable progress bars (e.g. for batch jobs).
show_progress_force_off = False

# Salt mixed into ehash() so cache file names change whenever the cache
# format version changes.
version_salt = 'v1-'
def ehash(s):
    '''
    Return the hex SHA1 digest of *s*, salted with the cache format version.
    '''
    salted = (version_salt + s).encode('utf8')
    return hashlib.sha1(salted).hexdigest()
def cmp(a, b):
    '''
    Three-way comparison: -1 if ``a < b``, 1 if ``a > b``, else 0.

    Replacement for the Python 2 builtin of the same name.
    '''
    if a < b:
        return -1
    if a > b:
        return 1
    return 0
def sl(s):
    '''
    Sort *s* and convert every element to ``str``.
    '''
    return list(map(str, sorted(s)))
class Counter(dict):
    '''
    Dict of occurrence counts.

    Missing keys read as zero without being inserted; keys whose count
    drops to zero or below are removed.
    '''

    def __missing__(self, k):
        # Unknown keys count as zero (and are not stored by the lookup).
        return 0

    def update(self, other):
        # Add counts from *other* (unlike dict.update, which overwrites).
        for key, count in other.items():
            self[key] = self[key] + count

    def subtract(self, other):
        # Subtract counts from *other*; exhausted keys are dropped.
        for key, count in other.items():
            self[key] = self[key] - count
            if self[key] <= 0:
                del self[key]

    def subtract1(self, k):
        # Decrement a single key, dropping it when exhausted.
        self[k] = self[k] - 1
        if self[k] <= 0:
            del self[k]
def fix_unicode_copy(counter, func):
    '''
    Return a copy of *counter* with every key transformed by *func*.
    '''
    fixed = Counter()
    for key, count in counter.items():
        fixed[func(key)] = count
    return fixed
# Shorthand used throughout this module.
pjoin = os.path.join

# Module-level logger.
logger = logging.getLogger('pyrocko.pile')
def avl_remove_exact(avltree, element):
    '''
    Remove *element* from *avltree* by identity (``is``), not equality.

    :returns: index at which the exact object was found
    :raises ValueError: if the exact object is not contained in the tree
    '''
    ilo, ihi = avltree.span(element)
    i = ilo
    while i < ihi:
        if avltree[i] is element:
            avltree.remove_at(i)
            return i
        i += 1

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')
def cmpfunc(key):
    '''
    Build a three-way comparison function ordering objects by *key*.

    *key* may be an attribute name or a callable extracting the sort key.
    '''
    if not isinstance(key, str):
        getter = key
    elif key == 'tmin':
        # special cases; these run about 50% faster than the generic one on
        # Python 2.5
        return lambda a, b: cmp(a.tmin, b.tmin)
    elif key == 'tmax':
        return lambda a, b: cmp(a.tmax, b.tmax)
    else:
        getter = operator.attrgetter(key)

    return lambda a, b: cmp(getter(a), getter(b))
# Cache of dynamically created placeholder classes, keyed by attribute
# name (see get_dummy()).
g_dummys = {}
def get_dummy(key):
    '''
    Get (creating it on first use) a placeholder class whose instances
    carry a single attribute named *key*.

    Instances are used as probe objects for range queries on
    :py:class:`Sorted` containers.
    '''
    try:
        return g_dummys[key]
    except KeyError:
        class Dummy(object):
            def __init__(self, k):
                setattr(self, key, k)

        g_dummys[key] = Dummy
        return Dummy
class TooMany(Exception):
    '''
    Raised when a limited query would return more elements than allowed.
    '''

    def __init__(self, n):
        Exception.__init__(self)
        self.n = n  # number of elements that matched
class Sorted(object):
    '''
    Collection kept sorted by *key*, backed by an AVL tree.

    *key* may be an attribute name or a callable extracting the sort key
    (see :py:func:`cmpfunc`). Range queries by key value are only available
    for string (attribute-name) keys.
    '''

    def __init__(self, values=[], key=None):
        # NOTE: the default list is safe here — it is only read, never
        # mutated.
        self._set_key(key)
        self._avl = avl.new(values, self._cmp)

    def _set_key(self, key):
        # Install key and derived comparison function; for attribute-name
        # keys also prepare the probe class used for range queries.
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # The AVL tree itself is not pickled; store contents plus key.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        l, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(l), len(l))

    def insert(self, value):
        self._avl.insert(value)

    def remove(self, value):
        # Remove by identity; returns the index where the object was found.
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        return self._avl.remove_at(i)

    def insert_many(self, values):
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        # All elements whose key lies in [kmin, kmax] (string keys only).
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        # Like with_key_in(), but raises TooMany when more than *nmax*
        # elements match.
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        # Index of *value*, matched by identity.
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]
class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of caches, keyed by cache directory (see get_cache()).
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        self.dircaches = {}       # cache file path -> {abspath: tfile}
        self.modified = set()     # cache file paths with unsaved changes
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()
        for fn in os.listdir(self.cachedir):
            # cache file names are 40-character sha1 hex digests (ehash())
            if len(fn) == 40:
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # In-memory cache responsible for the directory containing *abspath*.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Load (or create empty) in-memory cache for *cachepath*.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # Cache file path for the directory containing *abspath*; the file
        # name is the salted hash of the directory name.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        # Unpickle a directory cache and restore state that is not pickled
        # (search trees, parent links, data-load flags).

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        # Pickle a directory cache atomically: write to a temp file, then
        # rename into place. Empty caches delete the cache file instead.

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)
def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    caches = TracesFileCache.caches
    if cachedir not in caches:
        caches[cachedir] = TracesFileCache(cachedir)

    return caches[cachedir]
def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):
    '''
    Generator yielding a :py:class:`TracesFile` for each of *filenames*.

    Uses *cache* (a :py:class:`TracesFileCache` or ``None``) to skip
    re-scanning files whose mtime is unchanged. If *filename_attributes* is
    given, it is used as a regular expression to extract NSLC substitutions
    from each path. Progress is reported via a progress bar
    (*show_progress*) and/or the *update_progress* callback; the callback
    may abort the operation by returning a true value.
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Combines the terminal progress bar with the update_progress
        # callback; update() returns True when the callback requests abort.
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                abort = update_progress(self._label, i, self._n)

            return abort

        def finish(self):
            if self._bar:
                self._bar.finish()
                self._bar = None

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    try:
        # Pass 1: stat all files and decide which must be (re)scanned.
        progress = Progress('Looking at files', len(filenames))

        failures = []
        to_load = []
        for i, filename in enumerate(filenames):
            try:
                abspath = os.path.abspath(filename)

                substitutions = None
                if regex:
                    m = regex.search(filename)
                    if not m:
                        raise FilenameAttributeError(
                            "Cannot get attributes with pattern '%s' "
                            "from path '%s'" % (filename_attributes, filename))

                    substitutions = {}
                    for k in m.groupdict():
                        if k in ('network', 'station', 'location', 'channel'):
                            substitutions[k] = m.groupdict()[k]

                # index 8 of os.stat() is st_mtime
                mtime = os.stat(filename)[8]
                tfile = None
                if cache:
                    tfile = cache.get(abspath)

                mustload = (
                    not tfile or
                    (tfile.format != fileformat and fileformat != 'detect') or
                    tfile.mtime != mtime or
                    substitutions is not None)

                to_load.append(
                    (mustload, mtime, abspath, substitutions, tfile))

            except (OSError, FilenameAttributeError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)

            abort = progress.update(i+1)
            if abort:
                progress.update(len(filenames))
                return

        progress.update(len(filenames))

        to_load.sort(key=lambda x: x[2])

        nload = len([1 for x in to_load if x[0]])
        iload = 0

        count_all = False
        if nload < 0.01*len(to_load):
            # when nearly everything is cached, count every file so the
            # progress display still advances
            nload = len(to_load)
            count_all = True

        # Pass 2: scan files (or reuse the cached entries) and yield them.
        if to_load:
            progress = Progress('Scanning files', nload)

        for (mustload, mtime, abspath, substitutions, tfile) in to_load:
            try:
                if mustload:
                    tfile = TracesFile(
                        None, abspath, fileformat,
                        substitutions=substitutions, mtime=mtime)

                    # entries with substitutions are path-specific and are
                    # not cached
                    if cache and not substitutions:
                        cache.put(abspath, tfile)

                    if not count_all:
                        iload += 1

                if count_all:
                    iload += 1

            except (io.FileLoadError, OSError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)
            else:
                yield tfile

            abort = progress.update(iload+1)
            if abort:
                break

        progress.update(nload)

        if failures:
            logger.warning(
                'The following file%s caused problems and will be ignored:\n' %
                util.plural_s(len(failures)) + '\n'.join(failures))

        if cache:
            cache.dump_modified()
    finally:
        progress.finish()
def tlen(x):
    '''
    Time span (``tmax - tmin``) of *x*.
    '''
    return x.tmax - x.tmin
class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        self.parent = parent
        self.empty()
        self.nupdates = 0    # incremented on every add()/remove()
        self.abspath = None

    def set_parent(self, parent):
        self.parent = parent

    def get_parent(self):
        return self.parent

    def empty(self):
        # Reset all lookup counters, search trees and cached ranges.
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        # Rebuild the search trees from *content* (used after unpickling,
        # because the trees are not stored in the cache files).
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        # Convert unicode NSLC codes (from Py2-era cache files) to str.
        # If the first network code already is str, all are assumed fixed.
        for net in self.networks:
            if isinstance(net, str):
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add')

        # propagate to the containing group so all levels stay in sync
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content to traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove')

        # propagate to the containing group so all levels stay in sync
        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        :param trace_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # query window is widened by tlenmax on the left so traces starting
        # before tmin but extending into the window are found
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        # Refresh cached time range, longest trace length, newest mtime and
        # sampling-interval extremes from the trees/counters.
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what):
        # Hook for change notification; overridden in Pile.
        pass

    def get_update_count(self):
        return self.nupdates

    def overlaps(self, tmin, tmax):
        # True if [tmin, tmax] intersects this group's time range.
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))
class MemTracesFile(TracesGroup):

    '''
    This is needed to make traces without an actual disc file to be inserted
    into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        # Accepts a single trace or a sequence; tags each trace with this
        # container before indexing.
        if isinstance(traces, trace.Trace):
            traces = [traces]

        for tr in traces:
            tr.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # In-memory data: nothing to load from disk.
        pass

    def load_data(self):
        pass

    def use_data(self):
        pass

    def drop_data(self):
        # Data lives in memory only; never dropped.
        pass

    def reload_if_modified(self):
        # No backing file, so never considered modified.
        return False

    def iter_traces(self):
        for tr in self.by_tmin:
            yield tr

    def get_traces(self):
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        # Set of gather(tr) over all traces passing *selector*.
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'MemTracesFile\n'
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.by_tmin)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s
class TracesFile(TracesGroup):
    '''
    Trace container representing a single file on disk.

    Holds the traces' meta-information; waveform data is loaded and dropped
    on demand, guarded by a use count.
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        self.data_use_count = 0
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        self.mtime = mtime

    def load_headers(self, mtime=None):
        # (Re)read trace meta-information from the file, without waveform
        # data. Duplicate snippets (same key) are skipped.
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            # index 8 of os.stat() is st_mtime
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # identity key of a trace snippet within this file
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            k = kgen(tr)
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        # Load waveform data, reconciling the newly read snippets with the
        # ones already known (the file may have changed on disk).
        #
        # :returns: True if the set of snippets changed.
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # drop snippets which have vanished from the file
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            # adopt snippets which are new in the file
            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            # refresh data of snippets present before and after
            for tr in traces:
                if kgen(tr) in k_unchanged:
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        # Register a consumer of the loaded data (see drop_data()).
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        # Unregister a consumer; waveform data is released when the last
        # consumer drops it.
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        # Reload headers (and data, if currently loaded) when the file's
        # mtime has changed. Returns True if a reload happened.
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        # Set of gather(tr) over all traces passing *selector*.
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s
class FilenameAttributeError(Exception):
    '''
    Raised when NSLC meta-information cannot be extracted from a file path
    via the ``filename_attributes`` pattern.
    '''
    pass
class SubPile(TracesGroup):
    '''
    Container for a subset of a Pile's files.

    A Pile groups its files into SubPiles by sampling-interval magnitude
    (see ``Pile.dispatch_key``).
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        self.files.append(file)
        file.set_parent(self)
        self.add(file)

    def remove_file(self, file):
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        # Batch variant: detach all files, then update indices once.
        for file in files:
            self.files.remove(file)
            file.set_parent(None)
        self.remove(files)

    def gather_keys(self, gather, selector=None):
        # Union of gather(tr) over all traces in all contained files.
        keys = set()
        for file in self.files:
            keys |= file.gather_keys(gather, selector)

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        # Iterate over traces of all files, optionally loading (and
        # afterwards dropping) waveform data per file.

        for file in self.files:

            if group_selector and not group_selector(file):
                continue

            must_drop = False
            if load_data:
                file.load_data()
                file.use_data()
                must_drop = True

            for tr in file.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield file.abspath, tr
                else:
                    yield tr

            if must_drop:
                file.drop_data()

    def iter_files(self):
        for file in self.files:
            yield file

    def reload_modified(self):
        # Returns True when at least one file was reloaded.
        modified = False
        for file in self.files:
            modified |= file.reload_if_modified()

        return modified

    def __str__(self):
        s = 'SubPile\n'
        s += 'number of files: %i\n' % len(self.files)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s
class Batch(object):
    '''
    Batch of waveforms from window wise data extraction.

    Encapsulates state and results yielded for each window in window wise
    waveform extraction with the :py:meth:`Pile.chopper` method (when the
    `style='batch'` keyword argument set).

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        # window position in sequence
        self.i = i
        self.n = n
        # window time span and extracted waveforms
        self.tmin = tmin
        self.tmax = tmax
        self.traces = traces
1069class Pile(TracesGroup):
1070 '''
1071 Waveform archive lookup, data loading and caching infrastructure.
1072 '''
    def __init__(self):
        TracesGroup.__init__(self, None)
        self.subpiles = {}     # dispatch key -> SubPile
        self.open_files = {}   # accessor_id -> set of files with data in use
        self.listeners = []    # weak references to change listeners
        self.abspaths = set()  # paths of all files currently in the pile
    def add_listener(self, obj):
        '''
        Add change listener (weakly referenced).

        *obj* must provide a ``pile_changed(what)`` method (see
        :py:meth:`notify_listeners`).
        '''
        self.listeners.append(weakref.ref(obj))
    def notify_listeners(self, what):
        # Call pile_changed() on all listeners which are still alive.
        for ref in self.listeners:
            obj = ref()
            if obj:
                obj.pile_changed(what)
    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Load traces from files and add them to the pile.

        Parameters are passed through to :py:func:`loader`.
        '''

        load = loader(
            filenames, fileformat, cache, filename_attributes,
            show_progress=show_progress,
            update_progress=update_progress)

        self.add_files(load)
    def add_files(self, files):
        # *files* may be any iterable of TracesFile-like objects (e.g. the
        # loader() generator).
        for file in files:
            self.add_file(file)
    def add_file(self, file):
        # Reject duplicates and files without a usable sampling interval.
        if file.abspath is not None and file.abspath in self.abspaths:
            logger.warning('File already in pile: %s' % file.abspath)
            return

        if file.deltatmin is None:
            logger.warning('Sampling rate of all traces are zero in file: %s' %
                           file.abspath)
            return

        subpile = self.dispatch(file)
        subpile.add_file(file)
        if file.abspath is not None:
            self.abspaths.add(file.abspath)
    def remove_file(self, file):
        # Remove a single file from its subpile and forget its path.
        subpile = file.get_parent()
        if subpile is not None:
            subpile.remove_file(file)

        if file.abspath is not None:
            self.abspaths.remove(file.abspath)
    def remove_files(self, files):
        # Group files by their subpile so each subpile's indices are
        # updated only once.
        subpile_files = {}
        for file in files:
            subpile = file.get_parent()
            if subpile not in subpile_files:
                subpile_files[subpile] = []

            subpile_files[subpile].append(file)

        for subpile, files in subpile_files.items():
            subpile.remove_files(files)
            for file in files:
                if file.abspath is not None:
                    self.abspaths.remove(file.abspath)
    def dispatch_key(self, file):
        # Subpile bucket for a file: order of magnitude (natural log,
        # floored) of its smallest sampling interval.
        dt = int(math.floor(math.log(file.deltatmin)))
        return dt
    def dispatch(self, file):
        # Get (creating on demand) the subpile responsible for *file*.
        k = self.dispatch_key(file)
        if k not in self.subpiles:
            self.subpiles[k] = SubPile(self)

        return self.subpiles[k]
    def get_deltats(self):
        '''
        Return list of distinct sampling intervals contained in this pile.
        '''
        return list(self.deltats.keys())
    def chop(
            self, tmin, tmax,
            group_selector=None,
            trace_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True):
        '''
        Cut all relevant traces to the window [*tmin*, *tmax*].

        :returns: tuple ``(chopped, used_files)`` where *chopped* is a list
            of trace snippets and *used_files* the set of files whose data
            was loaded.
        '''

        chopped = []
        used_files = set()

        traces = self.relevant(tmin, tmax, group_selector, trace_selector)
        if load_data:
            files_changed = False
            for tr in traces:
                if tr.file and tr.file not in used_files:
                    if tr.file.load_data():
                        files_changed = True

                    if tr.file is not None:
                        used_files.add(tr.file)

            # loading may have altered a file's content; re-query
            if files_changed:
                traces = self.relevant(
                    tmin, tmax, group_selector, trace_selector)

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # return meta-information only; do not hand out data which
                # happens to be loaded
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                pass

        return chopped, used_files
    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        # Post-process one window's chopped traces: sort, optionally degap,
        # optionally weed out incomplete traces, and attach the window
        # bounds (wmin/wmax) to each trace.

        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # emin/emax: misfit of trace bounds against the padded window
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # almost complete: pad by repeating edge samples
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped
    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        # per-accessor bookkeeping of files whose data is currently in use
        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        open_files = self.open_files[accessor_id]

        if tinc is None:
            tinc = tmax - tmin
            nwin = 1
        else:
            eps = tinc * 1e-6
            if tinc != 0.0:
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

        for iwin in range(nwin):
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # release data of files not touched by this window
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        if not keep_current_files_open:
            while open_files:
                file = open_files.pop()
                file.drop_data()
1357 def all(self, *args, **kwargs):
1358 '''
1359 Shortcut to aggregate :py:meth:`chopper` output into a single list.
1360 '''
1362 alltraces = []
1363 for traces in self.chopper(*args, **kwargs):
1364 alltraces.extend(traces)
1366 return alltraces
1368 def iter_all(self, *args, **kwargs):
1369 for traces in self.chopper(*args, **kwargs):
1370 for tr in traces:
1371 yield tr
1373 def chopper_grouped(self, gather, progress=None, *args, **kwargs):
1374 keys = self.gather_keys(gather)
1375 if len(keys) == 0:
1376 return
1378 outer_group_selector = None
1379 if 'group_selector' in kwargs:
1380 outer_group_selector = kwargs['group_selector']
1382 outer_trace_selector = None
1383 if 'trace_selector' in kwargs:
1384 outer_trace_selector = kwargs['trace_selector']
1386 # the use of this gather-cache makes it impossible to modify the pile
1387 # during chopping
1388 gather_cache = {}
1389 pbar = None
1390 try:
1391 if progress is not None:
1392 pbar = util.progressbar(progress, len(keys))
1394 for ikey, key in enumerate(keys):
1395 def tsel(tr):
1396 return gather(tr) == key and (
1397 outer_trace_selector is None
1398 or outer_trace_selector(tr))
1400 def gsel(gr):
1401 if gr not in gather_cache:
1402 gather_cache[gr] = gr.gather_keys(gather)
1404 return key in gather_cache[gr] and (
1405 outer_group_selector is None
1406 or outer_group_selector(gr))
1408 kwargs['trace_selector'] = tsel
1409 kwargs['group_selector'] = gsel
1411 for traces in self.chopper(*args, **kwargs):
1412 yield traces
1414 if pbar:
1415 pbar.update(ikey+1)
1417 finally:
1418 if pbar:
1419 pbar.finish()
1421 def gather_keys(self, gather, selector=None):
1422 keys = set()
1423 for subpile in self.subpiles.values():
1424 keys |= subpile.gather_keys(gather, selector)
1426 return sorted(keys)
1428 def iter_traces(
1429 self,
1430 load_data=False,
1431 return_abspath=False,
1432 group_selector=None,
1433 trace_selector=None):
1435 '''
1436 Iterate over all traces in pile.
1438 :param load_data: whether to load the waveform data, by default empty
1439 traces are yielded
1440 :param return_abspath: if ``True`` yield tuples containing absolute
1441 file path and :py:class:`pyrocko.trace.Trace` objects
1442 :param group_selector: filter callback taking :py:class:`TracesGroup`
1443 objects
1444 :param trace_selector: filter callback taking
1445 :py:class:`pyrocko.trace.Trace` objects
1447 Example; yields only traces, where the station code is 'HH1'::
1449 test_pile = pile.make_pile('/local/test_trace_directory')
1450 for t in test_pile.iter_traces(
1451 trace_selector=lambda tr: tr.station=='HH1'):
1453 print t
1454 '''
1456 for subpile in self.subpiles.values():
1457 if not group_selector or group_selector(subpile):
1458 for tr in subpile.iter_traces(load_data, return_abspath,
1459 group_selector, trace_selector):
1460 yield tr
1462 def iter_files(self):
1463 for subpile in self.subpiles.values():
1464 for file in subpile.iter_files():
1465 yield file
1467 def reload_modified(self):
1468 modified = False
1469 for subpile in self.subpiles.values():
1470 modified |= subpile.reload_modified()
1472 return modified
1474 def get_tmin(self):
1475 return self.tmin
1477 def get_tmax(self):
1478 return self.tmax
1480 def get_deltatmin(self):
1481 return self.deltatmin
1483 def get_deltatmax(self):
1484 return self.deltatmax
1486 def is_empty(self):
1487 return self.tmin is None and self.tmax is None
1489 def __str__(self):
1490 if self.tmin is not None and self.tmax is not None:
1491 tmin = util.time_to_str(self.tmin)
1492 tmax = util.time_to_str(self.tmax)
1493 s = 'Pile\n'
1494 s += 'number of subpiles: %i\n' % len(self.subpiles)
1495 s += 'timerange: %s - %s\n' % (tmin, tmax)
1496 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1497 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1498 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1499 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1500 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1502 else:
1503 s = 'empty Pile'
1505 return s
1507 def snuffle(self, **kwargs):
1508 '''
1509 Visualize it.
1511 :param stations: list of :py:class:`pyrocko.model.Station` objects or
1512 ``None``
1513 :param events: list of :py:class:`pyrocko.model.Event` objects or
1514 ``None``
1515 :param markers: list of :py:class:`pyrocko.gui.snuffler.marker.Marker`
1516 objects or ``None``
1517 :param ntracks: float, number of tracks to be shown initially
1518 (default: 12)
1519 :param follow: time interval (in seconds) for real time follow mode or
1520 ``None``
1521 :param controls: bool, whether to show the main controls (default:
1522 ``True``)
1523 :param opengl: bool, whether to use opengl (default: ``False``)
1524 '''
1526 from pyrocko.gui.snuffler.snuffler import snuffle
1527 snuffle(self, **kwargs)
def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces. If paths
        is ``None`` ``sys.argv[1:]`` is used.
    :param selector: lambda expression taking group dict of regex match
        object as a single argument and which returns true or false to keep
        or reject a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files ('mseed', 'sac', 'kan',
        'from_extension', 'detect')
    :param cachedirname: loader cache is stored under this directory. It is
        created as necessary.
    :param show_progress: show progress bar and other progress information
    '''
    if show_progress_force_off:
        show_progress = False

    if isinstance(paths, str):
        paths = [paths]

    if paths is None:
        paths = sys.argv[1:]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    filenames = util.select_files(
        paths, include=regex, selector=selector,
        show_progress=show_progress)

    pile_ = Pile()
    pile_.load_files(
        sorted(filenames),
        cache=get_cache(cachedirname),
        fileformat=fileformat,
        show_progress=show_progress)

    return pile_
class Injector(trace.States):
    '''
    Maintains in-memory buffer traces inside a pile, fed by injected
    trace snippets.

    Each injected snippet is appended to a per-channel buffer trace held
    in a :py:class:`MemTracesFile` registered with the connected pile.
    When a buffer grows longer than ``fixation_length``, it is "fixated":
    written to file with :py:func:`pyrocko.io.save` (if a save path is
    set) and, unless ``forget_fixed`` is set, loaded back into the pile
    as a regular file.
    '''

    def __init__(
            self, pile,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False):

        '''
        :param pile: :py:class:`Pile` object receiving the buffer files
        :param fixation_length: length in seconds after which buffer
            traces are fixated, or ``None`` to disable length-triggered
            fixation
        :param path: filename template for saving fixated traces (see
            :py:meth:`set_save_path`), or ``None`` to skip saving
        :param format: file format passed to :py:func:`pyrocko.io.save`
        :param forget_fixed: if ``True``, fixated traces are not loaded
            back into the pile
        '''
        trace.States.__init__(self)
        self._pile = pile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        # Flush existing buffers first so the new length only applies to
        # data arriving from now on.
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):

        '''
        Set the filename template used when fixated traces are saved.
        '''
        # Pending buffers are fixated under the previous path first.
        self.fixate_all()
        self._path = path

    def inject(self, trace):
        '''
        Add a trace snippet to the buffer associated with it.

        A new in-memory buffer file is created if none exists yet (lookup
        is delegated to :py:class:`pyrocko.trace.States`); otherwise the
        snippet's samples are appended to the existing buffer trace. When
        the buffer exceeds the configured fixation length, a partial
        fixation is triggered.

        NOTE: the ``trace`` parameter shadows the module-level ``trace``
        import within this method.
        '''
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # First snippet for this channel: start a new buffer file.
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # Remove the buffer from the pile while mutating it, then
            # re-add it, so the pile's lookup structures stay consistent.
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                # Buffer has outgrown the fixation length; fixate the
                # complete part, keep the incomplete tail buffered.
                self._fixate(buf, complete=False)

    def fixate_all(self):
        '''
        Fixate all buffer traces and clear the internal state.
        '''
        # state[-1] is the buffer file object stored via self.set().
        for state in list(self._states.values()):
            self._fixate(state[-1])

        self._states = {}

    def free(self, buf):
        '''
        Fixate and release a single buffer.
        '''
        self._fixate(buf)

    def _fixate(self, buf, complete=True):
        '''
        Write buffer contents to file and drop or trim the buffer.

        :param buf: buffer file object (:py:class:`MemTracesFile`)
        :param complete: if ``True``, the whole buffer is fixated and its
            state entry dropped; if ``False``, a trailing segment shorter
            than the fixation length is reinserted into the buffer

        If no save path is set, the buffer file is left in the pile and
        only the state entry is removed.
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # Align chopping windows to multiples of the fixation
                # length, counted from the start of the year.
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): if every chop raised NoData, ``traces``
                # is empty and ``traces[-1]`` below raises IndexError --
                # confirm whether this can occur in practice.
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                # No fixation length configured: save the whole buffer as
                # a single trace.
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                # Load the just-saved files back into the pile so the
                # data stays accessible through it.
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            del self._states[trbuf.nslc_id]

    def __del__(self):
        # Best-effort flush of all buffers on garbage collection.
        self.fixate_all()