1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import, division, print_function
7import os
8import logging
9import time
10import weakref
11import copy
12import re
13import sys
14import operator
15import math
16import hashlib
17try:
18 import cPickle as pickle
19except ImportError:
20 import pickle
23from . import avl
24from . import trace, io, util
25from . import config
26from .trace import degapper
# Windows cannot rename over an existing file (see _dump_dircache).
is_windows = sys.platform.startswith('win')
# Global override to silence all progress bars (checked in loader()).
show_progress_force_off = False
# Mixed into every cache-file name hash (ehash); changing it changes all
# cache file names, effectively invalidating existing caches.
version_salt = 'v1-'
def ehash(s):
    '''
    Get hex SHA1 digest of *s*, salted with the cache format version.
    '''
    salted = (version_salt + s).encode('utf8')
    return hashlib.sha1(salted).hexdigest()
def cmp(a, b):
    '''
    Three-way comparison (-1, 0, 1); replacement for Python 2's ``cmp``.
    '''
    if a < b:
        return -1
    return 1 if a > b else 0
def sl(s):
    '''
    Sort *s* and convert each element to its string representation.
    '''
    return list(map(str, sorted(s)))
class Counter(dict):

    '''
    Minimal multiset: a dict of counts where absent keys read as zero.

    Unlike :py:class:`collections.Counter`, keys whose count drops to
    zero or below are removed entirely on subtraction.
    '''

    def __missing__(self, key):
        # Reading an absent key yields 0 without inserting it.
        return 0

    def update(self, other):
        # Note: overrides dict.update -- adds counts instead of replacing.
        for key, count in other.items():
            self[key] = self[key] + count

    def subtract(self, other):
        for key, count in other.items():
            remaining = self[key] - count
            if remaining <= 0:
                # pop with default: key may never have been present
                self.pop(key, None)
            else:
                self[key] = remaining

    def subtract1(self, key):
        remaining = self[key] - 1
        if remaining <= 0:
            self.pop(key, None)
        else:
            self[key] = remaining
def fix_unicode_copy(counter, func):
    '''
    Get a copy of *counter* with every key transformed by *func*.
    '''
    fixed = Counter()
    for key, count in counter.items():
        fixed[func(key)] = count

    return fixed
pjoin = os.path.join  # shorthand used throughout this module
logger = logging.getLogger('pyrocko.pile')
def avl_remove_exact(avltree, element):
    '''
    Remove *element* from *avltree*, matching by identity, not equality.

    :returns: index at which the element was removed
    :raises ValueError: if the exact object is not contained
    '''
    ilo, ihi = avltree.span(element)
    i = ilo
    while i < ihi:
        if avltree[i] is element:
            avltree.remove_at(i)
            return i
        i += 1

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')
def cmpfunc(key):
    '''
    Build a two-argument comparison function ordering by *key*.

    *key* is either an attribute name or a callable extracting the
    sort key from an object.
    '''
    if isinstance(key, str):
        # Hand-written fast paths for the two hottest attributes.
        if key == 'tmin':
            return lambda a, b: cmp(a.tmin, b.tmin)
        if key == 'tmax':
            return lambda a, b: cmp(a.tmax, b.tmax)

        key = operator.attrgetter(key)

    def compare(a, b):
        return cmp(key(a), key(b))

    return compare
# Cache of generated placeholder classes, keyed by attribute name
# (see get_dummy()).
g_dummys = {}
def get_dummy(key):
    '''
    Get a memoized placeholder class whose instances carry a single
    attribute named *key*; used to build probe objects for key searches.
    '''
    try:
        return g_dummys[key]
    except KeyError:
        pass

    class Dummy(object):
        def __init__(self, k):
            setattr(self, key, k)

    g_dummys[key] = Dummy
    return Dummy
class TooMany(Exception):
    '''
    Raised when a limited key-range query matches more elements than
    allowed (see :py:meth:`Sorted.with_key_in_limited`).
    '''

    def __init__(self, n):
        Exception.__init__(self)
        # number of elements that actually matched
        self.n = n
