1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import, division, print_function
7import os
8import logging
9import time
10import weakref
11import copy
12import re
13import sys
14import operator
15import math
16import hashlib
17try:
18 import cPickle as pickle
19except ImportError:
20 import pickle
23from . import avl
24from . import trace, io, util
25from . import config
26from .trace import degapper
# True when running on Windows; os.rename over an existing file fails there,
# so cache dumping needs a special case (see TracesFileCache._dump_dircache).
is_windows = sys.platform.startswith('win')

# Global switch to force-disable progress bars, e.g. for non-interactive use.
show_progress_force_off = False
# Salt prepended to all hashed strings; bumping it invalidates old caches.
version_salt = 'v1-'


def ehash(s):
    '''
    Return hex SHA1 digest of *s*, prefixed with the cache version salt.
    '''
    salted = version_salt + s
    return hashlib.sha1(salted.encode('utf8')).hexdigest()
def cmp(a, b):
    '''
    Python 2 style three-way comparison: -1, 0 or 1.
    '''
    if a < b:
        return -1
    if a > b:
        return 1
    return 0
def sl(s):
    '''
    Sort *s* and return its elements converted to strings.
    '''
    return list(map(str, sorted(s)))
class Counter(dict):
    '''
    Dict-based multiset: missing keys read as zero, keys whose count drops
    to zero or below are removed on subtraction.
    '''

    def __missing__(self, k):
        # Unknown keys count as zero; nothing is stored by the lookup.
        return 0

    def update(self, other):
        # NOTE: unlike dict.update, this accumulates counts.
        for key, count in other.items():
            self[key] = self[key] + count

    def subtract(self, other):
        for key, count in other.items():
            remaining = self[key] - count
            self[key] = remaining
            if remaining <= 0:
                del self[key]

    def subtract1(self, k):
        # Convenience: subtract a single occurrence of *k*.
        self.subtract({k: 1})
def fix_unicode_copy(counter, func):
    '''
    Return a new :py:class:`Counter` with every key of *counter* passed
    through *func* (counts are carried over unchanged).
    '''
    fixed = Counter()
    for key, count in counter.items():
        fixed[func(key)] = count

    return fixed
# Shorthand used throughout this module.
pjoin = os.path.join

# Module-level logger.
logger = logging.getLogger('pyrocko.pile')
def avl_remove_exact(avltree, element):
    '''
    Remove the exact object *element* (identity match, not equality) from
    *avltree*.

    :returns: index at which the element was removed
    :raises ValueError: if the object is not contained in the tree
    '''
    ilo, ihi = avltree.span(element)
    i = ilo
    while i < ihi:
        if avltree[i] is element:
            avltree.remove_at(i)
            return i
        i += 1

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')
def cmpfunc(key):
    '''
    Build a two-argument, three-way comparison function ordering by *key*.

    *key* may be an attribute name or a callable extracting the sort key.
    '''
    if isinstance(key, str):
        # Hand-written closures for the hot attributes; these historically
        # ran considerably faster than the generic attrgetter path.
        if key == 'tmin':
            return lambda a, b: cmp(a.tmin, b.tmin)
        if key == 'tmax':
            return lambda a, b: cmp(a.tmax, b.tmax)

        key = operator.attrgetter(key)

    return lambda a, b: cmp(key(a), key(b))
# Cache of generated placeholder classes, one per key attribute name.
g_dummys = {}


def get_dummy(key):
    '''
    Get (and cache) a placeholder class whose instances carry only the
    attribute named *key*; used to probe AVL trees sorted by that key.
    '''
    cls = g_dummys.get(key)
    if cls is None:
        class Dummy(object):
            def __init__(self, k):
                setattr(self, key, k)

        g_dummys[key] = cls = Dummy

    return cls
class TooMany(Exception):
    '''
    Raised when a limited tree query would yield more than the allowed
    number of elements.
    '''

    def __init__(self, n):
        super(TooMany, self).__init__()
        # Number of elements that would have been returned.
        self.n = n
