# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
return [str(x) for x in sorted(s)]
for k, v in other.items():
    self[k] -= v
    if self[k] <= 0:
        del self[k]
self[k] -= 1
if self[k] <= 0:
    del self[k]
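# Behavior note for this Counter class: unlike collections.Counter,
# an entry is deleted as soon as its count drops to zero or below.
# A minimal behavior sketch (values illustrative):
#
#     c = Counter()
#     c['GE'] = 2
#     c.subtract1('GE')
#     c.subtract1('GE')
#     assert 'GE' not in c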
counter_new = Counter()
for k in counter:
    counter_new[func(k)] = counter[k]
return counter_new
ilo, ihi = avltree.span(element)
for i in range(ilo, ihi):
    if avltree[i] is element:
        avltree.remove_at(i)
        return
raise ValueError(
    'avl_remove_exact(avltree, element): element not in avltree')
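# Why identity-based removal is needed: the AVL tree may hold several
# elements that compare equal under the sort key (e.g. traces with
# identical tmin), and only the exact object passed in must be removed.
# Illustrative call:
#
#     avl_remove_exact(tree, element)   # removes `element` itself,
#                                       # not an equal-comparing sibling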
# special cases; these run about 50% faster than the generic one on
# Python 2.5
state = list(self._avl.iter()), self._key
return state
l, key = state
self._set_key(key)
self._avl = avl.from_iter(iter(l), len(l))
avl_remove_exact(self._avl, value)
for value in values:
    avl_remove_exact(self._avl, value)
'''Manages trace metainformation cache.
For each directory with files containing traces, one cache file is
maintained to hold the trace metainformation of all files which are
contained in the directory.
'''
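# A minimal usage sketch (paths are hypothetical; `get_cache` is the
# module-level accessor defined further below):
#
#     cache = get_cache('/tmp/pyrocko_cache')
#     tfile = cache.get('/data/traces/day001.mseed')
#     if tfile is None:
#         pass  # not cached yet; load headers, then cache.put(...)
#     cache.dump_modified()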
'''Create new cache.
:param cachedir: directory to hold the cache files.
'''
'''Try to get an item from the cache.
:param abspath: absolute path of the object to retrieve
:returns: the stored object, or ``None`` if nothing could be found.
'''
'''Put an item into the cache.
:param abspath: absolute path of the object to be stored
:param tfile: object to be stored
'''
# get lock on cachepath here
'''Save any modifications to disk.'''
# unlock
'''Weed out missing files from the disk caches.'''
self.dump_modified()
for fn in os.listdir(self.cachedir):
    if len(fn) == 40:
        cache = self._load_dircache(pjoin(self.cachedir, fn))
        self._dump_dircache(cache, pjoin(self.cachedir, fn))
self.dircaches[cachepath] = self._load_dircache(cachepath)
else:
with open(cachefilename, 'rb') as f:
    cache = pickle.load(f)
# weed out files which no longer exist
for fn in list(cache.keys()):
    if not os.path.isfile(fn):
        del cache[fn]
for v in cache.values():
    v.trees_from_content(v.traces)
    for tr in v.traces:
        tr.file = v

        # fix Py2 codes to not include unicode when the cache file
        # was created with Py3
        if not isinstance(tr.station, str):
            tr.prune_from_reuse_cache()
            tr.set_codes(
                str(tr.network), str(tr.station), str(tr.location),
                str(tr.channel))

    v.data_use_count = 0
    v.data_loaded = False
    v.fix_unicode_codes()
return cache
if os.path.exists(cachefilename):
    os.remove(cachefilename)
return
# make a copy without the parents and the binsearch trees
'''Get global TracesFileCache object for given directory.'''
filenames, fileformat, cache, filename_attributes,
show_progress=True, update_progress=None):
show_progress = False
self._bar = util.progressbar(label, self._n)
update_progress(label, 0, self._n)
if i < self._n-1:
    self._bar.update(i)
else:
    self._bar.finish()
    self._bar = None
abort = update_progress(self._label, i, self._n)
logger.warning('No files to load from')
return
regex = re.compile(filename_attributes)
m = regex.search(filename)
if not m:
    raise FilenameAttributeError(
        "Cannot get attributes with pattern '%s' "
        "from path '%s'" % (filename_attributes, filename))
substitutions = {}
for k in m.groupdict():
    if k in ('network', 'station', 'location', 'channel'):
        substitutions[k] = m.groupdict()[k]
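# Illustrative sketch (pattern hypothetical): with
#
#     filename_attributes = r'(?P<network>[^.]+)\.(?P<station>[^.]+)\.mseed'
#
# a file named 'GE.APE.mseed' yields
# substitutions == {'network': 'GE', 'station': 'APE'}, which is later
# applied to the codes of all traces read from that file.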
not tfile or
(tfile.format != fileformat and fileformat != 'detect') or
tfile.mtime != mtime or
substitutions is not None)
except (OSError, FilenameAttributeError) as xerror:
    failures.append(abspath)
    logger.warning(xerror)
progress.update(len(filenames))
return
None, abspath, fileformat, substitutions=substitutions, mtime=mtime)
except (io.FileLoadError, OSError) as xerror:
    failures.append(abspath)
    logger.warning(xerror)
else:
break
logger.warning(
    'The following file%s caused problems and will be ignored:\n'
    % util.plural_s(len(failures))
    + '\n'.join(failures))
'''Trace container base class.
Base class for Pile, SubPile, and TracesFile, i.e. anything containing
a collection of several traces. A TracesGroup object maintains lookup
sets of some of the traces' meta-information, as well as a combined
time range of its contents.
'''
return self.parent
self.networks, self.stations, self.locations, self.channels, \
    self.nslc_ids, self.deltats = [Counter() for x in range(6)]
self.by_tmin = Sorted(content, 'tmin')
self.by_tmax = Sorted(content, 'tmax')
self.by_tlen = Sorted(content, tlen)
self.by_mtime = Sorted(content, 'mtime')
self.adjust_minmax()
for net in self.networks:
    if isinstance(net, str):
        return
self.networks = fix_unicode_copy(self.networks, str)
self.stations = fix_unicode_copy(self.stations, str)
self.locations = fix_unicode_copy(self.locations, str)
self.channels = fix_unicode_copy(self.channels, str)
self.nslc_ids = fix_unicode_copy(
    self.nslc_ids, lambda k: tuple(str(x) for x in k))
''' Add content to traces group and update indices.
Accepts :py:class:`pyrocko.trace.Trace` objects and
:py:class:`pyrocko.pile.TracesGroup` objects.
'''
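# Minimal sketch of the Counter bookkeeping done by add()/remove()
# (the constructor arguments follow pyrocko.trace.Trace; values are
# illustrative):
#
#     tr = trace.Trace(network='GE', station='APE', channel='BHZ',
#                      deltat=0.025, tmin=0., tmax=10.)
#     group.add(tr)       # group.stations['APE'] == 1
#     group.remove(tr)    # count drops to zero, key disappears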
'''
Remove content from traces group and update indices.
'''
content = [content]
if isinstance(c, TracesGroup):
    self.networks.subtract(c.networks)
    self.stations.subtract(c.stations)
    self.locations.subtract(c.locations)
    self.channels.subtract(c.channels)
    self.nslc_ids.subtract(c.nslc_ids)
    self.deltats.subtract(c.deltats)

    self.by_tmin.remove_many(c.by_tmin)
    self.by_tmax.remove_many(c.by_tmax)
    self.by_tlen.remove_many(c.by_tlen)
    self.by_mtime.remove_many(c.by_mtime)

elif isinstance(c, trace.Trace):
    self.networks.subtract1(c.network)
    self.stations.subtract1(c.station)
    self.locations.subtract1(c.location)
    self.channels.subtract1(c.channel)
    self.nslc_ids.subtract1(c.nslc_id)
    self.deltats.subtract1(c.deltat)

    self.by_tmin.remove(c)
    self.by_tmax.remove(c)
    self.by_tlen.remove(c)
    self.by_mtime.remove(c)
self.parent.remove(content)
'''Return list of :py:class:`pyrocko.trace.Trace` objects where given
arguments ``tmin`` and ``tmax`` match.
:param tmin: start time
:param tmax: end time
:param group_selector: filter callback taking :py:class:`TracesGroup`
    objects, returning true or false to keep or reject a group
    (default: ``None``)
:param trace_selector: filter callback taking
    :py:class:`pyrocko.trace.Trace` objects, returning true or false
    to keep or reject a trace (default: ``None``)
'''
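# Example call (illustrative; `p` is a Pile): restrict matching to
# vertical channels
#
#     trs = p.relevant(
#         tmin, tmax, trace_selector=lambda tr: tr.channel.endswith('Z'))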
tmin, tmax, group_selector):
return []
if tr.is_relevant(tmin, tmax, trace_selector)]
else:
return self.nupdates
return self.tmin is not None \
    and tmax >= self.tmin and self.tmax >= tmin
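# This is the standard closed-interval overlap test: the ranges
# [self.tmin, self.tmax] and [tmin, tmax] intersect iff
# tmax >= self.tmin and self.tmax >= tmin.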
group_selector is None or group_selector(self))
'''This is needed so that traces without an actual disk file can be
inserted into a Pile.'''
TracesGroup.__init__(self, parent)
self.add(traces)
self.mtime = time.time()
if isinstance(traces, trace.Trace):
    traces = [traces]
for tr in traces:
    tr.file = self
TracesGroup.add(self, traces)
pass
pass
pass
pass
return False
for tr in self.by_tmin:
    yield tr
return list(self.by_tmin)
keys = set()
for tr in self.by_tmin:
    if selector is None or selector(tr):
        keys.add(gather(tr))
return keys
def __str__(self):
s = 'MemTracesFile\n'
s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
s += 'number of traces: %i\n' % len(self.by_tmin)
s += 'timerange: %s - %s\n' % (
    util.time_to_str(self.tmin), util.time_to_str(self.tmax))
s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
return s
self, parent, abspath, format, substitutions=None, mtime=None):
self.mtime = os.stat(self.abspath)[8]  # index 8 is st_mtime
format=self.format, getdata=False, substitutions=self.substitutions):
substitutions=self.substitutions)
# prevent adding duplicate snippets from corrupt mseed files
self.remove(tr)
self.traces.remove(tr)
tr.file = None
file_changed = True
tr.file = self
self.traces.append(tr)
self.add(tr)
file_changed = True
logger.debug('reloaded (file may have changed): %s' % self.abspath)
raise Exception('Data not loaded')
else:
    self.data_use_count = 0
mtime = os.stat(self.abspath)[8]
if mtime != self.mtime:
    logger.debug(
        'mtime=%i, reloading file: %s' % (mtime, self.abspath))

    self.mtime = mtime
    if self.data_loaded:
        self.load_data(force=True)
    else:
        self.load_headers()

    return True

return False
for tr in self.traces:
    yield tr
keys = set()
for tr in self.by_tmin:
    if selector is None or selector(tr):
        keys.add(gather(tr))
return keys
def __str__(self):
    s = 'TracesFile\n'
    s += 'abspath: %s\n' % self.abspath
    s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
    s += 'number of traces: %i\n' % len(self.traces)
    s += 'timerange: %s - %s\n' % (
        util.time_to_str(self.tmin), util.time_to_str(self.tmax))
    s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
    s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
    s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
    s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
    s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
    return s
self.files.remove(file)
file.set_parent(None)
self.remove(file)
for file in files:
    self.files.remove(file)
    file.set_parent(None)

self.remove(files)
keys = set()
for file in self.files:
    keys |= file.gather_keys(gather, selector)
return keys
self, load_data=False, return_abspath=False, group_selector=None, trace_selector=None):
for file in self.files:
if group_selector and not group_selector(file):
    continue
must_drop = False
if load_data:
    file.load_data()
    file.use_data()
    must_drop = True
for tr in file.iter_traces():
    if trace_selector and not trace_selector(tr):
        continue

    if return_abspath:
        yield file.abspath, tr
    else:
        yield tr
if must_drop:
    file.drop_data()
for file in self.files:
    yield file
modified = False
for file in self.files:
    modified |= file.reload_if_modified()
return modified
def __str__(self):
    s = 'SubPile\n'
    s += 'number of files: %i\n' % len(self.files)
    s += 'timerange: %s - %s\n' % (
        util.time_to_str(self.tmin), util.time_to_str(self.tmax))
    s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
    s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
    s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
    s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
    s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
    return s
'''Waveform archive lookup, data loading and caching infrastructure.'''
self.listeners.append(weakref.ref(obj))
obj = ref()
if obj:
    obj.pile_changed(what)
self, filenames, filename_attributes=None, fileformat='mseed',
cache=None, show_progress=True, update_progress=None):
filenames, fileformat, cache, filename_attributes,
show_progress=show_progress, update_progress=update_progress)
logger.warning('File already in pile: %s' % file.abspath)
return
logger.warning(
    'Sampling rates of all traces are zero in file: %s'
    % file.abspath)
return
subpile = file.get_parent()
if subpile is not None:
    subpile.remove_file(file)

if file.abspath is not None:
    self.abspaths.remove(file.abspath)
subpile_files = {}
for file in files:
    subpile = file.get_parent()
    if subpile not in subpile_files:
        subpile_files[subpile] = []

    subpile_files[subpile].append(file)
for subpile, files in subpile_files.items():
    subpile.remove_files(files)
    for file in files:
        if file.abspath is not None:
            self.abspaths.remove(file.abspath)
return list(self.deltats.keys())
self, tmin, tmax, group_selector=None, trace_selector=None,
snap=(round, round), include_last=False, load_data=True):
files_changed = True
traces = self.relevant(
    tmin, tmax, group_selector, trace_selector)
tr = tr.copy(data=False)
tr.ydata = None
tmin, tmax, inplace=False, snap=snap, include_last=include_last))
except trace.NoData:
    pass
self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, tpad):
elif degap:
    if (0. < emin <= 5. * tr.deltat
            and -5. * tr.deltat <= emax < 0.):
tr.extend(
    wmin-tpad, wmax+tpad-tr.deltat, fillmethod='repeat')
chopped_weeded.append(tr)
self, tmin=None, tmax=None, tinc=None, tpad=0.,
group_selector=None, trace_selector=None,
want_incomplete=True, degap=True, maxgap=5, maxlap=None,
keep_current_files_open=False, accessor_id=None,
snap=(round, round), include_last=False, load_data=True):
'''
Get an iterator for shifting-window data extraction from the waveform
archive.
:param tmin: start time (default uses start time of available data)
:param tmax: end time (default uses end time of available data)
:param tinc: time increment (window shift time) (default uses
    ``tmax-tmin``)
:param tpad: padding time appended on either side of the data windows
    (window overlap is ``2*tpad``)
:param group_selector: filter callback taking :py:class:`TracesGroup`
    objects
:param trace_selector: filter callback taking
    :py:class:`pyrocko.trace.Trace` objects
:param want_incomplete: if set to ``False``, gappy/incomplete traces
    are discarded from the results
:param degap: whether to try to connect traces and to remove gaps and
    overlaps
:param maxgap: maximum gap size in samples which is filled with
    interpolated samples when ``degap`` is ``True``
:param maxlap: maximum overlap size in samples which is removed when
    ``degap`` is ``True``
:param keep_current_files_open: whether to keep cached trace data in
    memory after the iterator has ended
:param accessor_id: if given, used as a key to identify different
    points of extraction for the decision of when to release cached
    trace data (should be used when data is alternately extracted
    from more than one region / selection)
:param snap: replaces Python's :py:func:`round` function which is
    used to determine indices where to start and end the trace data
    array
:param include_last: whether to include the last sample
:param load_data: whether to load the waveform data. If set to
    ``False``, traces with no data samples, but with correct
    meta-information are returned
:returns: iterator yielding a list of :py:class:`pyrocko.trace.Trace`
    objects for every extracted time window
'''

if self.tmin is None:
    logger.warning('Pile\'s tmin is not set - pile may be empty.')
    return

tmin = self.tmin + tpad
if self.tmax is None:
    logger.warning('Pile\'s tmax is not set - pile may be empty.')
    return

tmax = self.tmax - tpad
wmin-tpad, wmax+tpad, group_selector, trace_selector, snap, include_last, load_data)
# increment data use counter on newly opened files
chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, tpad)
file = unused_files.pop()
file.drop_data()
open_files.remove(file)
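# Typical chopper usage, as a hedged sketch (pile `p` and window
# parameters are illustrative): hour-long windows with 10 s of
# padding on each side:
#
#     for traces in p.chopper(tinc=3600., tpad=10.):
#         for tr in traces:
#             pass  # process one trace of the current window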
'''
Shortcut to aggregate :py:meth:`chopper` output into a single list.
'''
for traces in self.chopper(*args, **kwargs):
    for tr in traces:
        yield tr
keys = self.gather_keys(gather)
if len(keys) == 0:
    return
outer_group_selector = None
if 'group_selector' in kwargs:
    outer_group_selector = kwargs['group_selector']
outer_trace_selector = None
if 'trace_selector' in kwargs:
    outer_trace_selector = kwargs['trace_selector']
# the use of this gather-cache makes it impossible to modify the pile
# during chopping
gather_cache = {}
pbar = None
if progress is not None:
    pbar = util.progressbar(progress, len(keys))
for ikey, key in enumerate(keys):
    def tsel(tr):
        return gather(tr) == key and (
            outer_trace_selector is None
            or outer_trace_selector(tr))

    def gsel(gr):
        if gr not in gather_cache:
            gather_cache[gr] = gr.gather_keys(gather)

        return key in gather_cache[gr] and (
            outer_group_selector is None
            or outer_group_selector(gr))

    kwargs['trace_selector'] = tsel
    kwargs['group_selector'] = gsel

    for traces in self.chopper(*args, **kwargs):
        yield traces

    if pbar:
        pbar.update(ikey+1)
if pbar:
    pbar.finish()
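# Hedged usage sketch for chopper_grouped (gather callback
# illustrative): process one station at a time to bound memory use:
#
#     for traces in p.chopper_grouped(
#             gather=lambda tr: tr.station, tinc=3600.):
#         pass  # all traces in this list share one station code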
keys = set()
for subpile in self.subpiles.values():
    keys |= subpile.gather_keys(gather, selector)
return sorted(keys)
self, load_data=False, return_abspath=False, group_selector=None, trace_selector=None):
'''Iterate over all traces in pile.
:param load_data: whether to load the waveform data, by default empty
    traces are yielded
:param return_abspath: if ``True`` yield tuples containing absolute
    file path and :py:class:`pyrocko.trace.Trace` objects
:param group_selector: filter callback taking :py:class:`TracesGroup`
    objects
:param trace_selector: filter callback taking
    :py:class:`pyrocko.trace.Trace` objects
Example: yields only traces where the station code is 'HH1'::

    test_pile = pile.make_pile('/local/test_trace_directory')
    for t in test_pile.iter_traces(
            trace_selector=lambda tr: tr.station == 'HH1'):

        print(t)
'''
for subpile in self.subpiles.values():
    if not group_selector or group_selector(subpile):
        for tr in subpile.iter_traces(
                load_data, return_abspath, group_selector,
                trace_selector):

            yield tr
for subpile in self.subpiles.values():
    for file in subpile.iter_files():
        yield file
modified = False
for subpile in self.subpiles.values():
    modified |= subpile.reload_modified()
return modified
return self.tmin
return self.tmax
return self.deltatmin
return self.deltatmax
return self.tmin is None and self.tmax is None
def __str__(self):
    if self.tmin is not None and self.tmax is not None:
        tmin = util.time_to_str(self.tmin)
        tmax = util.time_to_str(self.tmax)
        s = 'Pile\n'
        s += 'number of subpiles: %i\n' % len(self.subpiles)
        s += 'timerange: %s - %s\n' % (tmin, tmax)
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))

    else:
        s = 'empty Pile'

    return s
'''Visualize it.
:param stations: list of `pyrocko.model.Station` objects or ``None``
:param events: list of `pyrocko.model.Event` objects or ``None``
:param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
:param ntracks: float, number of tracks to be shown initially
    (default: 12)
:param follow: time interval (in seconds) for real time follow mode
    or ``None``
:param controls: bool, whether to show the main controls (default:
    ``True``)
:param opengl: bool, whether to use opengl (default: ``False``)
'''
from pyrocko.gui.snuffler import snuffle
snuffle(self, **kwargs)
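# Hedged call sketch (keyword arguments per the docstring above):
#
#     p.snuffle(ntracks=24, follow=60.)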
paths=None, selector=None, regex=None, fileformat='mseed',
cachedirname=None, show_progress=True):
'''Create pile from given file and directory names.
:param paths: filenames and/or directories to look for traces. If
    paths is ``None``, ``sys.argv[1:]`` is used.
:param selector: lambda expression taking group dict of regex match
    object as a single argument and which returns true or false to
    keep or reject a file
:param regex: regular expression which filenames have to match
:param fileformat: format of the files ('mseed', 'sac', 'kan',
    'from_extension', 'detect')
:param cachedirname: loader cache is stored under this directory. It
    is created as necessary.
:param show_progress: show progress bar and other progress
    information
'''
if show_progress_force_off:
    show_progress = False
if isinstance(paths, str):
    paths = [paths]
if paths is None:
    paths = sys.argv[1:]
if cachedirname is None:
    cachedirname = config.config().cache_dir
fns = util.select_files(
    paths, selector, regex, show_progress=show_progress)
cache = get_cache(cachedirname)
p = Pile()
p.load_files(
    sorted(fns), cache=cache, fileformat=fileformat,
    show_progress=show_progress)
return p
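# Hedged usage sketch (directory path hypothetical):
#
#     p = make_pile('/data/waveforms', fileformat='detect')
#     print(p)   # summary: timerange, networks, stations, ...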
self, pile, fixation_length=None, path=None,
format='from_extension', forget_fixed=False):
trace.States.__init__(self)
self._pile = pile
self._fixation_length = fixation_length
self._format = format
self._path = path
self._forget_fixed = forget_fixed
'''Set length after which the fixation method is called on buffer traces.
The length should be given in seconds. Give ``None`` to disable.
'''
self.fixate_all()
self._fixation_length = length   # in seconds
self, path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
           '%(tmin)s_%(tmax)s.mseed'):
self.fixate_all()
self._path = path
logger.debug('Received a trace: %s' % trace)
buf = self.get(trace)
if buf is None:
    trbuf = trace.copy()
    buf = MemTracesFile(None, [trbuf])
    self._pile.add_file(buf)
    self.set(trace, buf)
else:
    self._pile.remove_file(buf)
    trbuf = buf.get_traces()[0]
    buf.remove(trbuf)
    trbuf.append(trace.ydata)
    buf.add(trbuf)
    self._pile.add_file(buf)
    self.set(trace, buf)
trbuf = buf.get_traces()[0]
if self._fixation_length is not None:
    if trbuf.tmax - trbuf.tmin > self._fixation_length:
        self._fixate(buf, complete=False)
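# Hedged usage sketch for the Injector (setup illustrative; the save
# path template is a hypothetical variant of the default): incoming
# traces are buffered in MemTracesFile objects inside the pile and
# written out once they exceed the fixation length:
#
#     injector = Injector(p, fixation_length=3600.)
#     injector.set_save_path('dump_%(station)s_%(tmin)s.mseed')
#     injector.inject(tr)      # buffer/extend; may trigger fixation
#     injector.fixate_all()    # flush all remaining buffers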
for state in list(self._states.values()):
    self._fixate(state[-1])
self._states = {}
self._fixate(buf)
trbuf = buf.get_traces()[0]
del_state = True
if self._path:
    if self._fixation_length is not None:
        ttmin = trbuf.tmin
        ytmin = util.year_start(ttmin)
        n = int(math.floor((ttmin - ytmin) / self._fixation_length))
        tmin = ytmin + n*self._fixation_length
        traces = []
        t = tmin
        while t <= trbuf.tmax:
            try:
                traces.append(
                    trbuf.chop(
                        t, t+self._fixation_length, inplace=False,
                        snap=(math.ceil, math.ceil)))

            except trace.NoData:
                pass

            t += self._fixation_length
if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
        trbuf.deltat/100. or complete:
self._pile.remove_file(buf)
else:
    # reinsert incomplete last part
    new_trbuf = traces.pop()
    self._pile.remove_file(buf)
    buf.remove(trbuf)
    buf.add(new_trbuf)
    self._pile.add_file(buf)
    del_state = False
else:
    traces = [trbuf]
    self._pile.remove_file(buf)
fns = io.save(traces, self._path, format=self._format)
if not self._forget_fixed:
    self._pile.load_files(
        fns, show_progress=False, fileformat=self._format)
if del_state:
    del self._states[trbuf.nslc_id]
self.fixate_all()