class Sorted(object):

    '''
    Collection kept continuously sorted, backed by an AVL tree.

    The sort key is either an attribute name or an arbitrary callable
    extracting the key from an element. Supports insertion, identity-based
    removal and key-range queries.
    '''

    def __init__(self, values=None, key=None):
        '''
        :param values: optional iterable of initial contents
        :param key: attribute name (str) or callable extracting the sort key
        '''
        # Fix: the previous signature used the mutable default ``values=[]``
        # (shared across calls). ``None`` sentinel is backward-compatible.
        if values is None:
            values = []

        self._set_key(key)
        self._avl = avl.new(values, self._cmp)

    def _set_key(self, key):
        # Prepare the comparison function and, for attribute-name keys, a
        # dummy-object factory used to build probes for range queries.
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # The AVL tree is not pickled directly; store contents plus key.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        l, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(l), len(l))

    def insert(self, value):
        self._avl.insert(value)

    def remove(self, value):
        '''
        Remove *value* (matched by identity); returns its former index.
        '''
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        return self._avl.remove_at(i)

    def insert_many(self, values):
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        '''
        Get all elements with key between *kmin* and *kmax*.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        '''
        Like :py:meth:`with_key_in` but raise :py:class:`TooMany` when
        more than *nmax* elements match.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        '''
        Get index of *value* (matched by identity).

        :raises ValueError: if the exact object is not contained
        '''
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]
class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of cache objects, keyed by cache directory
    # (populated via get_cache()).
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        # Maps cache-file path -> dict of (data-file abspath -> entry).
        self.dircaches = {}
        # Cache-file paths with unsaved in-memory changes.
        self.modified = set()
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()

        for fn in os.listdir(self.cachedir):
            # cache-file names are 40-char SHA1 hex digests (see ehash)
            if len(fn) == 40:
                # load+dump round trip drops entries for vanished files
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # Get the directory-cache dict responsible for data file *abspath*.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Get (loading from disk, or creating empty) the in-memory dict
        # for cache file *cachepath*.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # One cache file per data directory; its name is the salted hash
        # of the directory path.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        # Unpickle a directory cache and sanitize the revived entries.

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            # rebuild the lookup trees (they are stripped before pickling,
            # see _dump_dircache)
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            # cached entries carry headers only; reset data state
            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        # Pickle a directory cache atomically (write to temp file, then
        # rename); an empty cache removes the cache file instead.

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                # store meta-information only, never waveform samples
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            # protocol 2 keeps the file readable from Python 2
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)
def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    try:
        return TracesFileCache.caches[cachedir]
    except KeyError:
        cache = TracesFileCache(cachedir)
        TracesFileCache.caches[cachedir] = cache
        return cache
