1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import os.path as op
9import logging
10import time
11try:
12 import cPickle as pickle
13except ImportError:
14 import pickle
16from pyrocko import util
17from pyrocko.guts import String, Dict, Duration, dump_all
19from .base import Source
20from ..model import ehash
21from ..lock import LockDir
23guts_prefix = 'squirrel'
25logger = logging.getLogger('psq.client.catalog')
class Link(object):
    '''
    Bookkeeping record for one contiguous time span of the cached catalog.

    Attributes:
    # tmin, tmax -- limits of the covered time span
    # tmodified -- time of last modification (retrieval) of the span
    # nevents -- number of events retrieved (-1: never queried)
    # content_id -- id of the file holding the events (None: no events)
    '''

    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin, self.tmax = tmin, tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        tts = util.tts
        return 'span %s - %s, access %s, nevents %i' % (
            tts(self.tmin), tts(self.tmax), tts(self.tmodified), self.nevents)
class NoSuchCatalog(Exception):
    '''
    Raised when an unknown catalog name is passed to :py:func:`get_catalog`.
    '''
def get_catalog(name):
    '''
    Instantiate a Pyrocko catalog client by name.

    Imports are done lazily inside the branches so that only the requested
    client module is loaded.

    :param name: catalog identifier, currently ``'geofon'`` or ``'gcmt'``
    :raises: :py:exc:`NoSuchCatalog` for any other name
    '''

    if name == 'geofon':
        from pyrocko.client.geofon import Geofon
        return Geofon()

    if name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT
        return GlobalCMT()

    raise NoSuchCatalog(name)
