Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/catalog.py: 81% (193 statements)

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
Squirrel online earthquake catalog client.
'''

import os.path as op
import logging
import time
import pickle

from pyrocko import util
from pyrocko.guts import String, Dict, Duration, dump_all

from .base import Source
from ..model import ehash
from ..lock import LockDir

guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.catalog')


class Link(object):
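    '''
    Bookkeeping record for one time span of the local catalog copy.

    Stores the covered time span, the time of retrieval, the number of
    events and the id of the cache file holding the events.
    '''
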
    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin = tmin
        self.tmax = tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        return 'span %s - %s, access %s, nevents %i' % (
            util.tts(self.tmin),
            util.tts(self.tmax),
            util.tts(self.tmodified),
            self.nevents)


class NoSuchCatalog(Exception):
    pass


def get_catalog(name):
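    '''
    Get a catalog client instance by name.

    Currently supported: ``'geofon'`` and ``'gcmt'``. Raises
    :py:exc:`NoSuchCatalog` for unknown names.
    '''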
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon
        return Geofon()
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT
        return GlobalCMT()
    else:
        raise NoSuchCatalog(name)


class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the
    online catalog, e.g. of all events above a certain magnitude. The time
    span for which the local copy of the catalog should be up to date is
    maintained automatically by Squirrel. Data is loaded and updated in
    chunks, as needed, in a just-in-time fashion. Data validity can
    optionally expire after a given period of time, and new data can be
    treated as preliminary. In both cases, information is refreshed as
    needed.
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             "kept. By default the Squirrel environment's cache directory is "
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()

        # Tuning hints for the adaptive query strategy: desired number of
        # events per query and per cached chunk, initial query time window
        # [s] and its allowed limits [s].
        self._nevents_query_hint = 1000
        self._nevents_chunk_hint = 5000
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

    def describe(self):
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True):
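        '''
        Set up the source for use with a given Squirrel instance.

        Instantiates the catalog client, prepares the cache directory and
        adds the locally cached events to the Squirrel instance.
        '''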
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
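        '''
        Compute a hash identifying this source configuration.

        The hash covers the catalog name and the query arguments and is
        used to name the source's cache subdirectory.
        '''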
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
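        '''
        Bring the local copy of the catalog up to date for a time span.

        The locally mirrored time range is tracked as a chain of
        :py:class:`Link` objects. Links intersecting the requested span
        which are missing or outdated are (re)queried from the online
        catalog, and the cached events are added to the given Squirrel
        instance.
        '''
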
        with LockDir(self._cache_path):
            self._load_chain()

            assert constraint is not None
            tmin, tmax = constraint.tmin, constraint.tmax

            tmin_sq, tmax_sq = squirrel.get_time_span(dummy_limits=False)

            if tmin is None:
                tmin = tmin_sq

            if tmax is None:
                tmax = tmax_sq

            if tmin is None or tmax is None:
                logger.warning(
                    'Cannot query catalog source "%s" without time '
                    'constraint. Could not determine appropriate time '
                    'constraint from current data holdings (no data?).'
                    % self.catalog)

                return

            if tmin >= tmax:
                return

            tnow = time.time()
            modified = False

            if tmax > tnow:
                tmax = tnow

            if not self._chain:
                self._chain = [Link(tmin, tmax, tnow)]
                modified = True
            else:
                if tmin < self._chain[0].tmin:
                    self._chain[0:0] = [Link(tmin, self._chain[0].tmin, tnow)]
                    modified = True
                if self._chain[-1].tmax < tmax:
                    self._chain.append(Link(self._chain[-1].tmax, tmax, tnow))
                    modified = True

            chain = []
            remove = []
            for link in self._chain:
                if tmin < link.tmax and link.tmin < tmax \
                        and self._outdated(link, tnow):

                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    if tmin_query < tmax_query:
                        # Use a distinct loop variable here: ``link`` is
                        # still needed below to append the trailing
                        # remainder of the original span.
                        for link_new in self._iquery(
                                tmin_query, tmax_query, tnow):
                            chain.append(link_new)

                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        add = []
        for link in self._chain:
            if link.content_id:
                add.append(self._get_events_file_path(link.content_id))

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
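        '''
        Query the catalog in adaptively sized time windows, yielding links.

        Queries are made in windows of ``self._tquery`` seconds. The window
        size is adapted so that individual queries return roughly
        ``self._nevents_query_hint`` events. Accumulated events are packed
        into chunks of about ``self._nevents_chunk_hint`` events. If an
        anxiety period is configured, the trailing time span gets its own
        link, so that preliminary information can be refreshed
        independently.
        '''
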
        nwant = self._nevents_query_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)
            while len(events) > int(self._nevents_chunk_hint * 1.5):
                tpack_max = events[self._nevents_chunk_hint].time
                yield self._pack(
                    events[:self._nevents_chunk_hint],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max
                # Drop exactly the events just packed, so that no event
                # ends up in more than one chunk.
                events[:self._nevents_chunk_hint] = []

            t += self._tquery

            if tmax_query != tmax:
                # Adapt the query window so that queries return roughly
                # ``nwant`` events, keeping it within the configured limits.
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

        if self._force_query_age_max is not None:
            # Split the remaining events at the anxiety boundary, so that
            # the preliminary part can be refreshed independently of the
            # settled part.
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
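        '''
        Dump events to a file in the cache and return a matching
        :py:class:`Link` record.
        '''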
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
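        '''
        Return ``query_args`` with numerical values converted to ``float``.

        For example, ``{'magmin': '6.0', 'format': 'text'}`` (hypothetical
        values) would be converted to ``{'magmin': 6.0, 'format': 'text'}``.
        '''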
        if self.query_args is None:
            return {}
        else:
            type_map = {
                'magmin': float,
                'magmax': float,
                'latmin': float,
                'latmax': float,
                'lonmin': float,
                'lonmax': float,
                'depthmin': float,
                'depthmax': float}

            return dict(
                (k, type_map.get(k, str)(v))
                for (k, v) in self.query_args.items())

    def _query(self, tmin, tmax):
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
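        '''
        Check whether the information in a link must be re-queried.

        A link is outdated if it has never been filled (``nevents == -1``),
        if it was retrieved within the anxiety period after its end time, or
        if it is older than the expiration time.
        '''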
        if link.nevents == -1:
            return True

        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)


__all__ = [
    'CatalogSource'
]