def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):
    '''
    Iterate over trace files, using and updating the metadata cache.

    :param filenames: paths of the files to inspect
    :param fileformat: expected file format name, or ``'detect'``
    :param cache: :py:class:`TracesFileCache` instance or ``None``
    :param filename_attributes: regex pattern with named groups used to
        extract trace codes from the file path, or ``None``
    :param show_progress: whether to display a progress bar
    :param update_progress: optional callback ``(label, i, n)``; a truthy
        return value aborts the iteration
    :yields: trace file objects, freshly scanned or taken from the cache
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Bundles progress-bar display and the abort-callback protocol.
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            # Advance to step *i*; returns True if the caller should abort.
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                abort = update_progress(self._label, i, self._n)

            return abort

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    progress = Progress('Looking at files', len(filenames))

    failures = []
    to_load = []
    # Pass 1: stat each file and decide (via the cache) whether it must
    # actually be scanned.
    for i, filename in enumerate(filenames):
        try:
            abspath = os.path.abspath(filename)

            substitutions = None
            if regex:
                m = regex.search(filename)
                if not m:
                    raise FilenameAttributeError(
                        "Cannot get attributes with pattern '%s' "
                        "from path '%s'" % (filename_attributes, filename))

                substitutions = {}
                for k in m.groupdict():
                    if k in ('network', 'station', 'location', 'channel'):
                        substitutions[k] = m.groupdict()[k]

            # index 8 of os.stat result is st_mtime
            mtime = os.stat(filename)[8]
            tfile = None
            if cache:
                tfile = cache.get(abspath)

            # rescan when: not cached, format mismatch (unless detecting),
            # file changed on disk, or codes are overridden from the path
            mustload = (
                not tfile or
                (tfile.format != fileformat and fileformat != 'detect') or
                tfile.mtime != mtime or
                substitutions is not None)

            to_load.append((mustload, mtime, abspath, substitutions, tfile))

        except (OSError, FilenameAttributeError) as xerror:
            failures.append(abspath)
            logger.warning(xerror)

        abort = progress.update(i+1)
        if abort:
            progress.update(len(filenames))
            return

    progress.update(len(filenames))

    # sort by abspath for deterministic scan order
    to_load.sort(key=lambda x: x[2])

    nload = len([1 for x in to_load if x[0]])
    iload = 0

    count_all = False
    if nload < 0.01*len(to_load):
        # nearly everything is cached: count every file for a smoother
        # progress display
        nload = len(to_load)
        count_all = True

    if to_load:
        progress = Progress('Scanning files', nload)

        # Pass 2: scan files that need it, yield all of them.
        for (mustload, mtime, abspath, substitutions, tfile) in to_load:
            try:
                if mustload:
                    tfile = TracesFile(
                        None, abspath, fileformat,
                        substitutions=substitutions, mtime=mtime)

                    # entries with substituted codes are not cached
                    if cache and not substitutions:
                        cache.put(abspath, tfile)

                    if not count_all:
                        iload += 1

                if count_all:
                    iload += 1

            except (io.FileLoadError, OSError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)
            else:
                yield tfile

            abort = progress.update(iload+1)
            if abort:
                break

        progress.update(nload)

    if failures:
        logger.warning(
            'The following file%s caused problems and will be ignored:\n' %
            util.plural_s(len(failures)) + '\n'.join(failures))

    if cache:
        cache.dump_modified()
def tlen(x):
    '''
    Time span covered by *x* (``tmax - tmin``); used as a sort key.
    '''
    return x.tmax - x.tmin
class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        '''
        :param parent: enclosing :py:class:`TracesGroup` (updates are
            propagated to it) or ``None`` for a root group
        '''
        self.parent = parent
        self.empty()
        self.nupdates = 0    # bumped on every add/remove
        self.abspath = None  # set by file-backed subclasses

    def set_parent(self, parent):
        '''
        Set the parent group which receives propagated updates.
        '''
        self.parent = parent

    def get_parent(self):
        '''
        Get the parent group (or ``None``).
        '''
        return self.parent

    def empty(self):
        '''
        Reset all lookup counters, trees and cached ranges to empty.
        '''
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        '''
        Rebuild the sorted lookup trees from *content* and refresh the
        cached ranges.
        '''
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        '''
        Convert unicode trace codes (from Py2-era cache files) to ``str``.
        '''
        for net in self.networks:
            # at least one plain-str key: treat the group as already fixed
            if isinstance(net, str):
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # merge the child group's precomputed counters and trees
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add')

        # propagate to the enclosing group so indices stay consistent
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content to traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove')

        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        :param trace_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # widen the search window by the longest trace duration, so that
        # traces starting before tmin but reaching into the window are found
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        '''
        Recompute cached time range, longest-trace duration, mtime and
        sampling-interval range from the lookup structures.
        '''
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin  # duration of the longest trace
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what):
        # hook; overridden by Pile to inform registered listeners
        pass

    def get_update_count(self):
        '''
        Get the number of add/remove updates applied so far.
        '''
        return self.nupdates

    def overlaps(self, tmin, tmax):
        '''
        Check whether this group's time range overlaps [*tmin*, *tmax*].
        '''
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        '''
        Check time overlap with [*tmin*, *tmax*] plus optional
        *group_selector* predicate.
        '''
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))
class MemTracesFile(TracesGroup):

    '''
    File-like trace container without a disk file behind it, so that
    purely in-memory traces can be inserted into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        if isinstance(traces, trace.Trace):
            traces = [traces]

        # claim ownership of the traces before indexing them
        for owned in traces:
            owned.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # no-op: headers already live in memory
        pass

    def load_data(self):
        # no-op: data already lives in memory
        pass

    def use_data(self):
        pass

    def drop_data(self):
        pass

    def reload_if_modified(self):
        # in-memory contents never change behind our back
        return False

    def iter_traces(self):
        for owned in self.by_tmin:
            yield owned

    def get_traces(self):
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        return set(
            gather(owned) for owned in self.by_tmin
            if selector is None or selector(owned))

    def __str__(self):
        lines = [
            'MemTracesFile',
            'file mtime: %s' % util.time_to_str(self.mtime),
            'number of traces: %i' % len(self.by_tmin),
            'timerange: %s - %s' % (
                util.time_to_str(self.tmin), util.time_to_str(self.tmax)),
            'networks: %s' % ', '.join(sl(self.networks.keys())),
            'stations: %s' % ', '.join(sl(self.stations.keys())),
            'locations: %s' % ', '.join(sl(self.locations.keys())),
            'channels: %s' % ', '.join(sl(self.channels.keys())),
            'deltats: %s' % ', '.join(sl(self.deltats.keys())),
        ]
        return '\n'.join(lines) + '\n'
