1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import os.path as op
7import logging
8import time
9try:
10 import cPickle as pickle
11except ImportError:
12 import pickle
14from pyrocko import util
15from pyrocko.guts import String, Dict, Duration, dump_all
17from .base import Source
18from ..model import ehash
19from ..lock import LockDir
# Namespace prefix for guts-serializable types defined in this module.
guts_prefix = 'squirrel'

# Module-level logger for this catalog client.
logger = logging.getLogger('psq.client.catalog')
class Link(object):
    '''
    One contiguous time span of the locally mirrored catalog.

    Bookkeeping record: remembers when the span was last synchronized, how
    many events it holds, and which cache file (if any) contains them.
    '''

    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        # nevents == -1 marks a span which has not been queried yet.
        self.tmin = tmin
        self.tmax = tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        fields = (
            util.tts(self.tmin),
            util.tts(self.tmax),
            util.tts(self.tmodified),
            self.nevents)

        return 'span %s - %s, access %s, nevents %i' % fields
class NoSuchCatalog(Exception):
    '''
    Raised when an unknown catalog name is requested.
    '''
def get_catalog(name):
    '''
    Instantiate a catalog client by name.

    Imports are done lazily so that only the requested client's module is
    loaded.

    :param name: catalog identifier, ``'geofon'`` or ``'gcmt'``.
    :raises: :py:exc:`NoSuchCatalog` for any other name.
    '''
    if name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT
        return GlobalCMT()

    if name == 'geofon':
        from pyrocko.client.geofon import Geofon
        return Geofon()

    raise NoSuchCatalog(name)
