Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/cache.py: 89%
89 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-05 16:26 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-05 16:26 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel memory caching.
8'''
10import logging
11from threading import Lock
13from pyrocko.guts import Object, Int
# Module logger; cache evictions are reported on it at debug level.
logger = logging.getLogger('psq.cache')

# Lock guarding the cache bookkeeping. It is defined at module level, so it is
# shared by all cache instances created from this module.
CACHE_LOCK = Lock()
class ContentCacheStats(Object):
    '''
    Snapshot of the cache state: entry and accessor counts.
    '''

    # Number of cached items.
    nentries = Int.T(help='Number of items in the cache.')

    # Number of accessors which currently hold references.
    naccessors = Int.T(
        help='Number of accessors currently holding references to '
             'cache items.')
class ContentCache(object):

    '''
    Simple memory cache for file contents.

    Squirrel manages data in small entities: nuts. Only the meta-data for each
    nut is stored in the database, content data has to be read from file. This
    cache helps to speed up data access for typical seismological access
    patterns.

    Content data for stations, channels and instrument responses is small in
    size but slow to parse so it makes sense to cache these indefinitely once
    read. Also, it is usually inefficient to read a single station from a
    station file, so it is better to cache the contents of the complete file
    even if only one station is requested (it is likely that other stations
    from that file will be used anyway).

    Content data for waveforms is large in size and we usually want to free the
    memory allocated for them after processing. Typical processing schemes
    require batches of waveforms to be available together (e.g.
    cross-correlations between pairs of stations) and there may be overlap
    between successive batches (e.g. sliding window processing schemes).

    This cache implementation uses named accessors and batch window counting
    for flexible content caching. Loaded contents are held in memory as long as
    an accessor is holding a reference to it. For each accessor a batch counter
    is maintained, which starts at 0 and is incremented using calls to
    :py:meth:`advance_accessor`. Content accesses are tracked with calls to
    :py:meth:`get`, which sets a "last access" attribute on the cached item to
    the current value of the batch counter (each accessor has its own last
    access attribute on the items it uses). References to items which have
    not been accessed during the latest batch by the accessor in question are
    released during :py:meth:`advance_accessor`. :py:meth:`put` inserts new
    items into the cache. :py:meth:`has` checks if there already is content
    cached for a given item. To remove all references held by a given accessor,
    :py:meth:`clear_accessor` can be called.

    **Example usage**

    For meta-data content to be cached indefinitely, no calls to
    :py:meth:`advance_accessor` or :py:meth:`clear_accessor` should be made.
    For waveform content one would call :py:meth:`advance_accessor` after each
    move of a sliding window or :py:meth:`clear_accessor` after each processed
    event. For a process requiring data from two independent positions of
    extraction, e.g. for cross-correlations between all possible pairs of a set
    of events, two separate accessor names could be used.
    '''

    def __init__(self):
        # (path, segment) -> (file mtime, {element: nut}, {accessor: tick})
        self._entries = {}

        # accessor name -> current value of its batch counter
        self._accessor_ticks = {}

    def _drop_if_outdated(self, path, segment, nut_mtime):
        '''
        Forget a cached file segment if its modification time differs.

        Does nothing when the segment is not cached. The caller must hold
        ``CACHE_LOCK``; this helper exists so that callers can combine the
        check with further updates in a single critical section.
        '''
        try:
            cache_mtime = self._entries[path, segment][0]
        except KeyError:
            return

        if cache_mtime != nut_mtime:
            logger.debug('Forgetting (outdated): %s %s', path, segment)
            del self._entries[path, segment]

    def _prune_outdated(self, path, segment, nut_mtime):
        '''
        Forget a cached file segment if its modification time differs.

        Acquires ``CACHE_LOCK`` for the duration of the check.
        '''
        with CACHE_LOCK:
            self._drop_if_outdated(path, segment, nut_mtime)

    def put(self, nut):
        '''
        Insert a new/updated item into cache.

        Content cached for the same file segment under a different file
        modification time is dropped before the item is stored.

        :param nut:
            Content item with attached data object.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`
        '''
        path, segment, element, _ = nut.key

        # Prune and insert in one critical section: the lock is not reentrant
        # and releasing it in between would let another thread interleave.
        with CACHE_LOCK:
            self._drop_if_outdated(path, segment, nut.file_mtime)
            entry = self._entries.setdefault(
                (path, segment), (nut.file_mtime, {}, {}))

            entry[1][element] = nut

    def get(self, nut, accessor='default', model='squirrel'):
        '''
        Get a content item and track its access.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str

        :param model:
            Selects what is returned. ``'squirrel'`` returns the ``content``
            attribute of the cached item. Any other name returns the entry of
            that name from the item's ``raw_content`` mapping. A name with a
            trailing ``'+'`` returns a tuple of ``content`` and the
            ``raw_content`` entry for the name without the ``'+'``.
        :type model:
            str

        :returns:
            Content data object

        :raises KeyError:
            If no content is cached for the item, or the requested raw
            content is not available.
        '''
        path, segment, element, _ = nut.key

        with CACHE_LOCK:
            entry = self._entries[path, segment]
            tick = self._accessor_ticks.setdefault(accessor, 0)

            # Record the access before fetching the element, so the segment is
            # marked as used by this accessor in its current batch.
            entry[2][accessor] = tick
            el = entry[1][element]

        if model == 'squirrel':
            return el.content
        elif model.endswith('+'):
            return el.content, el.raw_content[model[:-1]]
        else:
            return el.raw_content[model]

    def has(self, nut):
        '''
        Check if item's content is currently in cache.

        The item counts as cached only if the modification time stored with
        the cached file segment equals the one in the item's key.

        :param nut:
            Content item.
        :type nut:
            :py:class:`~pyrocko.squirrel.model.Nut`

        :returns:
            :py:class:`bool`

        '''
        path, segment, element, nut_mtime = nut.key

        with CACHE_LOCK:
            try:
                cache_mtime, elements, _ = self._entries[path, segment]
            except KeyError:
                return False

            return element in elements and cache_mtime == nut_mtime

    def advance_accessor(self, accessor='default'):
        '''
        Increment batch counter of an accessor.

        References held by the accessor on content it did not access during
        the batch now ending are released. Content left without any reference
        is removed from the cache.

        :param accessor:
            Name of accessing consumer. Giving a new name initializes a new
            accessor.
        :type accessor:
            str
        '''
        with CACHE_LOCK:
            # Read and update the counter under the lock, like get() does.
            ta = self._accessor_ticks.setdefault(accessor, 0)

            delete = []
            for path_segment, entry in self._entries.items():
                t = entry[2].get(accessor, ta)
                if t < ta:
                    del entry[2][accessor]
                    # NOTE(review): this check is placed inside the branch
                    # above, so only entries just released by this accessor
                    # are dropped. Nesting was reconstructed from a listing
                    # without indentation - confirm against the original.
                    if not entry[2]:
                        delete.append(path_segment)

            # Entries are removed after the loop because a dict must not
            # change size while it is being iterated.
            for path_segment in delete:
                logger.debug('Forgetting (advance): %s %s', *path_segment)
                del self._entries[path_segment]

            self._accessor_ticks[accessor] = ta + 1

    def clear_accessor(self, accessor='default'):
        '''
        Clear all references held by an accessor.

        Content left without any reference is removed from the cache and the
        accessor's batch counter is forgotten. Unknown accessor names are
        accepted.

        :param accessor:
            Name of accessing consumer.
        :type accessor:
            str
        '''
        with CACHE_LOCK:
            delete = []
            for path_segment, entry in self._entries.items():
                entry[2].pop(accessor, None)
                if not entry[2]:
                    delete.append(path_segment)

            for path_segment in delete:
                logger.debug('Forgetting (clear): %s %s', *path_segment)
                del self._entries[path_segment]

            self._accessor_ticks.pop(accessor, None)

    def clear(self):
        '''
        Empty the cache.
        '''
        # clear_accessor() takes the lock itself, so it has to be called
        # before the lock is acquired here (the lock is not reentrant).
        for accessor in list(self._accessor_ticks.keys()):
            self.clear_accessor(accessor)

        with CACHE_LOCK:
            self._entries = {}
            self._accessor_ticks = {}

    def get_stats(self):
        '''
        Get information about cache state.

        :returns: :py:class:`ContentCacheStats` object.
        '''
        return ContentCacheStats(
            nentries=len(self._entries),
            naccessors=len(self._accessor_ticks))