class TracesFile(TracesGroup):
    '''
    Trace container backed by a single waveform file on disk.

    Headers are read on construction; waveform data is loaded lazily and
    reference-counted via :py:meth:`use_data` / :py:meth:`drop_data`.
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        '''
        :param parent: parent :py:class:`TracesGroup` or ``None``
        :param abspath: absolute path of the waveform file
        :param format: file format name, or ``'detect'``
        :param substitutions: dict of trace code replacements or ``None``
        :param mtime: known modification time, or ``None`` to stat the file
        '''
        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        self.data_use_count = 0
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        # NOTE(review): when mtime is None, the next line overwrites the
        # stat-based mtime just set by load_headers() back to None -- looks
        # unintended; callers (loader) always pass mtime, so it is latent.
        self.mtime = mtime

    def load_headers(self, mtime=None):
        '''
        (Re)read trace meta-information from the file (no waveform data).
        '''
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            # index 8 of os.stat result is st_mtime
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # identity key used to deduplicate snippets within one file
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            k = kgen(tr)
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        '''
        Load waveform data, reconciling with the already-known headers.

        :param force: reload even if data is already present
        :returns: ``True`` if the set of traces changed (file changed on
            disk since headers were read)
        '''
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                # identity key, same scheme as in load_headers
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            # diff freshly loaded traces against the known set
            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # drop traces which vanished from the file
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            # adopt traces which appeared in the file
            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            # attach fresh sample data to unchanged traces
            for tr in traces:
                if kgen(tr) in k_unchanged:
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        '''
        Increment the data reference count; data must be loaded already.
        '''
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        '''
        Decrement the data reference count; free sample data when the
        last user releases it.
        '''
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        '''
        Reload headers (or data, if loaded) when the file's mtime changed.

        :returns: ``True`` if a reload happened
        '''
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        '''
        Iterate over the contained traces.
        '''
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        '''
        Collect the set of ``gather(trace)`` values, optionally filtered
        by *selector*.
        '''
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s
class FilenameAttributeError(Exception):
    '''
    Raised when trace codes cannot be extracted from a file path with
    the given filename-attribute pattern.
    '''
    pass
class SubPile(TracesGroup):
    '''
    Collection of trace files, grouped under a :py:class:`Pile` (one
    subpile per sampling-interval bucket, see ``Pile.dispatch_key``).
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        '''
        Adopt *file* as a child and index its contents.
        '''
        self.files.append(file)
        file.set_parent(self)
        self.add(file)

    def remove_file(self, file):
        '''
        Remove *file* and drop its contents from the indices.
        '''
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        '''
        Remove several files with a single index update.
        '''
        for file in files:
            self.files.remove(file)
            file.set_parent(None)
        self.remove(files)

    def gather_keys(self, gather, selector=None):
        '''
        Collect the set of ``gather(trace)`` values over all contained
        traces, optionally filtered by *selector*.
        '''
        keys = set()
        for file in self.files:
            keys |= file.gather_keys(gather, selector)

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        '''
        Iterate over contained traces.

        :param load_data: if ``True``, load (and afterwards release)
            waveform data of each visited file
        :param return_abspath: if ``True``, yield ``(abspath, trace)``
            tuples instead of bare traces
        :param group_selector: file-level filter callback
        :param trace_selector: trace-level filter callback
        '''
        for file in self.files:

            if group_selector and not group_selector(file):
                continue

            must_drop = False
            if load_data:
                file.load_data()
                file.use_data()
                must_drop = True

            for tr in file.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield file.abspath, tr
                else:
                    yield tr

            if must_drop:
                # release the reference taken above
                file.drop_data()

    def iter_files(self):
        '''
        Iterate over the contained file objects.
        '''
        for file in self.files:
            yield file

    def reload_modified(self):
        '''
        Reload any files whose modification time changed on disk.

        :returns: ``True`` if anything was reloaded
        '''
        modified = False
        for file in self.files:
            modified |= file.reload_if_modified()

        return modified

    def __str__(self):
        s = 'SubPile\n'
        s += 'number of files: %i\n' % len(self.files)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s
