# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

from __future__ import absolute_import, print_function

import os.path as op
import logging
import time
try:
    import cPickle as pickle
except ImportError:
    import pickle

from pyrocko import util
from pyrocko.guts import String, Dict, Duration, dump_all

from .base import Source
from ..model import ehash
from ..lock import LockDir

guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.catalog')


class Link(object):
    '''
    Bookkeeping entry: one contiguous time span of the local catalog copy.
    '''

    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin = tmin
        self.tmax = tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        return 'span %s - %s, access %s, nevents %i' % (
            util.tts(self.tmin),
            util.tts(self.tmax),
            util.tts(self.tmodified),
            self.nevents)


class NoSuchCatalog(Exception):
    pass


def get_catalog(name):
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon
        return Geofon()
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT
        return GlobalCMT()
    else:
        raise NoSuchCatalog(name)


class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the
    online catalog, e.g. of all events above a certain magnitude. The time
    span for which the local copy should be kept up to date is maintained
    automatically by Squirrel. Data is loaded and updated in chunks, as
    needed, in a just-in-time fashion. Data validity can optionally expire
    after a given period of time, and new data can be treated as
    preliminary. In both cases, the affected information is refreshed as
    needed.
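
    Example (a minimal sketch; ``Squirrel.add_source`` attaches a data
    source to a Squirrel instance and the ``magmin`` query argument is
    catalog-dependent)::

        from pyrocko.squirrel import Squirrel
        from pyrocko.squirrel.client.catalog import CatalogSource

        sq = Squirrel()
        sq.add_source(CatalogSource(
            'gcmt',
            query_args={'magmin': '6.0'},
            expires=3600.*24.))  # refresh entries older than one day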

    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth, or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             'kept. By default the Squirrel environment\'s cache directory '
             'is used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()
        self._nevents_query_hint = 1000
        self._nevents_chunk_hint = 5000
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

    def describe(self):
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True):
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):

        with LockDir(self._cache_path):
            self._load_chain()

            # A time constraint is required here.
            assert constraint is not None
            tmin, tmax = constraint.tmin, constraint.tmax

            tmin_sq, tmax_sq = squirrel.get_time_span()

            if tmin is None:
                tmin = tmin_sq

            if tmax is None:
                tmax = tmax_sq

            if tmin is None or tmax is None:
                logger.warning(
                    'Cannot query catalog source "%s" without time '
                    'constraint. Could not determine appropriate time '
                    'constraint from current data holdings (no data?).'
                    % self.catalog)

                return

            if tmin >= tmax:
                return

            tnow = time.time()
            modified = False

            if not self._chain:
                self._chain = [Link(tmin, tmax, tnow)]
                modified = True
            else:
                if tmin < self._chain[0].tmin:
                    self._chain[0:0] = [Link(tmin, self._chain[0].tmin, tnow)]
                    modified = True

                if self._chain[-1].tmax < tmax:
                    self._chain.append(Link(self._chain[-1].tmax, tmax, tnow))
                    modified = True

            chain = []
            remove = []
            for link in self._chain:
                if tmin < link.tmax and link.tmin < tmax \
                        and self._outdated(link, tnow):

                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    if tmin_query < tmax_query:
                        # Do not reuse `link` as the loop variable here: the
                        # outer `link` is still needed for the boundary
                        # check below.
                        for link_new in self._iquery(
                                tmin_query, tmax_query, tnow):
                            chain.append(link_new)

                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        add = []
        for link in self._chain:
            if link.content_id:
                add.append(self._get_events_file_path(link.content_id))

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):

        nwant = self._nevents_query_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)

            # Emit full chunks as soon as enough events have accumulated,
            # keeping the remainder for the next round.
            while len(events) > int(self._nevents_chunk_hint * 1.5):
                tpack_max = events[self._nevents_chunk_hint].time
                yield self._pack(
                    events[:self._nevents_chunk_hint],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max
                events[:self._nevents_chunk_hint] = []

            t += self._tquery

            if tmax_query != tmax:
                # Adapt the query window so that queries return roughly
                # `nwant` events each (skipped when the window was clipped
                # at tmax, as the event count is then not representative).
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))
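                # Worked example of the adaptation, with the defaults from
                # __init__ (nwant = 1000): a query returning 100 events
                # stretches the window tenfold, one returning 5000 events
                # shrinks it fivefold, and fewer than 5 events always
                # stretch it tenfold. The window is clipped to the range
                # of one hour to one year.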

        if self._force_query_age_max is not None:
            # Events retrieved less than `anxious` seconds after their
            # occurrence are considered preliminary. Split the chunk at that
            # boundary, so that only the preliminary part is refreshed later.
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        if self.query_args is None:
            return {}
        else:
            type_map = {
                'magmin': float,
                'magmax': float,
                'latmin': float,
                'latmax': float,
                'lonmin': float,
                'lonmax': float,
                'depthmin': float,
                'depthmax': float}

            return dict(
                (k, type_map.get(k, str)(v))
                for (k, v) in self.query_args.items())
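
    # Illustrative conversion; keys absent from type_map (like the made-up
    # 'orderby' key here) are passed through as strings:
    #
    #     {'magmin': '5.0', 'orderby': 'time'}
    #         -> {'magmin': 5.0, 'orderby': 'time'}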

    def _query(self, tmin, tmax):
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
        # Never queried so far?
        if link.nevents == -1:
            return True

        # Span was still within the anxiety period when it was retrieved,
        # i.e. its information is preliminary?
        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        # Information has expired?
        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False
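
    # On-disk cache layout: one directory per catalog source, named by the
    # source's hash (see setup()):
    #
    #     <cache_path>/catalog/<hash>/chain.pickle     -- pickled Link chain
    #     <cache_path>/catalog/<hash>/<content_id>.pf  -- event chunks (YAML)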

    def _get_events_file_path(self, fhash):
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)


__all__ = [
    'CatalogSource'
]