Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/client/catalog.py: 79%
208 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel online earthquake catalog client.
8'''
10import os.path as op
11import logging
12import time
13try:
14 import cPickle as pickle
15except ImportError:
16 import pickle
18from pyrocko import util, progress
19from pyrocko.guts import String, Dict, Duration, dump_all
21from .base import Source
22from ..model import ehash, g_tmin, g_tmax
23from ..lock import LockDir
# Read by pyrocko.guts; presumably the type-name prefix for the Guts classes
# declared in this module -- confirm against the guts documentation.
guts_prefix = 'squirrel'

# Module-level logger, used below for query progress, info and warnings.
logger = logging.getLogger('psq.client.catalog')
class Link(object):
    '''
    Bookkeeping record for one time span of the locally cached catalog.

    :param tmin: start time of the span, as accepted by ``util.tts``
    :param tmax: end time of the span
    :param tmodified: time at which this record was created/refreshed
    :param nevents: number of events in the span; the default ``-1`` marks
        a span for which no event count has been recorded
    :param content_id: identifier of the stored events of this span, or
        ``None`` if there is none
    '''

    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin, self.tmax = tmin, tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        # Format the three time stamps first, then fill in the template.
        span_start, span_end, accessed = (
            util.tts(t) for t in (self.tmin, self.tmax, self.tmodified))

        return 'span %s - %s, access %s, nevents %i' % (
            span_start, span_end, accessed, self.nevents)
class NoSuchCatalog(Exception):
    '''
    Raised when a catalog name does not match any supported online catalog.
    '''
def get_catalog(name):
    '''
    Create a client object for the online catalog called ``name``.

    :param name: one of ``'geofon'``, ``'gcmt'`` or ``'isc'``
    :returns: a new instance of the matching client class
    :raises NoSuchCatalog: if ``name`` is none of the supported names
    '''

    # The client module is imported lazily, only for the requested catalog.
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon as client_class
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT as client_class
    elif name == 'isc':
        from pyrocko.client.isc import ISC as client_class
    else:
        raise NoSuchCatalog(name)

    return client_class()
class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the online
    catalog, e.g. of all events above a certain magnitude. The time span for
    which the local copy of the catalog should be up-to date is maintained
    automatically by Squirrel. Data is loaded and updated in chunks as
    needed in a just-in-time fashion. Data validity can optionally expire after
    a given period of time and new data can be treated to be preliminary.
    In both cases information will be refreshed as needed.
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             "kept. By default the Squirrel environment's cache directory is "
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        '''
        Initialize the source and its query tuning parameters.

        :param catalog: catalog name, see :py:func:`get_catalog`
        :param query_args: optional dict of extra query arguments
        :param kwargs: passed on to ``Source.__init__``
        '''
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()

        # Target number of events per online query; the query window length
        # ``_tquery`` is adapted in ``_iquery`` to approach this number.
        self._nevents_query_hint = 1000

        # Number of events stored per events file (chunk).
        self._nevents_chunk_hint = 5000

        # Current query window length [s] and its (min, max) bounds.
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

        # Set in ``setup``.
        self._cache_path = None

    def get_cache_path(self):
        '''
        Get the directory holding the local catalog copy.

        :returns: path string, or ``None`` before :py:meth:`setup` has run
        '''
        return self._cache_path

    def describe(self):
        '''
        Get a short identifier string of the form ``catalog:NAME:HASH``.
        '''
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True, upgrade=False):
        '''
        Prepare the cache directory and register cached events with Squirrel.

        :param squirrel: Squirrel instance; its ``_cache_path`` is used when
            no ``cache_path`` is configured on this source
        :param check: not used by this method
        :param upgrade: not used by this method
        '''
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        '''
        Compute the hash identifying catalog name and query arguments.

        The exact string layout determines the cache directory name, so it
        must not be changed without invalidating existing caches.
        '''
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get the hash computed by :py:meth:`make_hash` at construction time.
        '''
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
        '''
        Bring the local catalog copy up to date for a time span.

        Gaps in the chain of cached time spans are filled with placeholder
        links, outdated links overlapping the requested span are re-queried,
        the chain file is rewritten if anything changed and the event files
        are (re-)registered with Squirrel.

        :param squirrel: Squirrel instance providing ``get_time_span``,
            ``add`` and ``remove``
        :param constraint: object with ``tmin`` and ``tmax`` attributes.
            Missing (``None``) limits, or a missing constraint, are replaced
            by the time span reported by ``squirrel``.
        '''

        # The lock is held for the complete load/modify/dump cycle of the
        # chain file so that the stored chain cannot change in between.
        with LockDir(self._cache_path):
            self._load_chain()

            if constraint is not None:
                tmin, tmax = constraint.tmin, constraint.tmax
            else:
                tmin, tmax = None, None

            tmin_sq, tmax_sq = squirrel.get_time_span(dummy_limits=False)

            if tmin is None:
                tmin = tmin_sq

            if tmax is None:
                tmax = tmax_sq

            # The global dummy limits do not define a usable query span.
            if tmin == g_tmin:
                tmin = None

            if tmax == g_tmax:
                tmax = None

            if tmin is None or tmax is None:
                logger.warning(
                    'Cannot query catalog source "%s" without time '
                    'constraint. Could not determine appropriate time '
                    'constraint from current data holdings (no data?).'
                    % self.catalog)

                return

            if tmin >= tmax:
                tmax = tmin

            tnow = time.time()
            modified = False

            # Never query into the future.
            if tmax > tnow:
                tmax = tnow

            # Pass 1: insert placeholder links (nevents == -1) where the
            # requested span is not yet covered by the chain.
            chain = []
            this_tmin = tmin
            for link in self._chain:
                if this_tmin < link.tmin and tmax > this_tmin:
                    chain.append(Link(this_tmin, min(tmax, link.tmin), tnow))
                    modified = True

                chain.append(link)
                this_tmin = link.tmax

            if this_tmin < tmax:
                chain.append(Link(this_tmin, tmax, tnow))
                modified = True

            if modified:
                self._chain = chain

            # Pass 2: re-query every outdated link overlapping the request.
            chain = []
            remove = []
            for link in self._chain:
                overlaps = tmin < link.tmax and link.tmin < tmax
                if overlaps and self._outdated(link, tnow):

                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    # Part of the old link before the queried span.
                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    # ``extend`` keeps ``link`` bound to the old link; a
                    # nested ``for link in ...`` would rebind it and the
                    # trailing part below would be lost.
                    if tmin_query < tmax_query:
                        chain.extend(
                            self._iquery(tmin_query, tmax_query, tnow))

                    # Part of the old link after the queried span.
                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        '''
        Register the event files of all links having content with Squirrel.
        '''
        add = [
            self._get_events_file_path(link.content_id)
            for link in self._chain
            if link.content_id]

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
        '''
        Query the online catalog for a time span, yielding chain links.

        The span is queried in windows of adaptive length. Retrieved events
        are written to events files in chunks and a :py:class:`Link` is
        yielded for every chunk. The yielded links cover ``tmin`` to ``tmax``
        contiguously.

        :param tmin: start of the time span to query
        :param tmax: end of the time span to query
        :param tmodified: time stamp stored in the created links
        '''

        nwant = self._nevents_query_hint
        nchunk = self._nevents_chunk_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        with progress.task(
                'Querying %s' % self.catalog, 100, logger=logger) as task:

            while t < tmax:
                tmin_query = t
                tmax_query = min(t + self._tquery, tmax)

                events_new = self._query(tmin_query, tmax_query)
                nevents_new = len(events_new)
                events.extend(events_new)

                # Flush full chunks. Assumes the events are in ascending
                # time order -- TODO confirm for all catalog clients.
                while len(events) > int(nchunk * 1.5):
                    tpack_max = events[nchunk].time
                    yield self._pack(
                        events[:nchunk],
                        tpack_min, tpack_max, tmodified)

                    tpack_min = tpack_max

                    # Drop exactly the events which have just been packed,
                    # otherwise they would end up in the next chunk again.
                    del events[:nchunk]

                t += self._tquery

                # Adapt the window length towards ``nwant`` events per
                # query, unless this was the (truncated) last window.
                if tmax_query != tmax:
                    if nevents_new < 5:
                        self._tquery *= 10.0

                    elif not (nwant // 2 < nevents_new < nwant * 2):
                        self._tquery /= float(nevents_new) / float(nwant)

                    self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

                # ``t`` may overshoot ``tmax``; keep progress within 0-100.
                task.update(
                    int(round(100. * (min(t, tmax)-tmin)/(tmax-tmin))))

        if self._force_query_age_max is not None:
            # Split the remaining events so that the part younger than the
            # anxiety period gets its own link and can be refreshed alone.
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(
                    events_older, tpack_min, tsplit, tmodified)
                yield self._pack(
                    events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        '''
        Store events in an events file and create the link describing it.

        No file is written for an empty list of events; the returned link
        then has ``content_id`` set to ``None``.

        :returns: :py:class:`Link` for the span ``tmin`` to ``tmax``
        '''
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        '''
        Get the query arguments with numeric limits converted to ``float``.

        :returns: new dict; values of the known range arguments are converted
            with ``float``, all others with ``str``. Empty if no query
            arguments are set.
        '''
        if self.query_args is None:
            return {}

        type_map = {
            'magmin': float,
            'magmax': float,
            'latmin': float,
            'latmax': float,
            'lonmin': float,
            'lonmax': float,
            'depthmin': float,
            'depthmax': float}

        return {
            k: type_map.get(k, str)(v)
            for (k, v) in self.query_args.items()}

    def _query(self, tmin, tmax):
        '''
        Run a single online query for the time span ``tmin`` to ``tmax``.

        :returns: result of the catalog client's ``get_events``
        '''
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
        '''
        Check whether a link has to be (re-)queried.

        :param link: :py:class:`Link` to check
        :param tnow: current time
        :returns: ``True`` if the link was never queried, was preliminary
            when retrieved, or has expired
        '''

        # Placeholder link, never queried.
        if link.nevents == -1:
            return True

        # End of span was younger than the anxiety period at retrieval time.
        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        '''
        Get the path of the events file belonging to content id ``fhash``.
        '''
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        '''
        Get the path of the file in which the chain of links is stored.
        '''
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        '''
        Load the chain of links from the cache directory into ``_chain``.

        An empty chain is used if no chain file exists yet.
        '''
        path = self._get_chain_file_path()
        if op.exists(path):
            # NOTE(review): unpickling executes code embedded in the file.
            # Acceptable only while the cache directory is trusted.
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        '''
        Write the current chain of links to the chain file.
        '''
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)
# Public API of this module.
__all__ = ['CatalogSource']