1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

# http://pyrocko.org - GPLv3 

# 

# The Pyrocko Developers, 21st Century 

# ---|P------/S----------~Lg---------- 

 

import logging 

 

logger = logging.getLogger('pyrocko.squirrel.cache') 

 

 

class ContentCache(object):
    '''
    Simple memory cache for file contents.

    Squirrel manages data in small entities: nuts. Only the meta-data for each
    nut is stored in the database, content data has to be read from file. This
    cache helps to speed up data access for typical seismological access
    patterns.

    Content data for stations, channels and instrument responses is small in
    size but slow to parse so it makes sense to cache these indefinitely once
    read. Also, it is usually inefficient to read a single station from a
    station file, so it is better to cache the contents of the complete file
    even if only one station is requested (it is likely that other stations
    from that file will be used anyway).

    Content data for waveforms is large in size and we usually want to free the
    memory allocated for them after processing. Typical processing schemes
    require batches of waveforms to be available together (e.g.
    cross-correlations between pairs of stations) and there may be overlap
    between successive batches (e.g. sliding window processing schemes).

    This cache implementation uses named accessors and batch window counting
    for flexible content caching. Loaded contents are held in memory as long as
    an accessor is holding a reference to them. For each accessor a batch
    counter is maintained, which starts at 0 and is incremented using calls to
    :py:meth:`advance_accessor`. Content accesses are tracked with calls to
    :py:meth:`get`, which sets a "last access" attribute on the cached item to
    the current value of the batch counter (each accessor has its own last
    access attribute on the items it uses). References to items which have
    not been accessed during the latest batch by the accessor in question are
    released during :py:meth:`advance_accessor`. :py:meth:`put` inserts new
    items into the cache. :py:meth:`has` checks if there already is content
    cached for a given item. To remove all references held by a given accessor,
    :py:meth:`clear_accessor` can be called.

    **Example usage**

    For meta-data content to be cached indefinitely, no calls to
    :py:meth:`advance_accessor` or :py:meth:`clear_accessor` should be made.
    For waveform content one would call :py:meth:`advance_accessor` after each
    move of a sliding window or :py:meth:`clear_accessor` after each processed
    event. For a process requiring data from two independent positions of
    extraction, e.g. for cross-correlations between all possible pairs of a set
    of events, two separate accessor names could be used.
    '''

    def __init__(self):
        # (path, segment) -> (file_mtime, {element: content},
        #                     {accessor: tick of last access})
        self._entries = {}

        # accessor name -> current batch counter
        self._accessor_ticks = {}

    def _prune_outdated(self, path, segment, nut_mtime):
        '''
        Drop the cached entry for a file segment if its mtime is stale.

        :param path:
            First component of the cache key.
        :param segment:
            Second component of the cache key.
        :param nut_mtime:
            Modification time to compare against the cached one. The entry is
            removed when the two differ; a missing entry is silently ignored.
        '''
        try:
            cache_mtime = self._entries[path, segment][0]
        except KeyError:
            return

        if cache_mtime != nut_mtime:
            logger.debug('Forgetting (outdated): %s %s', path, segment)
            del self._entries[path, segment]

    def put(self, nut):
        '''
        Insert a new/updated item into cache.

        Any entry cached for the same ``(path, segment)`` with a different
        file modification time is discarded first.

        :param nut:
            Content item with attached data object.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`
        '''
        path, segment, element, _ = nut.key
        self._prune_outdated(path, segment, nut.file_mtime)

        if (path, segment) not in self._entries:
            self._entries[path, segment] = nut.file_mtime, {}, {}

        self._entries[path, segment][1][element] = nut.content

    def get(self, nut, accessor='default'):
        '''
        Get a content item and track its access.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str

        :returns:
            Content data object

        :raises KeyError:
            If no content is cached for the item. Use :py:meth:`has` to check
            beforehand.
        '''
        path, segment, element, _ = nut.key
        entry = self._entries[path, segment]

        # Stamp the entry with the accessor's current batch counter so that
        # advance_accessor() can tell whether it was used in this batch.
        entry[2][accessor] = self._accessor_ticks.setdefault(accessor, 0)

        return entry[1][element]

    def has(self, nut):
        '''
        Check if item's content is currently in cache.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :returns:
            ``True`` if content for the item is cached and the cached
            modification time equals the one in the item's key.
        :rtype:
            :py:class:`bool`
        '''
        path, segment, element, nut_mtime = nut.key

        try:
            entry = self._entries[path, segment]
            cache_mtime = entry[0]
            entry[1][element]  # only probing for existence
        except KeyError:
            return False

        return cache_mtime == nut_mtime

    def advance_accessor(self, accessor='default'):
        '''
        Increment batch counter of an accessor.

        References held by the accessor on entries which it has not accessed
        since the previous call are released. Entries left without any
        reference are removed from the cache.

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str
        '''
        ta = self._accessor_ticks.setdefault(accessor, 0)

        unreferenced = []
        for path_segment, entry in self._entries.items():
            # Entries not referenced by this accessor default to the current
            # tick and are therefore left alone.
            t = entry[2].get(accessor, ta)
            if t < ta:
                del entry[2][accessor]
                if not entry[2]:
                    unreferenced.append(path_segment)

        # Deletion is deferred: self._entries must not change size while it
        # is being iterated.
        for path_segment in unreferenced:
            logger.debug('Forgetting (advance): %s %s', *path_segment)
            del self._entries[path_segment]

        self._accessor_ticks[accessor] += 1

    def clear_accessor(self, accessor='default'):
        '''
        Clear all references held by an accessor.

        Entries left without any reference are removed from the cache. Clearing
        an accessor which has never been used is a no-op with respect to the
        accessor bookkeeping.

        :param accessor:
            Name of accessing consumer.
        :type accessor:
            str
        '''
        unreferenced = []
        for path_segment, entry in self._entries.items():
            entry[2].pop(accessor, None)
            if not entry[2]:
                unreferenced.append(path_segment)

        for path_segment in unreferenced:
            logger.debug('Forgetting (clear): %s %s', *path_segment)
            del self._entries[path_segment]

        # pop() instead of del: the accessor may never have been registered
        # (or may already have been cleared), which must not raise.
        self._accessor_ticks.pop(accessor, None)

    def clear(self):
        '''
        Empty the cache.
        '''
        for accessor in list(self._accessor_ticks.keys()):
            self.clear_accessor(accessor)

        self._entries = {}
        self._accessor_ticks = {}