class Sorted(object):
    '''
    Sorted collection of objects, backed by an AVL tree.

    :param values: optional iterable with the initial contents
    :param key: attribute name or callable providing the sort key
    '''

    def __init__(self, values=None, key=None):
        # Fix: the original used the mutable default ``values=[]``. The list
        # was never mutated so behavior is unchanged, but ``None`` avoids the
        # shared-default pitfall and stays backward-compatible.
        self._set_key(key)
        self._avl = avl.new(values if values is not None else [], self._cmp)

    def _set_key(self, key):
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            # Dummy objects are needed to query the tree by raw key values.
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # The AVL tree itself is not picklable; store flat contents + key.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        values, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(values), len(values))

    def insert(self, value):
        self._avl.insert(value)

    def remove(self, value):
        '''
        Remove exactly *value* (identity match).

        :returns: index at which the element was removed
        '''
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        return self._avl.remove_at(i)

    def insert_many(self, values):
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        '''
        Get all elements with key in [*kmin*, *kmax*].
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        '''
        Like :py:meth:`with_key_in` but raise :py:exc:`TooMany` if more than
        *nmax* elements would be returned.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        '''
        Index of exactly *value* (identity match).

        :raises ValueError: if the object is not contained
        '''
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]
class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of cache objects, shared via get_cache().
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        # Maps cache file path -> {abspath: TracesFile} dict.
        self.dircaches = {}
        # Cache file paths with unsaved modifications.
        self.modified = set()
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()

        for fn in os.listdir(self.cachedir):
            # Cache file names are 40-character SHA1 hex digests (see ehash);
            # anything else in the directory is left alone.
            if len(fn) == 40:
                # Loading prunes entries for vanished files; dumping persists
                # the pruned state (an empty cache file gets deleted).
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # Resolve the per-directory cache dict responsible for abspath.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Lazily load (or create empty) the in-memory dict for a cache file.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # One cache file per source directory, named by the hash of the
        # directory path.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        '''
        Unpickle one cache file and restore runtime state of its entries.
        '''

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            # The binsearch trees are not pickled (see _dump_dircache);
            # rebuild them from the stored trace list.
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            # Only metainformation is cached, never waveform data.
            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        '''
        Pickle one cache dict to disk; an empty cache removes its file.
        '''

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                # Strip sample data and metadata; only headers are cached.
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        # Write to a pid-unique temporary file first, then rename, so that
        # concurrent readers never see a partially written cache file.
        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            # Protocol 2 keeps the cache readable by Python 2 installations.
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)
def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    caches = TracesFileCache.caches
    if cachedir not in caches:
        caches[cachedir] = TracesFileCache(cachedir)

    return caches[cachedir]
def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):
    '''
    Generator yielding one :py:class:`TracesFile` per usable input file.

    Files whose cached metainformation is up to date (same format, same
    mtime, no substitutions requested) are reused from *cache* without
    rescanning. *filename_attributes*, if given, is a regular expression
    whose named groups ``network``, ``station``, ``location`` and
    ``channel`` are extracted from each path and applied as code
    substitutions. *update_progress*, if given, is called as
    ``update_progress(label, i, n)``; returning a true value aborts the
    loading.
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Small helper unifying the terminal progress bar and the optional
        # update_progress callback under one interface.
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                # The callback's truthy return value requests an abort.
                abort = update_progress(self._label, i, self._n)

            return abort

        def finish(self):
            if self._bar:
                self._bar.finish()
                self._bar = None

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    try:
        # Phase 1: stat all files and decide which must be (re)scanned.
        progress = Progress('Looking at files', len(filenames))

        failures = []
        to_load = []
        for i, filename in enumerate(filenames):
            try:
                abspath = os.path.abspath(filename)

                substitutions = None
                if regex:
                    m = regex.search(filename)
                    if not m:
                        raise FilenameAttributeError(
                            "Cannot get attributes with pattern '%s' "
                            "from path '%s'" % (filename_attributes, filename))

                    substitutions = {}
                    for k in m.groupdict():
                        if k in ('network', 'station', 'location', 'channel'):
                            substitutions[k] = m.groupdict()[k]

                # index 8 of os.stat is st_mtime
                mtime = os.stat(filename)[8]
                tfile = None
                if cache:
                    tfile = cache.get(abspath)

                # A cached entry is only reused if format and mtime match
                # and no substitutions are requested.
                mustload = (
                    not tfile or
                    (tfile.format != fileformat and fileformat != 'detect') or
                    tfile.mtime != mtime or
                    substitutions is not None)

                to_load.append(
                    (mustload, mtime, abspath, substitutions, tfile))

            except (OSError, FilenameAttributeError) as xerror:
                # NOTE(review): if os.path.abspath itself raised, abspath
                # would hold the previous iteration's value here — presumably
                # os.stat is the expected failure point; confirm.
                failures.append(abspath)
                logger.warning(xerror)

            abort = progress.update(i+1)
            if abort:
                progress.update(len(filenames))
                return

        progress.update(len(filenames))

        # Stable ordering by absolute path.
        to_load.sort(key=lambda x: x[2])

        nload = len([1 for x in to_load if x[0]])
        iload = 0

        # If fewer than 1% of the files need rescanning, count every file so
        # the progress indicator still advances visibly.
        count_all = False
        if nload < 0.01*len(to_load):
            nload = len(to_load)
            count_all = True

        # Phase 2: scan files (or reuse cached entries) and yield them.
        if to_load:
            progress = Progress('Scanning files', nload)

        for (mustload, mtime, abspath, substitutions, tfile) in to_load:
            try:
                if mustload:
                    tfile = TracesFile(
                        None, abspath, fileformat,
                        substitutions=substitutions, mtime=mtime)

                    # Entries with substitutions are not cached, as the
                    # substituted codes would poison the cache.
                    if cache and not substitutions:
                        cache.put(abspath, tfile)

                    if not count_all:
                        iload += 1

                if count_all:
                    iload += 1

            except (io.FileLoadError, OSError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)
            else:
                yield tfile

            abort = progress.update(iload+1)
            if abort:
                break

        progress.update(nload)

        if failures:
            logger.warning(
                'The following file%s caused problems and will be ignored:\n' %
                util.plural_s(len(failures)) + '\n'.join(failures))

        if cache:
            cache.dump_modified()
    finally:
        progress.finish()
