Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/cache.py: 89%

89 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-03-05 16:26 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel memory caching.

8''' 

9 

10import logging 

11from threading import Lock 

12 

13from pyrocko.guts import Object, Int 

14 

# Module logger; debug messages about forgotten cache entries are emitted
# under the 'psq.cache' logger name.
logger = logging.getLogger('psq.cache')

# Lock taken by the ContentCache methods while they touch the cache's
# bookkeeping. It is created once at module level, so it is shared by every
# ContentCache instance in the process.
CACHE_LOCK = Lock()

18 

19 

class ContentCacheStats(Object):
    '''
    Summary counters describing the current state of a content cache.
    '''

    # Number of cached items.
    nentries = Int.T(help='Number of items in the cache.')

    # Number of accessors which currently hold references to cached items.
    naccessors = Int.T(
        help='Number of accessors currently holding references to '
             'cache items.')

29 

30 

class ContentCache(object):

    '''
    Simple memory cache for file contents.

    Squirrel manages data in small entities: nuts. Only the meta-data for each
    nut is stored in the database, content data has to be read from file. This
    cache helps to speed up data access for typical seismological access
    patterns.

    Content data for stations, channels and instrument responses is small in
    size but slow to parse so it makes sense to cache these indefinitely once
    read. Also, it is usually inefficient to read a single station from a
    station file, so it is better to cache the contents of the complete file
    even if only one station is requested (it is likely that other stations
    from that file will be used anyway).

    Content data for waveforms is large in size and we usually want to free the
    memory allocated for them after processing. Typical processing schemes
    require batches of waveforms to be available together (e.g.
    cross-correlations between pairs of stations) and there may be overlap
    between successive batches (e.g. sliding window processing schemes).

    This cache implementation uses named accessors and batch window counting
    for flexible content caching. Loaded contents are held in memory as long as
    an accessor is holding a reference to it. For each accessor a batch counter
    is maintained, which starts at 0 and is incremented using calls to
    :py:meth:`advance_accessor`. Content accesses are tracked with calls to
    :py:meth:`get`, which sets a "last access" attribute on the cached item to
    the current value of the batch counter (each accessor has its own last
    access attribute on the items it uses). References to items which have
    not been accessed during the latest batch by the accessor in question are
    released during :py:meth:`advance_accessor`. :py:meth:`put` inserts new
    items into the cache. :py:meth:`has` checks if there already is content
    cached for a given item. To remove all references held by a given accessor,
    :py:meth:`clear_accessor` can be called.

    **Example usage**

    For meta-data content to be cached indefinitely, no calls to
    :py:meth:`advance_accessor` or :py:meth:`clear_accessor` should be made.
    For waveform content one would call :py:meth:`advance_accessor` after each
    move of a sliding window or :py:meth:`clear_accessor` after each processed
    event. For a process requiring data from two independent positions of
    extraction, e.g. for cross-correlations between all possible pairs of a set
    of events, two separate accessor names could be used.
    '''

    def __init__(self):
        # (path, segment) -> (file_mtime, {element: nut}, {accessor: tick})
        # where ``tick`` is the accessor's batch counter at its last access.
        self._entries = {}

        # accessor name -> current batch counter of that accessor
        self._accessor_ticks = {}

    def _prune_outdated(self, path, segment, nut_mtime):
        '''
        Drop the cached entry of a file segment if its mtime differs.

        :param path:
            File path part of the entry key.
        :param segment:
            Segment part of the entry key.
        :param nut_mtime:
            Modification time to compare the cached entry against.
        '''
        with CACHE_LOCK:
            try:
                cache_mtime = self._entries[path, segment][0]
            except KeyError:
                return

            if cache_mtime != nut_mtime:
                logger.debug('Forgetting (outdated): %s %s', path, segment)
                del self._entries[path, segment]

    def put(self, nut):
        '''
        Insert a new/updated item into cache.

        An entry cached for the same file segment with a different file
        modification time is discarded before the item is inserted.

        :param nut:
            Content item with attached data object.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`
        '''
        path, segment, element, _ = nut.key

        # Takes and releases CACHE_LOCK on its own; the lock is not
        # reentrant, so this call must stay outside the ``with`` block below.
        self._prune_outdated(path, segment, nut.file_mtime)

        with CACHE_LOCK:
            if (path, segment) not in self._entries:
                self._entries[path, segment] = nut.file_mtime, {}, {}

            self._entries[path, segment][1][element] = nut

    def get(self, nut, accessor='default', model='squirrel'):
        '''
        Get a content item and track its access.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str

        :param model:
            Selects what is returned. ``'squirrel'`` gives the ``content``
            attribute of the cached item. Any other name gives
            ``raw_content[model]`` of the cached item. A name with a trailing
            ``'+'`` gives the tuple ``(content, raw_content[name])`` where
            ``name`` is ``model`` without the trailing ``'+'``.
        :type model:
            str

        :returns:
            Content data object

        :raises KeyError:
            If nothing is cached for the file segment or element of ``nut``,
            or if the lookup of ``model`` in ``raw_content`` fails.
        '''
        path, segment, element, _ = nut.key

        with CACHE_LOCK:
            # Entry lookup happens under the lock so that it cannot
            # interleave with a concurrent prune/advance/clear.
            entry = self._entries[path, segment]

            # Record the access with the accessor's current batch counter,
            # creating the accessor on first use.
            entry[2][accessor] = self._accessor_ticks.setdefault(accessor, 0)
            el = entry[1][element]

        if model == 'squirrel':
            return el.content
        elif model.endswith('+'):
            return el.content, el.raw_content[model[:-1]]
        else:
            return el.raw_content[model]

    def has(self, nut):
        '''
        Check if item's content is currently in cache.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :returns:
            ``True`` if the element is cached and the cached modification
            time equals the one in ``nut.key``, ``False`` otherwise.
        :rtype:
            :py:class:`bool`
        '''
        path, segment, element, nut_mtime = nut.key

        with CACHE_LOCK:
            try:
                entry = self._entries[path, segment]
                cache_mtime = entry[0]
                entry[1][element]
            except KeyError:
                return False

        return cache_mtime == nut_mtime

    def advance_accessor(self, accessor='default'):
        '''
        Increment batch counter of an accessor.

        References of the accessor to entries which it has not accessed
        during the batch now ending are released. Entries left without any
        accessor reference by this are removed from the cache.

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str
        '''
        with CACHE_LOCK:
            ta = self._accessor_ticks.setdefault(accessor, 0)

            # Collect first, delete afterwards: the dict must not change
            # size while it is being iterated.
            delete = []
            for path_segment, entry in self._entries.items():
                accessors = entry[2]
                if accessors.get(accessor, ta) < ta:
                    del accessors[accessor]
                    if not accessors:
                        delete.append(path_segment)

            for path_segment in delete:
                logger.debug('Forgetting (advance): %s %s', *path_segment)
                del self._entries[path_segment]

            self._accessor_ticks[accessor] = ta + 1

    def clear_accessor(self, accessor='default'):
        '''
        Clear all references held by an accessor.

        Every entry which is not referenced by any accessor afterwards is
        removed from the cache. The accessor's batch counter is discarded.

        :param accessor:
            Name of accessing consumer.
        :type accessor:
            str
        '''
        with CACHE_LOCK:
            delete = []
            for path_segment, entry in self._entries.items():
                entry[2].pop(accessor, None)
                if not entry[2]:
                    delete.append(path_segment)

            for path_segment in delete:
                logger.debug('Forgetting (clear): %s %s', *path_segment)
                del self._entries[path_segment]

            self._accessor_ticks.pop(accessor, None)

    def clear(self):
        '''
        Empty the cache.
        '''
        # clear_accessor() acquires CACHE_LOCK itself, so it must be called
        # without holding the (non-reentrant) lock.
        for accessor in list(self._accessor_ticks):
            self.clear_accessor(accessor)

        with CACHE_LOCK:
            self._entries = {}
            self._accessor_ticks = {}

    def get_stats(self):
        '''
        Get information about cache state.

        :returns: :py:class:`ContentCacheStats` object.
        '''
        return ContentCacheStats(
            nentries=len(self._entries),
            naccessors=len(self._accessor_ticks))