class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the online
    catalog, e.g. of all events above a certain magnitude. The time span for
    which the local copy of the catalog should be up to date is maintained
    automatically by Squirrel. Data is loaded and updated in chunks as
    needed in a just-in-time fashion. Data validity can optionally expire after
    a given period of time and new data can be treated to be preliminary.
    In both cases information will be refreshed as needed.
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             'kept. By default the Squirrel environment\'s cache directory is '
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()

        # Tuning parameters of the chunked, adaptive download mechanism.
        self._nevents_query_hint = 1000   # aim for this many events per query
        self._nevents_chunk_hint = 5000   # events per cached chunk file
        self._tquery = 3600.*24.          # current query time window [s]
        self._tquery_limits = (3600., 3600.*24.*365.)  # window clamp [s]

    def describe(self):
        '''
        Get a short string characterizing this data-source.
        '''
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True):
        '''
        Hook called by Squirrel to set up the data-source.

        Creates the local cache directory (if needed) and advertises already
        cached events to ``squirrel``.
        '''
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        '''
        Compute hash over catalog name and query arguments.

        The hash identifies the source's cache directory, so that different
        query arguments lead to separate local copies.
        '''
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get hash of the source's configuration (see :py:meth:`make_hash`).
        '''
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
        '''
        Update the partial local copy of the catalog.

        The locally maintained time spans are tracked in a chain of
        :py:class:`Link` objects. Links overlapping with the time span of
        ``constraint`` which are outdated (see :py:meth:`_outdated`) are
        queried again and the chain is updated accordingly.
        '''

        with LockDir(self._cache_path):
            self._load_chain()

            assert constraint is not None
            tmin, tmax = constraint.tmin, constraint.tmax

            # Fall back to the time span of Squirrel's current data holdings
            # for open constraint ends.
            tmin_sq, tmax_sq = squirrel.get_time_span()

            if tmin is None:
                tmin = tmin_sq

            if tmax is None:
                tmax = tmax_sq

            if tmin is None or tmax is None:
                logger.warning(
                    'Cannot query catalog source "%s" without time '
                    'constraint. Could not determine appropriate time '
                    'constraint from current data holdings (no data?).'
                    % self.catalog)

                return

            if tmin >= tmax:
                return

            tnow = time.time()
            modified = False

            if not self._chain:
                self._chain = [Link(tmin, tmax, tnow)]
                modified = True
            else:
                # Extend chain coverage at the front and back as needed.
                if tmin < self._chain[0].tmin:
                    self._chain[0:0] = [Link(tmin, self._chain[0].tmin, tnow)]
                    modified = True
                if self._chain[-1].tmax < tmax:
                    self._chain.append(Link(self._chain[-1].tmax, tmax, tnow))
                    modified = True

            chain = []
            remove = []
            for link in self._chain:
                if tmin < link.tmax and link.tmin < tmax \
                        and self._outdated(link, tnow):

                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    # Keep head of the old link which is outside of the
                    # queried span.
                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    if tmin_query < tmax_query:
                        # Use a distinct loop variable here: reusing ``link``
                        # would clobber the outer loop's variable and make the
                        # tail check below always fail (the last queried link
                        # ends exactly at ``tmax_query``), silently dropping
                        # the remainder span from the chain.
                        for link_new in self._iquery(
                                tmin_query, tmax_query, tnow):

                            chain.append(link_new)

                    # Keep tail of the old link which is outside of the
                    # queried span.
                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        '''
        Advertise all cached event files to ``squirrel``.
        '''
        add = []
        for link in self._chain:
            if link.content_id:
                add.append(self._get_events_file_path(link.content_id))

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
        '''
        Query the catalog for the given time span, yielding
        :py:class:`Link` objects referring to packed chunks of events.

        The span is scanned in slices of ``self._tquery`` seconds; the slice
        duration is adapted on the fly so that roughly
        ``self._nevents_query_hint`` events are received per query.
        Accumulated events are flushed to disk in chunks of
        ``self._nevents_chunk_hint`` events. Assumes the catalog returns
        events ordered by time -- TODO confirm for all catalog backends.
        '''

        nwant = self._nevents_query_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)
            while len(events) > int(self._nevents_chunk_hint * 1.5):
                tpack_max = events[self._nevents_chunk_hint].time
                yield self._pack(
                    events[:self._nevents_chunk_hint],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max
                # Discard exactly the events just packed. Dropping fewer
                # (e.g. only ``_nevents_query_hint``) would pack duplicates
                # of the remaining events into the following chunk.
                events[:self._nevents_chunk_hint] = []

            t += self._tquery

            if tmax_query != tmax:
                # Adapt query window so that future queries return roughly
                # ``nwant`` events, clamped to ``self._tquery_limits``.
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

        if self._force_query_age_max is not None:
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                # Split remaining events into a settled part and a newer,
                # preliminary part, so that the newer part is refreshed on
                # the next relevant query (see ``anxious``).
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        '''
        Dump ``events`` to a cache file and get a matching
        :py:class:`Link`.

        For an empty event list no file is created and the link's
        ``content_id`` is ``None``.
        '''
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        '''
        Get :py:attr:`query_args` with values converted to their target
        types.

        The guts field only holds string values; known numerical arguments
        are converted to ``float`` here, everything else is passed on as
        ``str``.
        '''
        if self.query_args is None:
            return {}
        else:
            type_map = {
                'magmin': float,
                'magmax': float,
                'latmin': float,
                'latmax': float,
                'lonmin': float,
                'lonmax': float,
                'depthmin': float,
                'depthmax': float}

            return dict(
                (k, type_map.get(k, str)(v))
                for (k, v) in self.query_args.items())

    def _query(self, tmin, tmax):
        '''
        Download events in the given time span from the online catalog.
        '''
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
        '''
        Check if the information in ``link`` needs to be refreshed.
        '''
        if link.nevents == -1:
            # Span has never been queried successfully.
            return True

        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            # Data was younger than ``anxious`` at retrieval time, i.e. it
            # is considered preliminary.
            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            # Data validity has expired (see ``expires``).
            return True

        return False

    def _get_events_file_path(self, fhash):
        '''
        Get path of the cache file holding the events of chunk ``fhash``.
        '''
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        '''
        Get path of the pickle file persisting the chain of
        :py:class:`Link` objects.
        '''
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        '''
        Restore the chain from the cache directory (empty if not present).
        '''
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        '''
        Persist the chain to the cache directory.

        Protocol 2 is kept for Python 2 compatibility (the module still
        supports py2 via the ``cPickle`` fallback import).
        '''
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)
351__all__ = [
352 'CatalogSource'
353]