def tlen(x):
    '''
    Time length (``tmax - tmin``) of an object carrying a time span.
    '''
    return x.tmax - x.tmin
class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        # Enclosing group (e.g. the Pile owning a SubPile); add/remove
        # operations are propagated upward through this reference.
        self.parent = parent
        self.empty()
        self.nupdates = 0
        self.abspath = None

    def set_parent(self, parent):
        self.parent = parent

    def get_parent(self):
        return self.parent

    def empty(self):
        '''
        Reset all lookup structures to the empty state.
        '''
        # Multisets counting occurrences of each code and sampling interval.
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        # Sorted indices over the contained traces.
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        '''
        Rebuild the sorted indices from *content* (used after unpickling,
        where the trees are not stored).
        '''
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        '''
        Convert non-str (Py2 unicode) keys in the lookup sets to str.
        '''
        for net in self.networks:
            if isinstance(net, str):
                # Codes are assumed homogeneous: one str key means all sets
                # are already clean.
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # Merge the child group's counters and indices wholesale.
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add')

        # Propagate the same content up the group hierarchy.
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content to traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # Subtract the child group's counters and indices wholesale.
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove')

        # Propagate the removal up the group hierarchy.
        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        :param trace_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a trace (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # Query by tmin widened by the longest contained trace duration, so
        # that traces starting before tmin but reaching into the window are
        # found as well.
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        '''
        Recompute the cached time range and sampling interval extremes.
        '''
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            # Longest trace duration, used to bound queries in relevant().
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what):
        # Hook, overridden by Pile to inform registered listeners.
        pass

    def get_update_count(self):
        return self.nupdates

    def overlaps(self, tmin, tmax):
        '''
        Check if this group's time range intersects [*tmin*, *tmax*].
        '''
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        '''
        Check time-range intersection and optional *group_selector* filter.
        '''
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))
class MemTracesFile(TracesGroup):

    '''
    File-like trace container without an actual file on disk, so that purely
    in-memory traces can be inserted into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        if isinstance(traces, trace.Trace):
            traces = [traces]

        # Back-link every trace to this pseudo-file before indexing.
        for mtr in traces:
            mtr.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # Nothing to do: headers live in memory.
        pass

    def load_data(self):
        # Nothing to do: data lives in memory.
        pass

    def use_data(self):
        pass

    def drop_data(self):
        # In-memory data is never dropped.
        pass

    def reload_if_modified(self):
        # There is no backing file which could have been modified.
        return False

    def iter_traces(self):
        return iter(self.by_tmin)

    def get_traces(self):
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        '''
        Set of ``gather(tr)`` values over traces passing *selector*.
        '''
        return set(
            gather(mtr) for mtr in self.by_tmin
            if selector is None or selector(mtr))

    def __str__(self):
        lines = [
            'MemTracesFile',
            'file mtime: %s' % util.time_to_str(self.mtime),
            'number of traces: %i' % len(self.by_tmin),
            'timerange: %s - %s' % (
                util.time_to_str(self.tmin), util.time_to_str(self.tmax)),
            'networks: %s' % ', '.join(sl(self.networks.keys())),
            'stations: %s' % ', '.join(sl(self.stations.keys())),
            'locations: %s' % ', '.join(sl(self.locations.keys())),
            'channels: %s' % ', '.join(sl(self.channels.keys())),
            'deltats: %s' % ', '.join(sl(self.deltats.keys())),
        ]
        return '\n'.join(lines) + '\n'
class TracesFile(TracesGroup):
    '''
    Trace container holding the metainformation of one waveform file.

    Waveform samples are loaded lazily via :py:meth:`load_data` and released
    with :py:meth:`drop_data`, guarded by a use counter.
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        self.data_use_count = 0
        # Optional trace code replacements applied on load.
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        self.mtime = mtime

    def load_headers(self, mtime=None):
        '''
        (Re)read trace metainformation (no samples) from the file.
        '''
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            # index 8 of os.stat is st_mtime
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # Key identifying a trace snippet by mtime, time span and codes.
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        # Deduplicate snippets with identical keys.
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            k = kgen(tr)
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        '''
        Read waveform samples from the file.

        :param force: reload even when data is already loaded
        :returns: ``True`` if the set of traces in the file has changed
        '''
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                # Key identifying a trace snippet by mtime, span and codes.
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            # Diff the freshly loaded snippets against the currently known
            # ones, so that an on-disk change is reflected incrementally.
            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # Iterate over a copy: self.traces is mutated in the loop.
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            for tr in traces:
                if kgen(tr) in k_unchanged:
                    # Known snippet: only refresh its sample data.
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        '''
        Acquire a reference to the loaded waveform data.
        '''
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        '''
        Release a data reference; samples are forgotten with the last one.
        '''
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        '''
        Reload headers (or data, if loaded) when the file's mtime changed.

        :returns: ``True`` if a reload happened
        '''
        # index 8 of os.stat is st_mtime
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        '''
        Set of ``gather(tr)`` values over traces passing *selector*.
        '''
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s
class FilenameAttributeError(Exception):
    '''
    Raised when the filename-attributes pattern does not match a path.
    '''