class Batch(object):
    '''
    Batch of waveforms from window wise data extraction.

    Encapsulates state and results yielded for each window in window wise
    waveform extraction with the :py:meth:`Pile.chopper` method (when the
    ``style='batch'`` keyword argument is set).

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        self.i = i
        self.n = n
        self.tmin = tmin
        self.tmax = tmax
        self.traces = traces
1059class Pile(TracesGroup):
1060 '''
1061 Waveform archive lookup, data loading and caching infrastructure.
1062 '''
1064 def __init__(self):
1065 TracesGroup.__init__(self, None)
1066 self.subpiles = {}
1067 self.open_files = {}
1068 self.listeners = []
1069 self.abspaths = set()
1071 def add_listener(self, obj):
1072 self.listeners.append(weakref.ref(obj))
1074 def notify_listeners(self, what):
1075 for ref in self.listeners:
1076 obj = ref()
1077 if obj:
1078 obj.pile_changed(what)
1080 def load_files(
1081 self, filenames,
1082 filename_attributes=None,
1083 fileformat='mseed',
1084 cache=None,
1085 show_progress=True,
1086 update_progress=None):
1088 load = loader(
1089 filenames, fileformat, cache, filename_attributes,
1090 show_progress=show_progress,
1091 update_progress=update_progress)
1093 self.add_files(load)
1095 def add_files(self, files):
1096 for file in files:
1097 self.add_file(file)
1099 def add_file(self, file):
1100 if file.abspath is not None and file.abspath in self.abspaths:
1101 logger.warning('File already in pile: %s' % file.abspath)
1102 return
1104 if file.deltatmin is None:
1105 logger.warning('Sampling rate of all traces are zero in file: %s' %
1106 file.abspath)
1107 return
1109 subpile = self.dispatch(file)
1110 subpile.add_file(file)
1111 if file.abspath is not None:
1112 self.abspaths.add(file.abspath)
1114 def remove_file(self, file):
1115 subpile = file.get_parent()
1116 if subpile is not None:
1117 subpile.remove_file(file)
1118 if file.abspath is not None:
1119 self.abspaths.remove(file.abspath)
1121 def remove_files(self, files):
1122 subpile_files = {}
1123 for file in files:
1124 subpile = file.get_parent()
1125 if subpile not in subpile_files:
1126 subpile_files[subpile] = []
1128 subpile_files[subpile].append(file)
1130 for subpile, files in subpile_files.items():
1131 subpile.remove_files(files)
1132 for file in files:
1133 if file.abspath is not None:
1134 self.abspaths.remove(file.abspath)
1136 def dispatch_key(self, file):
1137 dt = int(math.floor(math.log(file.deltatmin)))
1138 return dt
1140 def dispatch(self, file):
1141 k = self.dispatch_key(file)
1142 if k not in self.subpiles:
1143 self.subpiles[k] = SubPile(self)
1145 return self.subpiles[k]
1147 def get_deltats(self):
1148 return list(self.deltats.keys())
    def chop(
            self, tmin, tmax,
            group_selector=None,
            trace_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True):
        '''
        Cut out traces overlapping the time window [*tmin*, *tmax*].

        :param tmin: start time
        :param tmax: end time
        :param group_selector: file-level filter callback
        :param trace_selector: trace-level filter callback
        :param snap: rounding functions for window edges (passed to
            ``Trace.chop``)
        :param include_last: whether to include the last sample
        :param load_data: whether to load waveform data; if ``False``,
            meta-only trace copies are returned
        :returns: tuple ``(chopped, used_files)``
        '''

        chopped = []
        used_files = set()

        traces = self.relevant(tmin, tmax, group_selector, trace_selector)
        if load_data:
            files_changed = False
            for tr in traces:
                if tr.file and tr.file not in used_files:
                    if tr.file.load_data():
                        files_changed = True

                if tr.file is not None:
                    used_files.add(tr.file)

            if files_changed:
                # loading data may have revealed on-disk changes; redo the
                # lookup against the refreshed meta-information
                traces = self.relevant(
                    tmin, tmax, group_selector, trace_selector)

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # caller asked for meta only: hand out a data-less copy
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                pass

        return chopped, used_files
    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process traces of one chopper window: sort, degap, and
        optionally weed out or patch up incomplete traces.
        '''

        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # deviation of the actual trace edges from the padded window
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    # complete within half-sample tolerance
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # nearly complete (within 5 samples): extend to the
                        # window edges by repeating the edge samples
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        # annotate each trace with its extraction window
        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped
    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        if tinc is None:
            # single window covering the whole requested span
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        # track data-holding files per accessor so that independent
        # extraction points don't evict each other's cached data
        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        open_files = self.open_files[accessor_id]

        # tolerance against floating point rounding in window count
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        for iwin in range(nwin):
            chopped = []
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # release data of files not touched by this window
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        if not keep_current_files_open:
            while open_files:
                file = open_files.pop()
                file.drop_data()