class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the online
    catalog, e.g. of all events above a certain magnitude. The time span for
    which the local copy of the catalog should be up-to date is maintained
    automatically by Squirrel. Data is loaded and updated in chunks as
    needed in a just-in-time fashion. Data validity can optionally expire after
    a given period of time and new data can be treated to be preliminary.
    In both cases information will be refreshed as needed.
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             "kept. By default the Squirrel environment's cache directory is "
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()

        # Tuning of the chunked just-in-time synchronization. The query
        # window self._tquery [s] is adapted at runtime so that a single
        # remote request returns roughly self._nevents_query_hint events;
        # cached chunk files hold about self._nevents_chunk_hint events.
        self._nevents_query_hint = 1000
        self._nevents_chunk_hint = 5000
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

    def describe(self):
        '''
        Get short string identifying this source and its configuration.
        '''
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True):
        '''
        Hook called by Squirrel to initialize this source.

        Creates the catalog client and the cache directory and makes any
        previously cached events available to ``squirrel``.
        '''
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        '''
        Compute hash over catalog name and query arguments.

        The hash identifies this particular source configuration and is used
        to separate the cache directories of different configurations.
        '''
        s = self.catalog
        if self.query_args is not None:
            # Sort the keys so that the hash is independent of dict order.
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get hash identifying this configuration (see :py:meth:`make_hash`).
        '''
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
        '''
        Synchronize the local catalog copy for a time span of interest.

        Missing, expired or preliminary parts of the local copy are
        (re-)queried from the online catalog and the results are made
        available to ``squirrel``.

        :param constraint:
            Time constraint of the query. Unset bounds (or ``constraint``
            being ``None``) are filled in from the time span of the current
            data holdings of ``squirrel``.
        '''

        with LockDir(self._cache_path):
            self._load_chain()

            if constraint is not None:
                tmin, tmax = constraint.tmin, constraint.tmax
            else:
                # Fix: an ``assert constraint is not None`` here contradicted
                # the optional parameter default and made the guard above
                # dead code. Without a constraint, fall back to the time
                # span of the current data holdings (handled below).
                tmin, tmax = None, None

            tmin_sq, tmax_sq = squirrel.get_time_span(dummy_limits=False)

            if tmin is None:
                tmin = tmin_sq

            if tmax is None:
                tmax = tmax_sq

            if tmin is None or tmax is None:
                logger.warning(
                    'Cannot query catalog source "%s" without time '
                    'constraint. Could not determine appropriate time '
                    'constraint from current data holdings (no data?).'
                    % self.catalog)

                return

            if tmin >= tmax:
                return

            tnow = time.time()
            modified = False

            # Extend the chain of known spans to cover [tmin, tmax].
            if not self._chain:
                self._chain = [Link(tmin, tmax, tnow)]
                modified = True
            else:
                if tmin < self._chain[0].tmin:
                    self._chain[0:0] = [Link(tmin, self._chain[0].tmin, tnow)]
                    modified = True

                if self._chain[-1].tmax < tmax:
                    self._chain.append(Link(self._chain[-1].tmax, tmax, tnow))
                    modified = True

            chain = []
            remove = []
            for link in self._chain:
                if tmin < link.tmax and link.tmin < tmax \
                        and self._outdated(link, tnow):

                    # Span overlaps the requested interval and is stale:
                    # drop its cached events (if any) and re-query.
                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    # Keep any leading part of the span outside the
                    # requested interval.
                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    if tmin_query < tmax_query:
                        # Fix: use a distinct loop variable. The original
                        # shadowed ``link``, so the trailing-remainder check
                        # below compared against the last queried sub-span
                        # (whose tmax equals tmax_query) and silently dropped
                        # the tail [tmax_query, link.tmax) from the chain.
                        for link_new in self._iquery(
                                tmin_query, tmax_query, tnow):

                            chain.append(link_new)

                    # Keep any trailing part of the span outside the
                    # requested interval.
                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        # Make all currently cached event chunk files known to squirrel.
        add = []
        for link in self._chain:
            if link.content_id:
                add.append(self._get_events_file_path(link.content_id))

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
        '''
        Query the remote catalog for ``[tmin, tmax]`` in adaptive chunks.

        Generator yielding :py:class:`Link` objects referring to saved chunk
        files. The query window ``self._tquery`` is adapted so that a single
        remote request returns roughly ``self._nevents_query_hint`` events.
        '''

        nwant = self._nevents_query_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)
            while len(events) > int(self._nevents_chunk_hint * 1.5):
                tpack_max = events[self._nevents_chunk_hint].time
                yield self._pack(
                    events[:self._nevents_chunk_hint],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max

                # Fix: remove exactly the events which were just packed.
                # Previously only self._nevents_query_hint events were
                # dropped, so events at indices [query_hint, chunk_hint)
                # were duplicated into subsequent chunks.
                del events[:self._nevents_chunk_hint]

            t += self._tquery

            # Adapt the query window, except when the last query was cut
            # short at tmax (its event count is not representative).
            if tmax_query != tmax:
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

        if self._force_query_age_max is not None:
            # Events retrieved while younger than the anxiety period are
            # stored in a separate chunk, so that later only this chunk
            # needs to be refreshed.
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        '''
        Save events to a chunk file and return a :py:class:`Link` to it.

        An empty event list produces a link without a content file.
        '''
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        '''
        Get :py:attr:`query_args` with values converted to expected types.

        Known numerical arguments are converted to ``float``; all other
        values are passed on as ``str``.
        '''
        if self.query_args is None:
            return {}

        type_map = {
            'magmin': float,
            'magmax': float,
            'latmin': float,
            'latmax': float,
            'lonmin': float,
            'lonmax': float,
            'depthmin': float,
            'depthmax': float}

        return dict(
            (k, type_map.get(k, str)(v))
            for (k, v) in self.query_args.items())

    def _query(self, tmin, tmax):
        # Perform a single remote catalog request for the given time span.
        # Lazy %-style logger args avoid formatting when INFO is disabled.
        logger.info(
            'Querying catalog "%s" for time span %s - %s.',
            self.catalog, util.tts(tmin), util.tts(tmax))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
        '''
        Check whether the information of a chain link must be re-queried.
        '''
        # Never queried so far?
        if link.nevents == -1:
            return True

        # Retrieved while still within the anxiety period, i.e. while the
        # data may have been preliminary? (Falsy anxious disables this.)
        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        # Expired?
        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        # Chunk file holding the events of one chain link.
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        # Pickle file holding the chain bookkeeping data.
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        # Restore the chain from the cache, or start with an empty one.
        # NOTE: the pickle file is produced locally by _dump_chain, so this
        # is trusted input; do not point cache_path at untrusted data.
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        # Protocol 2 keeps the file readable from Python 2 (the cPickle
        # fallback import above suggests py2 compatibility is intended).
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)
# Explicit public API: only the data-source class is exported.
__all__ = [
    'CatalogSource',
]