class SubPile(TracesGroup):
    '''
    Aggregation of a subset of a :py:class:`Pile`'s files.
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        self.files.append(file)
        file.set_parent(self)
        self.add(file)

    def remove_file(self, file):
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        # Detach all files first, then update the indices in one go.
        for f in files:
            self.files.remove(f)
            f.set_parent(None)
        self.remove(files)

    def gather_keys(self, gather, selector=None):
        '''
        Union of ``gather(tr)`` values over all contained files.
        '''
        keys = set()
        for f in self.files:
            keys.update(f.gather_keys(gather, selector))

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        '''
        Iterate over the traces of all files, optionally loading (and
        afterwards dropping) their waveform data.
        '''
        for f in self.files:

            if group_selector and not group_selector(f):
                continue

            must_drop = False
            if load_data:
                f.load_data()
                f.use_data()
                must_drop = True

            for tr in f.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield f.abspath, tr
                else:
                    yield tr

            if must_drop:
                f.drop_data()

    def iter_files(self):
        for f in self.files:
            yield f

    def reload_modified(self):
        '''
        Reload all files whose mtime changed.

        :returns: ``True`` if any file was reloaded
        '''
        modified = False
        for f in self.files:
            # Keep calling reload_if_modified for every file; do not
            # short-circuit.
            modified = f.reload_if_modified() or modified

        return modified

    def __str__(self):
        lines = [
            'SubPile',
            'number of files: %i' % len(self.files),
            'timerange: %s - %s' % (
                util.time_to_str(self.tmin), util.time_to_str(self.tmax)),
            'networks: %s' % ', '.join(sl(self.networks.keys())),
            'stations: %s' % ', '.join(sl(self.stations.keys())),
            'locations: %s' % ', '.join(sl(self.locations.keys())),
            'channels: %s' % ', '.join(sl(self.channels.keys())),
            'deltats: %s' % ', '.join(sl(self.deltats.keys())),
        ]
        return '\n'.join(lines) + '\n'
class Batch(object):
    '''
    Batch of waveforms from window wise data extraction.

    Encapsulates state and results yielded for each window in window wise
    waveform extraction with the :py:meth:`Pile.chopper` method (when the
    `style='batch'` keyword argument set).

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        self.tmin, self.tmax = tmin, tmax
        self.i, self.n = i, n
        self.traces = traces
