1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import os
7import logging
8import time
9import weakref
10import copy
11import re
12import sys
13import operator
14import math
15import hashlib
16try:
17 import cPickle as pickle
18except ImportError:
19 import pickle
22from . import avl
23from . import trace, io, util
24from . import config
25from .trace import degapper
28is_windows = sys.platform.startswith('win')
29show_progress_force_off = False
30version_salt = 'v1-'
33def ehash(s):
34 return hashlib.sha1((version_salt + s).encode('utf8')).hexdigest()
37def cmp(a, b):
38 return int(a > b) - int(a < b)
41def sl(s):
42 return [str(x) for x in sorted(s)]
45class Counter(dict):
47 def __missing__(self, k):
48 return 0
50 def update(self, other):
51 for k, v in other.items():
52 self[k] += v
54 def subtract(self, other):
55 for k, v in other.items():
56 self[k] -= v
57 if self[k] <= 0:
58 del self[k]
60 def subtract1(self, k):
61 self[k] -= 1
62 if self[k] <= 0:
63 del self[k]
66def fix_unicode_copy(counter, func):
67 counter_new = Counter()
68 for k in counter:
69 counter_new[func(k)] = counter[k]
70 return counter_new
73pjoin = os.path.join
74logger = logging.getLogger('pyrocko.pile')
77def avl_remove_exact(avltree, element):
78 ilo, ihi = avltree.span(element)
79 for i in range(ilo, ihi):
80 if avltree[i] is element:
81 avltree.remove_at(i)
82 return i
84 raise ValueError(
85 'avl_remove_exact(avltree, element): element not in avltree')
88def cmpfunc(key):
89 if isinstance(key, str):
90 # special cases; these run about 50% faster than the generic one on
91 # Python 2.5
92 if key == 'tmin':
93 return lambda a, b: cmp(a.tmin, b.tmin)
94 if key == 'tmax':
95 return lambda a, b: cmp(a.tmax, b.tmax)
97 key = operator.attrgetter(key)
99 return lambda a, b: cmp(key(a), key(b))
102g_dummys = {}
105def get_dummy(key):
106 if key not in g_dummys:
107 class Dummy(object):
108 def __init__(self, k):
109 setattr(self, key, k)
111 g_dummys[key] = Dummy
113 return g_dummys[key]
116class TooMany(Exception):
117 def __init__(self, n):
118 Exception.__init__(self)
119 self.n = n
122class Sorted(object):
123 def __init__(self, values=[], key=None):
124 self._set_key(key)
125 self._avl = avl.new(values, self._cmp)
127 def _set_key(self, key):
128 self._key = key
129 self._cmp = cmpfunc(key)
130 if isinstance(key, str):
131 self._dummy = get_dummy(key)
133 def __getstate__(self):
134 state = list(self._avl.iter()), self._key
135 return state
137 def __setstate__(self, state):
138 l, key = state
139 self._set_key(key)
140 self._avl = avl.from_iter(iter(l), len(l))
142 def insert(self, value):
143 self._avl.insert(value)
145 def remove(self, value):
146 return avl_remove_exact(self._avl, value)
148 def remove_at(self, i):
149 return self._avl.remove_at(i)
151 def insert_many(self, values):
152 for value in values:
153 self._avl.insert(value)
155 def remove_many(self, values):
156 for value in values:
157 avl_remove_exact(self._avl, value)
159 def __iter__(self):
160 return iter(self._avl)
162 def with_key_in(self, kmin, kmax):
163 omin, omax = self._dummy(kmin), self._dummy(kmax)
164 ilo, ihi = self._avl.span(omin, omax)
165 return self._avl[ilo:ihi]
167 def with_key_in_limited(self, kmin, kmax, nmax):
168 omin, omax = self._dummy(kmin), self._dummy(kmax)
169 ilo, ihi = self._avl.span(omin, omax)
170 if ihi - ilo > nmax:
171 raise TooMany(ihi - ilo)
173 return self._avl[ilo:ihi]
175 def index(self, value):
176 ilo, ihi = self._avl.span(value)
177 for i in range(ilo, ihi):
178 if self._avl[i] is value:
179 return i
181 raise ValueError('element is not in avl tree')
183 def min(self):
184 return self._avl.min()
186 def max(self):
187 return self._avl.max()
189 def __len__(self):
190 return len(self._avl)
192 def __getitem__(self, i):
193 return self._avl[i]
196class TracesFileCache(object):
197 '''
198 Manages trace metainformation cache.
200 For each directory with files containing traces, one cache file is
201 maintained to hold the trace metainformation of all files which are
202 contained in the directory.
203 '''
205 caches = {}
207 def __init__(self, cachedir):
208 '''
209 Create new cache.
211 :param cachedir: directory to hold the cache files.
212 '''
214 self.cachedir = cachedir
215 self.dircaches = {}
216 self.modified = set()
217 util.ensuredir(self.cachedir)
219 def get(self, abspath):
220 '''
221 Try to get an item from the cache.
223 :param abspath: absolute path of the object to retrieve
225 :returns: a stored object is returned or None if nothing could be
226 found.
227 '''
229 dircache = self._get_dircache_for(abspath)
230 if abspath in dircache:
231 return dircache[abspath]
232 return None
234 def put(self, abspath, tfile):
235 '''
236 Put an item into the cache.
238 :param abspath: absolute path of the object to be stored
239 :param tfile: object to be stored
240 '''
242 cachepath = self._dircachepath(abspath)
243 # get lock on cachepath here
244 dircache = self._get_dircache(cachepath)
245 dircache[abspath] = tfile
246 self.modified.add(cachepath)
248 def dump_modified(self):
249 '''
250 Save any modifications to disk.
251 '''
253 for cachepath in self.modified:
254 self._dump_dircache(self.dircaches[cachepath], cachepath)
255 # unlock
257 self.modified = set()
259 def clean(self):
260 '''
261 Weed out missing files from the disk caches.
262 '''
264 self.dump_modified()
266 for fn in os.listdir(self.cachedir):
267 if len(fn) == 40:
268 cache = self._load_dircache(pjoin(self.cachedir, fn))
269 self._dump_dircache(cache, pjoin(self.cachedir, fn))
271 def _get_dircache_for(self, abspath):
272 return self._get_dircache(self._dircachepath(abspath))
274 def _get_dircache(self, cachepath):
275 if cachepath not in self.dircaches:
276 if os.path.isfile(cachepath):
277 self.dircaches[cachepath] = self._load_dircache(cachepath)
278 else:
279 self.dircaches[cachepath] = {}
281 return self.dircaches[cachepath]
283 def _dircachepath(self, abspath):
284 cachefn = ehash(os.path.dirname(abspath))
285 return pjoin(self.cachedir, cachefn)
287 def _load_dircache(self, cachefilename):
289 with open(cachefilename, 'rb') as f:
290 cache = pickle.load(f)
292 # weed out files which no longer exist
293 for fn in list(cache.keys()):
294 if not os.path.isfile(fn):
295 del cache[fn]
297 time_float = util.get_time_float()
299 for v in cache.values():
300 v.trees_from_content(v.traces)
301 for tr in v.traces:
302 tr.file = v
303 # fix Py2 codes to not include unicode when the cache file
304 # was created with Py3
305 if not isinstance(tr.station, str):
306 tr.prune_from_reuse_cache()
307 tr.set_codes(
308 str(tr.network),
309 str(tr.station),
310 str(tr.location),
311 str(tr.channel))
313 tr.tmin = time_float(tr.tmin)
314 tr.tmax = time_float(tr.tmax)
316 v.data_use_count = 0
317 v.data_loaded = False
318 v.fix_unicode_codes()
320 return cache
322 def _dump_dircache(self, cache, cachefilename):
324 if not cache:
325 if os.path.exists(cachefilename):
326 os.remove(cachefilename)
327 return
329 # make a copy without the parents and the binsearch trees
330 cache_copy = {}
331 for fn in cache.keys():
332 trf = copy.copy(cache[fn])
333 trf.parent = None
334 trf.by_tmin = None
335 trf.by_tmax = None
336 trf.by_tlen = None
337 trf.by_mtime = None
338 trf.data_use_count = 0
339 trf.data_loaded = False
340 traces = []
341 for tr in trf.traces:
342 tr = tr.copy(data=False)
343 tr.ydata = None
344 tr.meta = None
345 tr.file = trf
346 traces.append(tr)
348 trf.traces = traces
350 cache_copy[fn] = trf
352 tmpfn = cachefilename+'.%i.tmp' % os.getpid()
353 with open(tmpfn, 'wb') as f:
354 pickle.dump(cache_copy, f, protocol=2)
356 if is_windows and os.path.exists(cachefilename):
357 # windows doesn't allow to rename over existing file
358 os.unlink(cachefilename)
360 os.rename(tmpfn, cachefilename)
363def get_cache(cachedir):
364 '''
365 Get global TracesFileCache object for given directory.
366 '''
367 if cachedir not in TracesFileCache.caches:
368 TracesFileCache.caches[cachedir] = TracesFileCache(cachedir)
370 return TracesFileCache.caches[cachedir]
373def loader(
374 filenames, fileformat, cache, filename_attributes,
375 show_progress=True, update_progress=None):
377 if show_progress_force_off:
378 show_progress = False
380 class Progress(object):
381 def __init__(self, label, n):
382 self._label = label
383 self._n = n
384 self._bar = None
385 if show_progress:
386 self._bar = util.progressbar(label, self._n)
388 if update_progress:
389 update_progress(label, 0, self._n)
391 def update(self, i):
392 if self._bar:
393 if i < self._n-1:
394 self._bar.update(i)
395 else:
396 self._bar.finish()
397 self._bar = None
399 abort = False
400 if update_progress:
401 abort = update_progress(self._label, i, self._n)
403 return abort
405 def finish(self):
406 if self._bar:
407 self._bar.finish()
408 self._bar = None
410 if not filenames:
411 logger.warning('No files to load from')
412 return
414 regex = None
415 if filename_attributes:
416 regex = re.compile(filename_attributes)
418 try:
419 progress = Progress('Looking at files', len(filenames))
421 failures = []
422 to_load = []
423 for i, filename in enumerate(filenames):
424 try:
425 abspath = os.path.abspath(filename)
427 substitutions = None
428 if regex:
429 m = regex.search(filename)
430 if not m:
431 raise FilenameAttributeError(
432 "Cannot get attributes with pattern '%s' "
433 "from path '%s'" % (filename_attributes, filename))
435 substitutions = {}
436 for k in m.groupdict():
437 if k in ('network', 'station', 'location', 'channel'):
438 substitutions[k] = m.groupdict()[k]
440 mtime = os.stat(filename)[8]
441 tfile = None
442 if cache:
443 tfile = cache.get(abspath)
445 mustload = (
446 not tfile or
447 (tfile.format != fileformat and fileformat != 'detect') or
448 tfile.mtime != mtime or
449 substitutions is not None)
451 to_load.append(
452 (mustload, mtime, abspath, substitutions, tfile))
454 except (OSError, FilenameAttributeError) as xerror:
455 failures.append(abspath)
456 logger.warning(xerror)
458 abort = progress.update(i+1)
459 if abort:
460 progress.update(len(filenames))
461 return
463 progress.update(len(filenames))
465 to_load.sort(key=lambda x: x[2])
467 nload = len([1 for x in to_load if x[0]])
468 iload = 0
470 count_all = False
471 if nload < 0.01*len(to_load):
472 nload = len(to_load)
473 count_all = True
475 if to_load:
476 progress = Progress('Scanning files', nload)
478 for (mustload, mtime, abspath, substitutions, tfile) in to_load:
479 try:
480 if mustload:
481 tfile = TracesFile(
482 None, abspath, fileformat,
483 substitutions=substitutions, mtime=mtime)
485 if cache and not substitutions:
486 cache.put(abspath, tfile)
488 if not count_all:
489 iload += 1
491 if count_all:
492 iload += 1
494 except (io.FileLoadError, OSError) as xerror:
495 failures.append(abspath)
496 logger.warning(xerror)
497 else:
498 yield tfile
500 abort = progress.update(iload+1)
501 if abort:
502 break
504 progress.update(nload)
506 if failures:
507 logger.warning(
508 'The following file%s caused problems and will be ignored:\n' %
509 util.plural_s(len(failures)) + '\n'.join(failures))
511 if cache:
512 cache.dump_modified()
513 finally:
514 progress.finish()
517def tlen(x):
518 return x.tmax-x.tmin
521class TracesGroup(object):
523 '''
524 Trace container base class.
526 Base class for Pile, SubPile, and TracesFile, i.e. anything containing
527 a collection of several traces. A TracesGroup object maintains lookup sets
528 of some of the traces meta-information, as well as a combined time-range
529 of its contents.
530 '''
532 def __init__(self, parent):
533 self.parent = parent
534 self.empty()
535 self.nupdates = 0
536 self.abspath = None
538 def set_parent(self, parent):
539 self.parent = parent
541 def get_parent(self):
542 return self.parent
544 def empty(self):
545 self.networks, self.stations, self.locations, self.channels, \
546 self.nslc_ids, self.deltats = [Counter() for x in range(6)]
547 self.by_tmin = Sorted([], 'tmin')
548 self.by_tmax = Sorted([], 'tmax')
549 self.by_tlen = Sorted([], tlen)
550 self.by_mtime = Sorted([], 'mtime')
551 self.tmin, self.tmax = None, None
552 self.deltatmin, self.deltatmax = None, None
554 def trees_from_content(self, content):
555 self.by_tmin = Sorted(content, 'tmin')
556 self.by_tmax = Sorted(content, 'tmax')
557 self.by_tlen = Sorted(content, tlen)
558 self.by_mtime = Sorted(content, 'mtime')
559 self.adjust_minmax()
561 def fix_unicode_codes(self):
562 for net in self.networks:
563 if isinstance(net, str):
564 return
566 self.networks = fix_unicode_copy(self.networks, str)
567 self.stations = fix_unicode_copy(self.stations, str)
568 self.locations = fix_unicode_copy(self.locations, str)
569 self.channels = fix_unicode_copy(self.channels, str)
570 self.nslc_ids = fix_unicode_copy(
571 self.nslc_ids, lambda k: tuple(str(x) for x in k))
573 def add(self, content):
574 '''
575 Add content to traces group and update indices.
577 Accepts :py:class:`pyrocko.trace.Trace` objects and
578 :py:class:`pyrocko.pile.TracesGroup` objects.
579 '''
581 if isinstance(content, (trace.Trace, TracesGroup)):
582 content = [content]
584 for c in content:
586 if isinstance(c, TracesGroup):
587 self.networks.update(c.networks)
588 self.stations.update(c.stations)
589 self.locations.update(c.locations)
590 self.channels.update(c.channels)
591 self.nslc_ids.update(c.nslc_ids)
592 self.deltats.update(c.deltats)
594 self.by_tmin.insert_many(c.by_tmin)
595 self.by_tmax.insert_many(c.by_tmax)
596 self.by_tlen.insert_many(c.by_tlen)
597 self.by_mtime.insert_many(c.by_mtime)
599 elif isinstance(c, trace.Trace):
600 self.networks[c.network] += 1
601 self.stations[c.station] += 1
602 self.locations[c.location] += 1
603 self.channels[c.channel] += 1
604 self.nslc_ids[c.nslc_id] += 1
605 self.deltats[c.deltat] += 1
607 self.by_tmin.insert(c)
608 self.by_tmax.insert(c)
609 self.by_tlen.insert(c)
610 self.by_mtime.insert(c)
612 self.adjust_minmax()
614 self.nupdates += 1
615 self.notify_listeners('add')
617 if self.parent is not None:
618 self.parent.add(content)
620 def remove(self, content):
621 '''
622 Remove content to traces group and update indices.
623 '''
624 if isinstance(content, (trace.Trace, TracesGroup)):
625 content = [content]
627 for c in content:
629 if isinstance(c, TracesGroup):
630 self.networks.subtract(c.networks)
631 self.stations.subtract(c.stations)
632 self.locations.subtract(c.locations)
633 self.channels.subtract(c.channels)
634 self.nslc_ids.subtract(c.nslc_ids)
635 self.deltats.subtract(c.deltats)
637 self.by_tmin.remove_many(c.by_tmin)
638 self.by_tmax.remove_many(c.by_tmax)
639 self.by_tlen.remove_many(c.by_tlen)
640 self.by_mtime.remove_many(c.by_mtime)
642 elif isinstance(c, trace.Trace):
643 self.networks.subtract1(c.network)
644 self.stations.subtract1(c.station)
645 self.locations.subtract1(c.location)
646 self.channels.subtract1(c.channel)
647 self.nslc_ids.subtract1(c.nslc_id)
648 self.deltats.subtract1(c.deltat)
650 self.by_tmin.remove(c)
651 self.by_tmax.remove(c)
652 self.by_tlen.remove(c)
653 self.by_mtime.remove(c)
655 self.adjust_minmax()
657 self.nupdates += 1
658 self.notify_listeners('remove')
660 if self.parent is not None:
661 self.parent.remove(content)
663 def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
664 '''
665 Return list of :py:class:`pyrocko.trace.Trace` objects where given
666 arguments ``tmin`` and ``tmax`` match.
668 :param tmin: start time
669 :param tmax: end time
670 :param group_selector: lambda expression taking group dict of regex
671 match object as a single argument and which returns true or false
672 to keep or reject a file (default: ``None``)
673 :param trace_selector: lambda expression taking group dict of regex
674 match object as a single argument and which returns true or false
675 to keep or reject a file (default: ``None``)
676 '''
678 if not self.by_tmin or not self.is_relevant(
679 tmin, tmax, group_selector):
681 return []
683 return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
684 if tr.is_relevant(tmin, tmax, trace_selector)]
686 def adjust_minmax(self):
687 if self.by_tmin:
688 self.tmin = self.by_tmin.min().tmin
689 self.tmax = self.by_tmax.max().tmax
690 t = self.by_tlen.max()
691 self.tlenmax = t.tmax - t.tmin
692 self.mtime = self.by_mtime.max().mtime
693 deltats = list(self.deltats.keys())
694 self.deltatmin = min(deltats)
695 self.deltatmax = max(deltats)
696 else:
697 self.tmin = None
698 self.tmax = None
699 self.tlenmax = None
700 self.mtime = None
701 self.deltatmin = None
702 self.deltatmax = None
704 def notify_listeners(self, what):
705 pass
707 def get_update_count(self):
708 return self.nupdates
710 def overlaps(self, tmin, tmax):
711 return self.tmin is not None \
712 and tmax >= self.tmin and self.tmax >= tmin
714 def is_relevant(self, tmin, tmax, group_selector=None):
715 if self.tmin is None or self.tmax is None:
716 return False
717 return tmax >= self.tmin and self.tmax >= tmin and (
718 group_selector is None or group_selector(self))
721class MemTracesFile(TracesGroup):
723 '''
724 This is needed to make traces without an actual disc file to be inserted
725 into a Pile.
726 '''
728 def __init__(self, parent, traces):
729 TracesGroup.__init__(self, parent)
730 self.add(traces)
731 self.mtime = time.time()
733 def add(self, traces):
734 if isinstance(traces, trace.Trace):
735 traces = [traces]
737 for tr in traces:
738 tr.file = self
740 TracesGroup.add(self, traces)
742 def load_headers(self, mtime=None):
743 pass
745 def load_data(self):
746 pass
748 def use_data(self):
749 pass
751 def drop_data(self):
752 pass
754 def reload_if_modified(self):
755 return False
757 def iter_traces(self):
758 for tr in self.by_tmin:
759 yield tr
761 def get_traces(self):
762 return list(self.by_tmin)
764 def gather_keys(self, gather, selector=None):
765 keys = set()
766 for tr in self.by_tmin:
767 if selector is None or selector(tr):
768 keys.add(gather(tr))
770 return keys
772 def __str__(self):
774 s = 'MemTracesFile\n'
775 s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
776 s += 'number of traces: %i\n' % len(self.by_tmin)
777 s += 'timerange: %s - %s\n' % (
778 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
779 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
780 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
781 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
782 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
783 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
784 return s
787class TracesFile(TracesGroup):
788 def __init__(
789 self, parent, abspath, format,
790 substitutions=None, mtime=None):
792 TracesGroup.__init__(self, parent)
793 self.abspath = abspath
794 self.format = format
795 self.traces = []
796 self.data_loaded = False
797 self.data_use_count = 0
798 self.substitutions = substitutions
799 self.load_headers(mtime=mtime)
800 self.mtime = mtime
802 def load_headers(self, mtime=None):
803 logger.debug('loading headers from file: %s' % self.abspath)
804 if mtime is None:
805 self.mtime = os.stat(self.abspath)[8]
807 def kgen(tr):
808 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id
810 self.remove(self.traces)
811 ks = set()
812 for tr in io.load(self.abspath,
813 format=self.format,
814 getdata=False,
815 substitutions=self.substitutions):
817 k = kgen(tr)
818 if k not in ks:
819 ks.add(k)
820 self.traces.append(tr)
821 tr.file = self
823 self.add(self.traces)
825 self.data_loaded = False
826 self.data_use_count = 0
828 def load_data(self, force=False):
829 file_changed = False
830 if not self.data_loaded or force:
831 logger.debug('loading data from file: %s' % self.abspath)
833 def kgen(tr):
834 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id
836 traces_ = io.load(self.abspath, format=self.format, getdata=True,
837 substitutions=self.substitutions)
839 # prevent adding duplicate snippets from corrupt mseed files
840 k_loaded = set()
841 traces = []
842 for tr in traces_:
843 k = kgen(tr)
844 if k not in k_loaded:
845 k_loaded.add(k)
846 traces.append(tr)
848 k_current_d = dict((kgen(tr), tr) for tr in self.traces)
849 k_current = set(k_current_d)
850 k_new = k_loaded - k_current
851 k_delete = k_current - k_loaded
852 k_unchanged = k_current & k_loaded
854 for tr in self.traces[:]:
855 if kgen(tr) in k_delete:
856 self.remove(tr)
857 self.traces.remove(tr)
858 tr.file = None
859 file_changed = True
861 for tr in traces:
862 if kgen(tr) in k_new:
863 tr.file = self
864 self.traces.append(tr)
865 self.add(tr)
866 file_changed = True
868 for tr in traces:
869 if kgen(tr) in k_unchanged:
870 ctr = k_current_d[kgen(tr)]
871 ctr.ydata = tr.ydata
873 self.data_loaded = True
875 if file_changed:
876 logger.debug('reloaded (file may have changed): %s' % self.abspath)
878 return file_changed
880 def use_data(self):
881 if not self.data_loaded:
882 raise Exception('Data not loaded')
883 self.data_use_count += 1
885 def drop_data(self):
886 if self.data_loaded:
887 if self.data_use_count == 1:
888 logger.debug('forgetting data of file: %s' % self.abspath)
889 for tr in self.traces:
890 tr.drop_data()
892 self.data_loaded = False
894 self.data_use_count -= 1
895 else:
896 self.data_use_count = 0
898 def reload_if_modified(self):
899 mtime = os.stat(self.abspath)[8]
900 if mtime != self.mtime:
901 logger.debug(
902 'mtime=%i, reloading file: %s' % (mtime, self.abspath))
904 self.mtime = mtime
905 if self.data_loaded:
906 self.load_data(force=True)
907 else:
908 self.load_headers()
910 return True
912 return False
914 def iter_traces(self):
915 for tr in self.traces:
916 yield tr
918 def gather_keys(self, gather, selector=None):
919 keys = set()
920 for tr in self.by_tmin:
921 if selector is None or selector(tr):
922 keys.add(gather(tr))
924 return keys
926 def __str__(self):
927 s = 'TracesFile\n'
928 s += 'abspath: %s\n' % self.abspath
929 s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
930 s += 'number of traces: %i\n' % len(self.traces)
931 s += 'timerange: %s - %s\n' % (
932 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
933 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
934 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
935 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
936 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
937 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
938 return s
941class FilenameAttributeError(Exception):
942 pass
945class SubPile(TracesGroup):
946 def __init__(self, parent):
947 TracesGroup.__init__(self, parent)
948 self.files = []
949 self.empty()
951 def add_file(self, file):
952 self.files.append(file)
953 file.set_parent(self)
954 self.add(file)
956 def remove_file(self, file):
957 self.files.remove(file)
958 file.set_parent(None)
959 self.remove(file)
961 def remove_files(self, files):
962 for file in files:
963 self.files.remove(file)
964 file.set_parent(None)
965 self.remove(files)
967 def gather_keys(self, gather, selector=None):
968 keys = set()
969 for file in self.files:
970 keys |= file.gather_keys(gather, selector)
972 return keys
974 def iter_traces(
975 self,
976 load_data=False,
977 return_abspath=False,
978 group_selector=None,
979 trace_selector=None):
981 for file in self.files:
983 if group_selector and not group_selector(file):
984 continue
986 must_drop = False
987 if load_data:
988 file.load_data()
989 file.use_data()
990 must_drop = True
992 for tr in file.iter_traces():
993 if trace_selector and not trace_selector(tr):
994 continue
996 if return_abspath:
997 yield file.abspath, tr
998 else:
999 yield tr
1001 if must_drop:
1002 file.drop_data()
1004 def iter_files(self):
1005 for file in self.files:
1006 yield file
1008 def reload_modified(self):
1009 modified = False
1010 for file in self.files:
1011 modified |= file.reload_if_modified()
1013 return modified
1015 def __str__(self):
1016 s = 'SubPile\n'
1017 s += 'number of files: %i\n' % len(self.files)
1018 s += 'timerange: %s - %s\n' % (
1019 util.time_to_str(self.tmin), util.time_to_str(self.tmax))
1020 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1021 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1022 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1023 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1024 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1025 return s
1028class Batch(object):
1029 '''
1030 Batch of waveforms from window wise data extraction.
1032 Encapsulates state and results yielded for each window in window wise
1033 waveform extraction with the :py:meth:`Pile.chopper` method (when the
1034 `style='batch'` keyword argument set).
1036 *Attributes:*
1038 .. py:attribute:: tmin
1040 Start of this time window.
1042 .. py:attribute:: tmax
1044 End of this time window.
1046 .. py:attribute:: i
1048 Index of this time window in sequence.
1050 .. py:attribute:: n
1052 Total number of time windows in sequence.
1054 .. py:attribute:: traces
1056 Extracted waveforms for this time window.
1057 '''
1059 def __init__(self, tmin, tmax, i, n, traces):
1060 self.tmin = tmin
1061 self.tmax = tmax
1062 self.i = i
1063 self.n = n
1064 self.traces = traces
1067class Pile(TracesGroup):
1068 '''
1069 Waveform archive lookup, data loading and caching infrastructure.
1070 '''
1072 def __init__(self):
1073 TracesGroup.__init__(self, None)
1074 self.subpiles = {}
1075 self.open_files = {}
1076 self.listeners = []
1077 self.abspaths = set()
1079 def add_listener(self, obj):
1080 self.listeners.append(weakref.ref(obj))
1082 def notify_listeners(self, what):
1083 for ref in self.listeners:
1084 obj = ref()
1085 if obj:
1086 obj.pile_changed(what)
1088 def load_files(
1089 self, filenames,
1090 filename_attributes=None,
1091 fileformat='mseed',
1092 cache=None,
1093 show_progress=True,
1094 update_progress=None):
1096 load = loader(
1097 filenames, fileformat, cache, filename_attributes,
1098 show_progress=show_progress,
1099 update_progress=update_progress)
1101 self.add_files(load)
1103 def add_files(self, files):
1104 for file in files:
1105 self.add_file(file)
1107 def add_file(self, file):
1108 if file.abspath is not None and file.abspath in self.abspaths:
1109 logger.warning('File already in pile: %s' % file.abspath)
1110 return
1112 if file.deltatmin is None:
1113 logger.warning('Sampling rate of all traces are zero in file: %s' %
1114 file.abspath)
1115 return
1117 subpile = self.dispatch(file)
1118 subpile.add_file(file)
1119 if file.abspath is not None:
1120 self.abspaths.add(file.abspath)
1122 def remove_file(self, file):
1123 subpile = file.get_parent()
1124 if subpile is not None:
1125 subpile.remove_file(file)
1126 if file.abspath is not None:
1127 self.abspaths.remove(file.abspath)
1129 def remove_files(self, files):
1130 subpile_files = {}
1131 for file in files:
1132 subpile = file.get_parent()
1133 if subpile not in subpile_files:
1134 subpile_files[subpile] = []
1136 subpile_files[subpile].append(file)
1138 for subpile, files in subpile_files.items():
1139 subpile.remove_files(files)
1140 for file in files:
1141 if file.abspath is not None:
1142 self.abspaths.remove(file.abspath)
1144 def dispatch_key(self, file):
1145 dt = int(math.floor(math.log(file.deltatmin)))
1146 return dt
1148 def dispatch(self, file):
1149 k = self.dispatch_key(file)
1150 if k not in self.subpiles:
1151 self.subpiles[k] = SubPile(self)
1153 return self.subpiles[k]
1155 def get_deltats(self):
1156 return list(self.deltats.keys())
1158 def chop(
1159 self, tmin, tmax,
1160 group_selector=None,
1161 trace_selector=None,
1162 snap=(round, round),
1163 include_last=False,
1164 load_data=True):
1166 chopped = []
1167 used_files = set()
1169 traces = self.relevant(tmin, tmax, group_selector, trace_selector)
1170 if load_data:
1171 files_changed = False
1172 for tr in traces:
1173 if tr.file and tr.file not in used_files:
1174 if tr.file.load_data():
1175 files_changed = True
1177 if tr.file is not None:
1178 used_files.add(tr.file)
1180 if files_changed:
1181 traces = self.relevant(
1182 tmin, tmax, group_selector, trace_selector)
1184 for tr in traces:
1185 if not load_data and tr.ydata is not None:
1186 tr = tr.copy(data=False)
1187 tr.ydata = None
1189 try:
1190 chopped.append(tr.chop(
1191 tmin, tmax,
1192 inplace=False,
1193 snap=snap,
1194 include_last=include_last))
1196 except trace.NoData:
1197 pass
1199 return chopped, used_files
1201 def _process_chopped(
1202 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
1203 tpad):
1205 chopped.sort(key=lambda a: a.full_id)
1206 if degap:
1207 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)
1209 if not want_incomplete:
1210 chopped_weeded = []
1211 for tr in chopped:
1212 emin = tr.tmin - (wmin-tpad)
1213 emax = tr.tmax + tr.deltat - (wmax+tpad)
1214 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
1215 chopped_weeded.append(tr)
1217 elif degap:
1218 if (0. < emin <= 5. * tr.deltat and
1219 -5. * tr.deltat <= emax < 0.):
1221 tr.extend(
1222 wmin-tpad,
1223 wmax+tpad-tr.deltat,
1224 fillmethod='repeat')
1226 chopped_weeded.append(tr)
1228 chopped = chopped_weeded
1230 for tr in chopped:
1231 tr.wmin = wmin
1232 tr.wmax = wmax
1234 return chopped
1236 def chopper(
1237 self,
1238 tmin=None, tmax=None, tinc=None, tpad=0.,
1239 group_selector=None, trace_selector=None,
1240 want_incomplete=True, degap=True, maxgap=5, maxlap=None,
1241 keep_current_files_open=False, accessor_id=None,
1242 snap=(round, round), include_last=False, load_data=True,
1243 style=None):
1245 '''
1246 Get iterator for shifting window wise data extraction from waveform
1247 archive.
1249 :param tmin: start time (default uses start time of available data)
1250 :param tmax: end time (default uses end time of available data)
1251 :param tinc: time increment (window shift time) (default uses
1252 ``tmax-tmin``)
1253 :param tpad: padding time appended on either side of the data windows
1254 (window overlap is ``2*tpad``)
1255 :param group_selector: filter callback taking :py:class:`TracesGroup`
1256 objects
1257 :param trace_selector: filter callback taking
1258 :py:class:`pyrocko.trace.Trace` objects
1259 :param want_incomplete: if set to ``False``, gappy/incomplete traces
1260 are discarded from the results
1261 :param degap: whether to try to connect traces and to remove gaps and
1262 overlaps
1263 :param maxgap: maximum gap size in samples which is filled with
1264 interpolated samples when ``degap`` is ``True``
1265 :param maxlap: maximum overlap size in samples which is removed when
1266 ``degap`` is ``True``
1267 :param keep_current_files_open: whether to keep cached trace data in
1268 memory after the iterator has ended
1269 :param accessor_id: if given, used as a key to identify different
1270 points of extraction for the decision of when to release cached
1271 trace data (should be used when data is alternately extracted from
1272 more than one region / selection)
1273 :param snap: replaces Python's :py:func:`round` function which is used
1274 to determine indices where to start and end the trace data array
1275 :param include_last: whether to include last sample
1276 :param load_data: whether to load the waveform data. If set to
1277 ``False``, traces with no data samples, but with correct
1278 meta-information are returned
1279 :param style: set to ``'batch'`` to yield waveforms and information
1280 about the chopper state as :py:class:`Batch` objects. By default
1281 lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
1282 :returns: iterator providing extracted waveforms for each extracted
1283 window. See ``style`` argument for details.
1284 '''
1285 if tmin is None:
1286 if self.tmin is None:
1287 logger.warning('Pile\'s tmin is not set - pile may be empty.')
1288 return
1289 tmin = self.tmin + tpad
1291 if tmax is None:
1292 if self.tmax is None:
1293 logger.warning('Pile\'s tmax is not set - pile may be empty.')
1294 return
1295 tmax = self.tmax - tpad
1297 if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
1298 return
1300 if accessor_id not in self.open_files:
1301 self.open_files[accessor_id] = set()
1303 open_files = self.open_files[accessor_id]
1305 if tinc is None:
1306 tinc = tmax - tmin
1307 nwin = 1
1308 else:
1309 eps = tinc * 1e-6
1310 if tinc != 0.0:
1311 nwin = int(((tmax - eps) - tmin) / tinc) + 1
1312 else:
1313 nwin = 1
1315 for iwin in range(nwin):
1316 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
1318 chopped, used_files = self.chop(
1319 wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
1320 include_last, load_data)
1322 for file in used_files - open_files:
1323 # increment datause counter on newly opened files
1324 file.use_data()
1326 open_files.update(used_files)
1328 processed = self._process_chopped(
1329 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
1330 tpad)
1332 if style == 'batch':
1333 yield Batch(
1334 tmin=wmin,
1335 tmax=wmax,
1336 i=iwin,
1337 n=nwin,
1338 traces=processed)
1340 else:
1341 yield processed
1343 unused_files = open_files - used_files
1345 while unused_files:
1346 file = unused_files.pop()
1347 file.drop_data()
1348 open_files.remove(file)
1350 if not keep_current_files_open:
1351 while open_files:
1352 file = open_files.pop()
1353 file.drop_data()
1355 def all(self, *args, **kwargs):
1356 '''
1357 Shortcut to aggregate :py:meth:`chopper` output into a single list.
1358 '''
1360 alltraces = []
1361 for traces in self.chopper(*args, **kwargs):
1362 alltraces.extend(traces)
1364 return alltraces
1366 def iter_all(self, *args, **kwargs):
1367 for traces in self.chopper(*args, **kwargs):
1368 for tr in traces:
1369 yield tr
1371 def chopper_grouped(self, gather, progress=None, *args, **kwargs):
1372 keys = self.gather_keys(gather)
1373 if len(keys) == 0:
1374 return
1376 outer_group_selector = None
1377 if 'group_selector' in kwargs:
1378 outer_group_selector = kwargs['group_selector']
1380 outer_trace_selector = None
1381 if 'trace_selector' in kwargs:
1382 outer_trace_selector = kwargs['trace_selector']
1384 # the use of this gather-cache makes it impossible to modify the pile
1385 # during chopping
1386 gather_cache = {}
1387 pbar = None
1388 try:
1389 if progress is not None:
1390 pbar = util.progressbar(progress, len(keys))
1392 for ikey, key in enumerate(keys):
1393 def tsel(tr):
1394 return gather(tr) == key and (
1395 outer_trace_selector is None
1396 or outer_trace_selector(tr))
1398 def gsel(gr):
1399 if gr not in gather_cache:
1400 gather_cache[gr] = gr.gather_keys(gather)
1402 return key in gather_cache[gr] and (
1403 outer_group_selector is None
1404 or outer_group_selector(gr))
1406 kwargs['trace_selector'] = tsel
1407 kwargs['group_selector'] = gsel
1409 for traces in self.chopper(*args, **kwargs):
1410 yield traces
1412 if pbar:
1413 pbar.update(ikey+1)
1415 finally:
1416 if pbar:
1417 pbar.finish()
1419 def gather_keys(self, gather, selector=None):
1420 keys = set()
1421 for subpile in self.subpiles.values():
1422 keys |= subpile.gather_keys(gather, selector)
1424 return sorted(keys)
1426 def iter_traces(
1427 self,
1428 load_data=False,
1429 return_abspath=False,
1430 group_selector=None,
1431 trace_selector=None):
1433 '''
1434 Iterate over all traces in pile.
1436 :param load_data: whether to load the waveform data, by default empty
1437 traces are yielded
1438 :param return_abspath: if ``True`` yield tuples containing absolute
1439 file path and :py:class:`pyrocko.trace.Trace` objects
1440 :param group_selector: filter callback taking :py:class:`TracesGroup`
1441 objects
1442 :param trace_selector: filter callback taking
1443 :py:class:`pyrocko.trace.Trace` objects
1445 Example; yields only traces, where the station code is 'HH1'::
1447 test_pile = pile.make_pile('/local/test_trace_directory')
1448 for t in test_pile.iter_traces(
1449 trace_selector=lambda tr: tr.station=='HH1'):
1451 print t
1452 '''
1454 for subpile in self.subpiles.values():
1455 if not group_selector or group_selector(subpile):
1456 for tr in subpile.iter_traces(load_data, return_abspath,
1457 group_selector, trace_selector):
1458 yield tr
1460 def iter_files(self):
1461 for subpile in self.subpiles.values():
1462 for file in subpile.iter_files():
1463 yield file
1465 def reload_modified(self):
1466 modified = False
1467 for subpile in self.subpiles.values():
1468 modified |= subpile.reload_modified()
1470 return modified
1472 def get_tmin(self):
1473 return self.tmin
1475 def get_tmax(self):
1476 return self.tmax
1478 def get_deltatmin(self):
1479 return self.deltatmin
1481 def get_deltatmax(self):
1482 return self.deltatmax
1484 def is_empty(self):
1485 return self.tmin is None and self.tmax is None
1487 def __str__(self):
1488 if self.tmin is not None and self.tmax is not None:
1489 tmin = util.time_to_str(self.tmin)
1490 tmax = util.time_to_str(self.tmax)
1491 s = 'Pile\n'
1492 s += 'number of subpiles: %i\n' % len(self.subpiles)
1493 s += 'timerange: %s - %s\n' % (tmin, tmax)
1494 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
1495 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
1496 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
1497 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
1498 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
1500 else:
1501 s = 'empty Pile'
1503 return s
1505 def snuffle(self, **kwargs):
1506 '''
1507 Visualize it.
1509 :param stations: list of `pyrocko.model.Station` objects or ``None``
1510 :param events: list of `pyrocko.model.Event` objects or ``None``
1511 :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
1512 :param ntracks: float, number of tracks to be shown initially
1513 (default: 12)
1514 :param follow: time interval (in seconds) for real time follow mode or
1515 ``None``
1516 :param controls: bool, whether to show the main controls (default:
1517 ``True``)
1518 :param opengl: bool, whether to use opengl (default: ``False``)
1519 '''
1521 from pyrocko.gui.snuffler import snuffle
1522 snuffle(self, **kwargs)
1525def make_pile(
1526 paths=None, selector=None, regex=None,
1527 fileformat='mseed',
1528 cachedirname=None, show_progress=True):
1530 '''
1531 Create pile from given file and directory names.
1533 :param paths: filenames and/or directories to look for traces. If paths is
1534 ``None`` ``sys.argv[1:]`` is used.
1535 :param selector: lambda expression taking group dict of regex match object
1536 as a single argument and which returns true or false to keep or reject
1537 a file
1538 :param regex: regular expression which filenames have to match
1539 :param fileformat: format of the files ('mseed', 'sac', 'kan',
1540 'from_extension', 'detect')
1541 :param cachedirname: loader cache is stored under this directory. It is
1542 created as neccessary.
1543 :param show_progress: show progress bar and other progress information
1544 '''
1546 if show_progress_force_off:
1547 show_progress = False
1549 if isinstance(paths, str):
1550 paths = [paths]
1552 if paths is None:
1553 paths = sys.argv[1:]
1555 if cachedirname is None:
1556 cachedirname = config.config().cache_dir
1558 fns = util.select_files(
1559 paths, include=regex, selector=selector, show_progress=show_progress)
1561 cache = get_cache(cachedirname)
1562 p = Pile()
1563 p.load_files(
1564 sorted(fns),
1565 cache=cache,
1566 fileformat=fileformat,
1567 show_progress=show_progress)
1569 return p
1572class Injector(trace.States):
1574 def __init__(
1575 self, pile,
1576 fixation_length=None,
1577 path=None,
1578 format='from_extension',
1579 forget_fixed=False):
1581 trace.States.__init__(self)
1582 self._pile = pile
1583 self._fixation_length = fixation_length
1584 self._format = format
1585 self._path = path
1586 self._forget_fixed = forget_fixed
1588 def set_fixation_length(self, length):
1589 '''
1590 Set length after which the fixation method is called on buffer traces.
1592 The length should be given in seconds. Give None to disable.
1593 '''
1594 self.fixate_all()
1595 self._fixation_length = length # in seconds
1597 def set_save_path(
1598 self,
1599 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
1600 '%(tmin)s_%(tmax)s.mseed'):
1602 self.fixate_all()
1603 self._path = path
1605 def inject(self, trace):
1606 logger.debug('Received a trace: %s' % trace)
1608 buf = self.get(trace)
1609 if buf is None:
1610 trbuf = trace.copy()
1611 buf = MemTracesFile(None, [trbuf])
1612 self._pile.add_file(buf)
1613 self.set(trace, buf)
1615 else:
1616 self._pile.remove_file(buf)
1617 trbuf = buf.get_traces()[0]
1618 buf.remove(trbuf)
1619 trbuf.append(trace.ydata)
1620 buf.add(trbuf)
1621 self._pile.add_file(buf)
1622 self.set(trace, buf)
1624 trbuf = buf.get_traces()[0]
1625 if self._fixation_length is not None:
1626 if trbuf.tmax - trbuf.tmin > self._fixation_length:
1627 self._fixate(buf, complete=False)
1629 def fixate_all(self):
1630 for state in list(self._states.values()):
1631 self._fixate(state[-1])
1633 self._states = {}
1635 def free(self, buf):
1636 self._fixate(buf)
1638 def _fixate(self, buf, complete=True):
1639 trbuf = buf.get_traces()[0]
1640 del_state = True
1641 if self._path:
1642 if self._fixation_length is not None:
1643 ttmin = trbuf.tmin
1644 ytmin = util.year_start(ttmin)
1645 n = int(math.floor((ttmin - ytmin) / self._fixation_length))
1646 tmin = ytmin + n*self._fixation_length
1647 traces = []
1648 t = tmin
1649 while t <= trbuf.tmax:
1650 try:
1651 traces.append(
1652 trbuf.chop(
1653 t,
1654 t+self._fixation_length,
1655 inplace=False,
1656 snap=(math.ceil, math.ceil)))
1658 except trace.NoData:
1659 pass
1660 t += self._fixation_length
1662 if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
1663 trbuf.deltat/100. or complete:
1665 self._pile.remove_file(buf)
1667 else: # reinsert incomplete last part
1668 new_trbuf = traces.pop()
1669 self._pile.remove_file(buf)
1670 buf.remove(trbuf)
1671 buf.add(new_trbuf)
1672 self._pile.add_file(buf)
1673 del_state = False
1675 else:
1676 traces = [trbuf]
1677 self._pile.remove_file(buf)
1679 fns = io.save(traces, self._path, format=self._format)
1681 if not self._forget_fixed:
1682 self._pile.load_files(
1683 fns, show_progress=False, fileformat=self._format)
1685 if del_state:
1686 del self._states[trbuf.nslc_id]
1688 def __del__(self):
1689 self.fixate_all()