1347 def all(self, *args, **kwargs):
1348 '''
1349 Shortcut to aggregate :py:meth:`chopper` output into a single list.
1350 '''
1352 alltraces = []
1353 for traces in self.chopper(*args, **kwargs):
1354 alltraces.extend(traces)
1356 return alltraces
1358 def iter_all(self, *args, **kwargs):
1359 for traces in self.chopper(*args, **kwargs):
1360 for tr in traces:
1361 yield tr
1363 def chopper_grouped(self, gather, progress=None, *args, **kwargs):
1364 keys = self.gather_keys(gather)
1365 if len(keys) == 0:
1366 return
1368 outer_group_selector = None
1369 if 'group_selector' in kwargs:
1370 outer_group_selector = kwargs['group_selector']
1372 outer_trace_selector = None
1373 if 'trace_selector' in kwargs:
1374 outer_trace_selector = kwargs['trace_selector']
1376 # the use of this gather-cache makes it impossible to modify the pile
1377 # during chopping
1378 gather_cache = {}
1379 pbar = None
1380 if progress is not None:
1381 pbar = util.progressbar(progress, len(keys))
1383 for ikey, key in enumerate(keys):
1384 def tsel(tr):
1385 return gather(tr) == key and (outer_trace_selector is None or
1386 outer_trace_selector(tr))
1388 def gsel(gr):
1389 if gr not in gather_cache:
1390 gather_cache[gr] = gr.gather_keys(gather)
1392 return key in gather_cache[gr] and (
1393 outer_group_selector is None or outer_group_selector(gr))
1395 kwargs['trace_selector'] = tsel
1396 kwargs['group_selector'] = gsel
1398 for traces in self.chopper(*args, **kwargs):
1399 yield traces
1401 if pbar:
1402 pbar.update(ikey+1)
1404 if pbar:
1405 pbar.finish()
1407 def gather_keys(self, gather, selector=None):
1408 keys = set()
1409 for subpile in self.subpiles.values():
1410 keys |= subpile.gather_keys(gather, selector)
1412 return sorted(keys)
1414 def iter_traces(
1415 self,
1416 load_data=False,
1417 return_abspath=False,
1418 group_selector=None,
1419 trace_selector=None):
1421 '''
1422 Iterate over all traces in pile.
1424 :param load_data: whether to load the waveform data, by default empty
1425 traces are yielded
1426 :param return_abspath: if ``True`` yield tuples containing absolute
1427 file path and :py:class:`pyrocko.trace.Trace` objects
1428 :param group_selector: filter callback taking :py:class:`TracesGroup`
1429 objects
1430 :param trace_selector: filter callback taking
1431 :py:class:`pyrocko.trace.Trace` objects
1433 Example; yields only traces, where the station code is 'HH1'::
1435 test_pile = pile.make_pile('/local/test_trace_directory')
1436 for t in test_pile.iter_traces(
1437 trace_selector=lambda tr: tr.station=='HH1'):
1439 print t
1440 '''
1442 for subpile in self.subpiles.values():
1443 if not group_selector or group_selector(subpile):
1444 for tr in subpile.iter_traces(load_data, return_abspath,
1445 group_selector, trace_selector):
1446 yield tr
1448 def iter_files(self):
1449 for subpile in self.subpiles.values():
1450 for file in subpile.iter_files():
1451 yield file
1453 def reload_modified(self):
1454 modified = False
1455 for subpile in self.subpiles.values():
1456 modified |= subpile.reload_modified()
1458 return modified
    def get_tmin(self):
        '''
        Get time of first sample in pile.
        '''
        return self.tmin
    def get_tmax(self):
        '''
        Get time of last sample in pile.
        '''
        return self.tmax
    def get_deltatmin(self):
        '''
        Get smallest sampling interval in pile.
        '''
        return self.deltatmin
    def get_deltatmax(self):
        '''
        Get largest sampling interval in pile.
        '''
        return self.deltatmax
    def is_empty(self):
        '''
        Check if the pile contains no data (no time range has been set).
        '''
        return self.tmin is None and self.tmax is None
