Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/cache.py: 91%

87 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel memory caching. 

8''' 

9 

10import logging 

11import threading 

12 

13from pyrocko.guts import Object, Int 

14 

15logger = logging.getLogger('psq.cache') 

16 

17 

class ContentCacheStats(Object):
    '''
    Summary of the current state of a content cache.
    '''

    nentries = Int.T(help='Number of items in the cache.')
    naccessors = Int.T(
        help='Number of accessors currently holding references to '
             'cache items.')

27 

28 

class ContentCache(object):

    '''
    Simple memory cache for file contents.

    Squirrel manages data in small entities: nuts. Only the meta-data for each
    nut is stored in the database, content data has to be read from file. This
    cache helps to speed up data access for typical seismological access
    patterns.

    Content data for stations, channels and instrument responses is small in
    size but slow to parse so it makes sense to cache these indefinitely once
    read. Also, it is usually inefficient to read a single station from a
    station file, so it is better to cache the contents of the complete file
    even if only one station is requested (it is likely that other stations
    from that file will be used anyway).

    Content data for waveforms is large in size and we usually want to free the
    memory allocated for them after processing. Typical processing schemes
    require batches of waveforms to be available together (e.g.
    cross-correlations between pairs of stations) and there may be overlap
    between successive batches (e.g. sliding window processing schemes).

    This cache implementation uses named accessors and batch window counting
    for flexible content caching. Loaded contents are held in memory as long as
    an accessor is holding a reference to it. For each accessor a batch counter
    is maintained, which starts at 0 and is incremented using calls to
    :py:meth:`advance_accessor`. Content accesses are tracked with calls to
    :py:meth:`get`, which sets a "last access" attribute on the cached item to
    the current value of the batch counter (each accessor has its own last
    access attribute on the items it uses). References to items which have
    not been accessed during the latest batch by the accessor in question are
    released during :py:meth:`advance_accessor`. :py:meth:`put` inserts new
    items into the cache. :py:meth:`has` checks if there already is content
    cached for a given item. To remove all references held by a given accessor,
    :py:meth:`clear_accessor` can be called.

    **Example usage**

    For meta-data content to be cached indefinitely, no calls to
    :py:meth:`advance_accessor` or :py:meth:`clear_accessor` should be made.
    For waveform content one would call :py:meth:`advance_accessor` after each
    move of a sliding window or :py:meth:`clear_accessor` after each processed
    event. For a process requiring data from two independent positions of
    extraction, e.g. for cross-correlations between all possible pairs of a set
    of events, two separate accessor names could be used.
    '''

    def __init__(self):
        # (path, segment) -> (file_mtime, {element: nut}, {accessor: tick})
        self._entries = {}
        # accessor name -> current value of its batch counter
        self._accessor_ticks = {}
        # Reentrant: methods holding the lock call other locking methods
        # (put -> _prune_outdated, clear -> clear_accessor).
        self._lock = threading.RLock()

    def _prune_outdated(self, path, segment, nut_mtime):
        '''
        Drop the entry of a file segment if its modification time differs.

        :param path:
            File path part of the entry key.
        :param segment:
            Segment part of the entry key.
        :param nut_mtime:
            Modification time the cached entry is compared against.
        '''
        key = (path, segment)
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                return

            if entry[0] != nut_mtime:
                logger.debug('Forgetting (outdated): %s %s', path, segment)
                self._entries.pop(key, None)

    def _forget(self, keys, reason):
        '''
        Remove the entries with the given ``(path, segment)`` keys.

        Expects the caller to hold the lock. ``reason`` is only used to label
        the debug message.
        '''
        for path, segment in keys:
            logger.debug('Forgetting (%s): %s %s', reason, path, segment)
            self._entries.pop((path, segment), None)

    def put(self, nut):
        '''
        Insert a new/updated item into cache.

        An existing entry for the same file segment is discarded first if its
        modification time differs from ``nut.file_mtime``.

        :param nut:
            Content item with attached data object.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`
        '''
        with self._lock:
            path, segment, element, _ = nut.key
            key = (path, segment)
            self._prune_outdated(path, segment, nut.file_mtime)

            entry = self._entries.get(key)
            if entry is None:
                entry = self._entries[key] = (nut.file_mtime, {}, {})

            entry[1][element] = nut

    def get(self, nut, accessor='default', model='squirrel'):
        '''
        Get a content item and track its access.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str

        :param model:
            Selects what is returned: ``'squirrel'`` gives the cached item's
            ``content``, any other name gives ``raw_content[model]``, and a
            name with a trailing ``'+'`` gives the tuple
            ``(content, raw_content[name])``.
        :type model:
            str

        :returns:
            Content data object

        :raises KeyError:
            If the item is not in the cache. In that case no access is
            recorded and no accessor is created.
        '''
        with self._lock:
            path, segment, element, _ = nut.key
            entry = self._entries[path, segment]

            # Resolve the element before recording the access, so that a
            # failed lookup does not leave a reference pinning the entry.
            el = entry[1][element]

            entry[2][accessor] = self._accessor_ticks.setdefault(accessor, 0)

            if model == 'squirrel':
                return el.content
            elif model.endswith('+'):
                return el.content, el.raw_content[model[:-1]]
            else:
                return el.raw_content[model]

    def has(self, nut):
        '''
        Check if item's content is currently in cache.

        The item counts as cached only if the modification time stored with
        the entry equals the one given in ``nut.key``.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :returns:
            :py:class:`bool`

        '''
        path, segment, element, nut_mtime = nut.key

        with self._lock:
            entry = self._entries.get((path, segment))
            if entry is None or element not in entry[1]:
                return False

            return entry[0] == nut_mtime

    def advance_accessor(self, accessor='default'):
        '''
        Increment batch counter of an accessor.

        References of this accessor to entries which it has not accessed
        during the batch now ending are released. Entries left without any
        reference by this are removed from the cache.

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str
        '''

        with self._lock:
            ta = self._accessor_ticks.setdefault(accessor, 0)

            unreferenced = []
            for key, entry in self._entries.items():
                accessors = entry[2]
                # Entries not referenced by this accessor default to ``ta``
                # and are therefore left alone.
                if accessors.get(accessor, ta) < ta:
                    del accessors[accessor]
                    if not accessors:
                        unreferenced.append(key)

            self._forget(unreferenced, 'advance')

            self._accessor_ticks[accessor] = ta + 1

    def clear_accessor(self, accessor='default'):
        '''
        Clear all references held by an accessor.

        Every entry which is left without any accessor reference afterwards
        is removed from the cache; this includes entries that were not
        referenced by any accessor before the call.

        :param accessor:
            Name of accessing consumer.
        :type accessor:
            str
        '''
        with self._lock:
            unreferenced = []
            for key, entry in self._entries.items():
                entry[2].pop(accessor, None)
                if not entry[2]:
                    unreferenced.append(key)

            self._forget(unreferenced, 'clear')

            self._accessor_ticks.pop(accessor, None)

    def clear(self):
        '''
        Empty the cache.
        '''
        with self._lock:
            # Copy the names: clear_accessor() mutates the dict.
            for accessor in list(self._accessor_ticks):
                self.clear_accessor(accessor)

            self._entries = {}
            self._accessor_ticks = {}

    def get_stats(self):
        '''
        Get information about cache state.

        :returns: :py:class:`ContentCacheStats` object.
        '''
        with self._lock:
            return ContentCacheStats(
                nentries=len(self._entries),
                naccessors=len(self._accessor_ticks))