# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

from __future__ import absolute_import, print_function

import os.path as op
import logging
import time
try:
    import cPickle as pickle
except ImportError:
    import pickle

from pyrocko import util
from pyrocko.guts import String, Dict, Duration, dump_all

from .base import Source
from ..model import ehash
from ..lock import LockDir

guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.catalog')
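

# A Link describes one contiguous time span of the local catalog copy. The
# cached catalog is maintained as a chain of such links: nevents == -1 marks
# a span which has not yet been queried, and content_id names the cache file
# holding the span's events (None if the span turned out to be empty).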
class Link(object):
    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin = tmin
        self.tmax = tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        return 'span %s - %s, access %s, nevents %i' % (
            util.tts(self.tmin),
            util.tts(self.tmax),
            util.tts(self.tmodified),
            self.nevents)


class NoSuchCatalog(Exception):
    pass


def get_catalog(name):
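    '''
    Get a catalog client instance by name.

    Currently available: ``'geofon'`` and ``'gcmt'``. Raises
    :py:exc:`NoSuchCatalog` for unknown names.
    '''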
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon
        return Geofon()
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT
        return GlobalCMT()
    else:
        raise NoSuchCatalog(name)


class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the
    online catalog, e.g. of all events above a certain magnitude. The time
    span for which the local copy of the catalog should be kept up to date
    is maintained automatically by Squirrel. Data is loaded and updated in
    chunks, as needed, in a just-in-time fashion. Data validity can
    optionally expire after a given period of time, and new data can be
    treated as preliminary. In both cases, the affected information is
    refreshed as needed.
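
    Example (a minimal sketch; the ``add_source`` usage and the
    ``query_args`` keys shown here are illustrative and depend on the
    chosen catalog)::

        from pyrocko.squirrel import Squirrel
        from pyrocko.squirrel.client.catalog import CatalogSource

        sq = Squirrel()
        sq.add_source(CatalogSource('gcmt', query_args={'magmin': '6.0'}))
        # Subsequent event queries on ``sq`` transparently keep the local
        # copy of the catalog up to date.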
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as '
             'preliminary if it was younger than this at the time of its '
             'retrieval. Preliminary information is refreshed on each '
             'query relevant to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog '
             'is kept. By default the Squirrel environment\'s cache '
             'directory is used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(
            self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()
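
        # Hints steering the adaptive query behaviour: desired number of
        # events per query and per cached chunk, and the initial query
        # window [s] together with its hard limits.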
        self._nevents_query_hint = 1000
        self._nevents_chunk_hint = 5000
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

    def setup(self, squirrel, check=True, progress_viewer='terminal'):
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

    def make_hash(self):
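        # The hash identifies the catalog together with its query
        # arguments, so that differently configured sources get separate
        # cache directories.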
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
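        '''
        Bring the local copy of the catalog up to date for a given time
        span.

        The span of interest is taken from ``constraint``; unset bounds
        are filled in from the current data holdings of ``squirrel``.
        Outdated or not yet queried links of the chain are (re)queried and
        the resulting event files are handed over to the Squirrel instance.
        '''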

        with LockDir(self._cache_path):
            self._load_chain()

            assert constraint is not None
            if constraint is not None:
                tmin, tmax = constraint.tmin, constraint.tmax

                tmin_sq, tmax_sq = squirrel.get_time_span()

                if tmin is None:
                    tmin = tmin_sq

                if tmax is None:
                    tmax = tmax_sq

                if tmin is None or tmax is None:
                    logger.warning(
                        'Cannot query catalog source "%s" without time '
                        'constraint. Could not determine appropriate time '
                        'constraint from current data holdings (no data?).'
                        % self.catalog)

                    return

            if tmin >= tmax:
                return

            tnow = time.time()
            modified = False

            if not self._chain:
                self._chain = [Link(tmin, tmax, tnow)]
                modified = True
            else:
                if tmin < self._chain[0].tmin:
                    self._chain[0:0] = [Link(tmin, self._chain[0].tmin, tnow)]
                    modified = True

                if self._chain[-1].tmax < tmax:
                    self._chain.append(Link(self._chain[-1].tmax, tmax, tnow))
                    modified = True

            chain = []
            remove = []
            for link in self._chain:
                if tmin < link.tmax and link.tmin < tmax \
                        and self._outdated(link, tnow):

                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    if tmin_query < tmax_query:
                        # Use a distinct loop variable here: rebinding
                        # ``link`` would make the tail check below operate
                        # on the wrong object.
                        for link_new in self._iquery(
                                tmin_query, tmax_query, tnow):
                            chain.append(link_new)

                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            add = []
            for link in self._chain:
                if link.content_id:
                    add.append(self._get_events_file_path(link.content_id))

            squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
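        # Query the catalog in windows of self._tquery seconds, yielding
        # packed chunks of roughly self._nevents_chunk_hint events. The
        # window length adapts to aim at self._nevents_query_hint events
        # per query, clipped to self._tquery_limits. If an anxiety period
        # is set, the youngest part is split into its own link so it can
        # be re-queried independently later.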

        nwant = self._nevents_query_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)
            while len(events) > int(self._nevents_chunk_hint * 1.5):
                tpack_max = events[self._nevents_chunk_hint].time
                yield self._pack(
                    events[:self._nevents_chunk_hint],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max
                # Drop exactly the events just packed.
                events[:self._nevents_chunk_hint] = []

            t += self._tquery

            if tmax_query != tmax:
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

        if self._force_query_age_max is not None:
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
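        # Dump one chunk of events to a cache file and describe it with a
        # Link. Empty chunks get no file (content_id stays None).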
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def _query(self, tmin, tmax):
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **(self.query_args or {}))

    def _outdated(self, link, tnow):
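        # A link is outdated if it was never queried (nevents == -1), if
        # it ended less than the anxiety period before it was retrieved
        # (its data may still be preliminary), or if it is older than the
        # configured expiration time.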
        if link.nevents == -1:
            return True

        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)


__all__ = [
    'CatalogSource'
]