1475 def __str__(self):
1476 if self.tmin is not None and self.tmax is not None:
1477 tmin = util.time_to_str(self.tmin)
1478 tmax = util.time_to_str(self.tmax)
1479 s = 'Pile\n'
1480 s += 'number of subpiles: %i\n' % len(self.subpiles)
1481 s += 'timerange: %s - %s\n' % (tmin, tmax)
1482 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1483 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1484 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1485 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1486 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1488 else:
1489 s = 'empty Pile'
1491 return s
1493 def snuffle(self, **kwargs):
1494 '''
1495 Visualize it.
1497 :param stations: list of `pyrocko.model.Station` objects or ``None``
1498 :param events: list of `pyrocko.model.Event` objects or ``None``
1499 :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
1500 :param ntracks: float, number of tracks to be shown initially
1501 (default: 12)
1502 :param follow: time interval (in seconds) for real time follow mode or
1503 ``None``
1504 :param controls: bool, whether to show the main controls (default:
1505 ``True``)
1506 :param opengl: bool, whether to use opengl (default: ``False``)
1507 '''
1509 from pyrocko.gui.snuffler import snuffle
1510 snuffle(self, **kwargs)
def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces. If paths is
        ``None`` ``sys.argv[1:]`` is used.
    :param selector: lambda expression taking group dict of regex match object
        as a single argument and which returns true or false to keep or reject
        a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files ('mseed', 'sac', 'kan',
        'from_extension', 'detect')
    :param cachedirname: loader cache is stored under this directory. It is
        created as neccessary.
    :param show_progress: show progress bar and other progress information
    '''

    if show_progress_force_off:
        show_progress = False

    if isinstance(paths, str):
        paths = [paths]
    elif paths is None:
        paths = sys.argv[1:]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    fnames = util.select_files(
        paths, selector, regex, show_progress=show_progress)

    pile_ = Pile()
    pile_.load_files(
        sorted(fnames),
        cache=get_cache(cachedirname),
        fileformat=fileformat,
        show_progress=show_progress)

    return pile_
class Injector(trace.States):
    '''
    Live trace injector, buffering incoming waveform snippets into a pile.

    Incoming snippets are accumulated in in-memory buffer files
    (:py:class:`MemTracesFile`) inside the connected pile.  When a buffer
    grows beyond ``fixation_length``, or when fixation is requested
    explicitly, the buffered data is "fixated": optionally saved to disk and
    reloaded into the pile as regular file-backed traces.
    '''

    def __init__(
            self, pile,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False):

        # :param pile: pile object receiving the buffer files
        # :param fixation_length: buffer duration [s] after which traces are
        #     fixated, or None to disable length-triggered fixation
        # :param path: filename template for saving fixated traces; if falsy,
        #     fixated traces are not saved
        # :param format: file format handed to io.save / load_files
        # :param forget_fixed: if True, saved traces are not reloaded into
        #     the pile after fixation
        trace.States.__init__(self)
        self._pile = pile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        # Flush existing buffers first so the new length only applies to
        # freshly injected data.
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):

        '''
        Set the filename template used when saving fixated traces.
        '''
        # Fixate first, so already-buffered data is saved under the old path.
        self.fixate_all()
        self._path = path

    def inject(self, trace):
        '''
        Add a snippet of waveform data to the injector's buffers.

        NOTE(review): the parameter name shadows the module-level ``trace``
        import; only the argument is referenced inside this method.
        '''
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # First snippet for this state: start a new in-memory buffer.
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # Append samples to the existing buffer trace.  The file is
            # temporarily removed from the pile so its lookup structures are
            # rebuilt consistently on re-add.
            # NOTE(review): appending ydata assumes snippets arrive gap-free
            # and in order - confirm with the data source.
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                # Buffer has outgrown the fixation length: save complete
                # chunks, keeping the incomplete tail buffered.
                self._fixate(buf, complete=False)

    def fixate_all(self):
        '''
        Fixate all buffered traces and clear the injector's state.
        '''
        # _states is maintained by trace.States; presumably each value's last
        # element is the buffer file set via self.set() - TODO confirm.
        for state in list(self._states.values()):
            self._fixate(state[-1])

        self._states = {}

    def free(self, buf):
        # Release hook: fixate the buffer associated with an expiring state.
        self._fixate(buf)

    def _fixate(self, buf, complete=True):
        '''
        Save and/or drop the contents of buffer file ``buf``.

        :param complete: if ``True``, the (possibly incomplete) last chunk is
            saved as well; otherwise it is kept buffered for further appends.
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # Chop the buffer into chunks on a fixation_length grid
                # anchored at the start of the year of the buffer's onset.
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): if every chop raised NoData, ``traces`` is
                # empty and traces[-1] raises IndexError - confirm this
                # cannot happen for a non-empty buffer.
                # Last chunk ends exactly on the grid (within deltat/100), or
                # a complete fixation was requested: drop the whole buffer.
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                # No fixation length: save the whole buffer in one piece.
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                # Re-add the now file-backed traces to the pile.
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            del self._states[trbuf.nslc_id]

    def __del__(self):
        # Best-effort flush of remaining buffers on garbage collection.
        # NOTE(review): relying on __del__ is fragile - at interpreter
        # shutdown, module globals used by fixate_all may already be gone.
        self.fixate_all()