1068class Pile(TracesGroup):
1069 '''
1070 Waveform archive lookup, data loading and caching infrastructure.
1071 '''
1073 def __init__(self):
1074 TracesGroup.__init__(self, None)
1075 self.subpiles = {}
1076 self.open_files = {}
1077 self.listeners = []
1078 self.abspaths = set()
1080 def add_listener(self, obj):
1081 self.listeners.append(weakref.ref(obj))
1083 def notify_listeners(self, what):
1084 for ref in self.listeners:
1085 obj = ref()
1086 if obj:
1087 obj.pile_changed(what)
1089 def load_files(
1090 self, filenames,
1091 filename_attributes=None,
1092 fileformat='mseed',
1093 cache=None,
1094 show_progress=True,
1095 update_progress=None):
1097 load = loader(
1098 filenames, fileformat, cache, filename_attributes,
1099 show_progress=show_progress,
1100 update_progress=update_progress)
1102 self.add_files(load)
1104 def add_files(self, files):
1105 for file in files:
1106 self.add_file(file)
1108 def add_file(self, file):
1109 if file.abspath is not None and file.abspath in self.abspaths:
1110 logger.warning('File already in pile: %s' % file.abspath)
1111 return
1113 if file.deltatmin is None:
1114 logger.warning('Sampling rate of all traces are zero in file: %s' %
1115 file.abspath)
1116 return
1118 subpile = self.dispatch(file)
1119 subpile.add_file(file)
1120 if file.abspath is not None:
1121 self.abspaths.add(file.abspath)
1123 def remove_file(self, file):
1124 subpile = file.get_parent()
1125 if subpile is not None:
1126 subpile.remove_file(file)
1127 if file.abspath is not None:
1128 self.abspaths.remove(file.abspath)
1130 def remove_files(self, files):
1131 subpile_files = {}
1132 for file in files:
1133 subpile = file.get_parent()
1134 if subpile not in subpile_files:
1135 subpile_files[subpile] = []
1137 subpile_files[subpile].append(file)
1139 for subpile, files in subpile_files.items():
1140 subpile.remove_files(files)
1141 for file in files:
1142 if file.abspath is not None:
1143 self.abspaths.remove(file.abspath)
1145 def dispatch_key(self, file):
1146 dt = int(math.floor(math.log(file.deltatmin)))
1147 return dt
1149 def dispatch(self, file):
1150 k = self.dispatch_key(file)
1151 if k not in self.subpiles:
1152 self.subpiles[k] = SubPile(self)
1154 return self.subpiles[k]
1156 def get_deltats(self):
1157 return list(self.deltats.keys())
    def chop(
            self, tmin, tmax,
            group_selector=None,
            trace_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True):
        '''
        Cut the pile's traces to the window [*tmin*, *tmax*].

        :returns: tuple ``(chopped, used_files)``: the list of cut trace
            snippets and the set of files whose data was accessed
        '''

        chopped = []
        used_files = set()

        traces = self.relevant(tmin, tmax, group_selector, trace_selector)
        if load_data:
            files_changed = False
            for tr in traces:
                if tr.file and tr.file not in used_files:
                    if tr.file.load_data():
                        files_changed = True

                    if tr.file is not None:
                        used_files.add(tr.file)

            if files_changed:
                # Files on disk have changed during loading; the lookup must
                # be redone against the updated indices.
                traces = self.relevant(
                    tmin, tmax, group_selector, trace_selector)

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Headers-only request: hand out a data-less copy so the
                # original's samples stay untouched.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Trace does not intersect the requested window.
                pass

        return chopped, used_files
    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process traces cut for window [*wmin*, *wmax*]: sort, degap
        and optionally weed out incomplete traces.
        '''

        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Deviation of trace start/end from the padded window edges.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # Nearly complete: patch up to five samples at either
                        # end by repeating the edge sample.
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        # Annotate each trace with the window it was extracted for.
        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped
    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        # Per accessor_id bookkeeping of files whose data is held in memory.
        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        open_files = self.open_files[accessor_id]

        if tinc is None:
            # Single window covering the whole requested span.
            tinc = tmax - tmin
            nwin = 1
        else:
            # eps guards against a spurious extra window caused by rounding
            # when the span is an exact multiple of tinc.
            eps = tinc * 1e-6
            if tinc != 0.0:
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

        for iwin in range(nwin):
            # Current window; the last one is clipped at tmax.
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # Release data of files not touched by this window.
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        if not keep_current_files_open:
            while open_files:
                file = open_files.pop()
                file.drop_data()
1356 def all(self, *args, **kwargs):
1357 '''
1358 Shortcut to aggregate :py:meth:`chopper` output into a single list.
1359 '''
1361 alltraces = []
1362 for traces in self.chopper(*args, **kwargs):
1363 alltraces.extend(traces)
1365 return alltraces
1367 def iter_all(self, *args, **kwargs):
1368 for traces in self.chopper(*args, **kwargs):
1369 for tr in traces:
1370 yield tr
    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Iterate over the pile's waveforms, one group at a time.

        :param gather: callable mapping a :py:class:`pyrocko.trace.Trace`
            object to a hashable grouping key; the chopper is run once for
            each distinct key found in the pile
        :param progress: if not ``None``, passed to ``util.progressbar``
            together with the number of keys (presumably a label string —
            TODO confirm against ``util.progressbar``)

        Remaining arguments are passed on to :py:meth:`chopper`.  Any
        ``group_selector`` / ``trace_selector`` given there is and-combined
        with the per-key selectors built here.
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        # remember selectors supplied by the caller; they are and-combined
        # with the per-key selectors tsel/gsel below
        outer_group_selector = None
        if 'group_selector' in kwargs:
            outer_group_selector = kwargs['group_selector']

        outer_trace_selector = None
        if 'trace_selector' in kwargs:
            outer_trace_selector = kwargs['trace_selector']

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        gather_cache = {}
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                # accept only traces belonging to the current group (and
                # passing the caller's trace selector, if any)
                def tsel(tr):
                    return gather(tr) == key and (
                        outer_trace_selector is None
                        or outer_trace_selector(tr))

                # accept only trace groups containing the current key; the
                # per-group key sets are cached in gather_cache
                def gsel(gr):
                    if gr not in gather_cache:
                        gather_cache[gr] = gr.gather_keys(gather)

                    return key in gather_cache[gr] and (
                        outer_group_selector is None
                        or outer_group_selector(gr))

                kwargs['trace_selector'] = tsel
                kwargs['group_selector'] = gsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            # ensure the progress bar is closed even when the consumer
            # abandons this generator early
            if pbar:
                pbar.finish()
1420 def gather_keys(self, gather, selector=None):
1421 keys = set()
1422 for subpile in self.subpiles.values():
1423 keys |= subpile.gather_keys(gather, selector)
1425 return sorted(keys)
1427 def iter_traces(
1428 self,
1429 load_data=False,
1430 return_abspath=False,
1431 group_selector=None,
1432 trace_selector=None):
1434 '''
1435 Iterate over all traces in pile.
1437 :param load_data: whether to load the waveform data, by default empty
1438 traces are yielded
1439 :param return_abspath: if ``True`` yield tuples containing absolute
1440 file path and :py:class:`pyrocko.trace.Trace` objects
1441 :param group_selector: filter callback taking :py:class:`TracesGroup`
1442 objects
1443 :param trace_selector: filter callback taking
1444 :py:class:`pyrocko.trace.Trace` objects
1446 Example; yields only traces, where the station code is 'HH1'::
1448 test_pile = pile.make_pile('/local/test_trace_directory')
1449 for t in test_pile.iter_traces(
1450 trace_selector=lambda tr: tr.station=='HH1'):
1452 print t
1453 '''
1455 for subpile in self.subpiles.values():
1456 if not group_selector or group_selector(subpile):
1457 for tr in subpile.iter_traces(load_data, return_abspath,
1458 group_selector, trace_selector):
1459 yield tr
1461 def iter_files(self):
1462 for subpile in self.subpiles.values():
1463 for file in subpile.iter_files():
1464 yield file
1466 def reload_modified(self):
1467 modified = False
1468 for subpile in self.subpiles.values():
1469 modified |= subpile.reload_modified()
1471 return modified
    def get_tmin(self):
        '''
        Get time of the pile's first sample (``None`` when the pile is empty).
        '''
        return self.tmin
    def get_tmax(self):
        '''
        Get time of the pile's last sample (``None`` when the pile is empty).
        '''
        return self.tmax
    def get_deltatmin(self):
        '''
        Get smallest sampling interval [s] occurring in the pile.
        '''
        return self.deltatmin
    def get_deltatmax(self):
        '''
        Get largest sampling interval [s] occurring in the pile.
        '''
        return self.deltatmax
    def is_empty(self):
        '''
        Check if the pile contains no data (neither tmin nor tmax is set).
        '''
        return self.tmin is None and self.tmax is None
1488 def __str__(self):
1489 if self.tmin is not None and self.tmax is not None:
1490 tmin = util.time_to_str(self.tmin)
1491 tmax = util.time_to_str(self.tmax)
1492 s = 'Pile\n'
1493 s += 'number of subpiles: %i\n' % len(self.subpiles)
1494 s += 'timerange: %s - %s\n' % (tmin, tmax)
1495 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1496 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1497 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1498 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1499 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1501 else:
1502 s = 'empty Pile'
1504 return s
    def snuffle(self, **kwargs):
        '''
        Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # deferred import: keeps the GUI stack out of non-GUI uses of this
        # module
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)
def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces. If paths is
        ``None`` ``sys.argv[1:]`` is used.
    :param selector: lambda expression taking group dict of regex match object
        as a single argument and which returns true or false to keep or reject
        a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files ('mseed', 'sac', 'kan',
        'from_extension', 'detect')
    :param cachedirname: loader cache is stored under this directory. It is
        created as necessary.
    :param show_progress: show progress bar and other progress information
    '''

    if show_progress_force_off:
        show_progress = False

    if paths is None:
        paths = sys.argv[1:]
    elif isinstance(paths, str):
        paths = [paths]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    fns = util.select_files(
        paths, include=regex, selector=selector, show_progress=show_progress)

    pile_ = Pile()
    pile_.load_files(
        sorted(fns),
        cache=get_cache(cachedirname),
        fileformat=fileformat,
        show_progress=show_progress)

    return pile_
class Injector(trace.States):
    '''
    Live-feed helper growing traces in a pile from injected snippets.

    Incoming trace snippets are appended to in-memory buffer files
    (:py:class:`MemTracesFile`) registered in the given pile.  When a buffer
    grows beyond ``fixation_length`` seconds, it is "fixated": saved to disk
    via :py:func:`io.save` (when a ``path`` template is set) and, unless
    ``forget_fixed`` is given, reloaded into the pile as regular file
    content.  Per-channel state (keyed by nslc id) is tracked through the
    :py:class:`trace.States` base class.
    '''

    def __init__(
            self, pile,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False):

        # pile: target pile receiving the in-memory buffer files
        # fixation_length: buffer duration [s] after which data is fixated;
        #     None disables automatic fixation
        # path: filename template handed to io.save() on fixation
        # format: file format used for saving and reloading
        # forget_fixed: when True, fixated data is not loaded back into the
        #     pile after having been written to disk
        trace.States.__init__(self)
        self._pile = pile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        # flush pending buffers first, so already collected data is fixated
        # under the old setting
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the filename template used when fixated traces are saved.
        '''

        # pending buffers are written out with the previous path first
        self.fixate_all()
        self._path = path

    def inject(self, trace):
        '''
        Add a trace snippet to its channel's buffer in the pile.

        NOTE(review): the parameter ``trace`` shadows the module-level
        ``trace`` import inside this method; ``trace.copy()`` /
        ``trace.ydata`` below refer to the injected trace object.
        '''
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # first snippet for this channel: start a new in-memory buffer
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # grow the existing buffer trace by the new samples; the file is
            # temporarily taken out of the pile while its contents change
            # (presumably to keep the pile's lookup structures consistent)
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                # buffer exceeds the configured length: fixate the complete
                # part, possibly keeping an incomplete tail in the pile
                self._fixate(buf, complete=False)

    def fixate_all(self):
        '''
        Fixate all channel buffers and reset the per-channel state.
        '''
        for state in list(self._states.values()):
            self._fixate(state[-1])

        self._states = {}

    def free(self, buf):
        # release a buffer by fixating it completely
        self._fixate(buf)

    def _fixate(self, buf, complete=True):
        '''
        Save a buffer's trace to disk and update the pile accordingly.

        :param buf: in-memory buffer file to fixate
        :param complete: when ``True`` the whole buffer is considered final;
            when ``False`` an incomplete trailing chunk may be reinserted
            into the pile for further growth
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # chop the buffer into chunks of fixation_length seconds,
                # with chunk boundaries aligned to the start of the year
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): if every chop raised NoData, ``traces`` is
                # empty and ``traces[-1]`` below raises IndexError — confirm
                # this cannot happen for buffers handed in here.
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                # no chunking configured: save the complete buffer trace
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                # make the just-written files available again as regular
                # on-disk pile content
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            del self._states[trbuf.nslc_id]

    def __del__(self):
        # best effort: flush any pending buffers on garbage collection
        self.fixate_all()