# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Classes to manage cached values.

Monorail makes full use of the RAM of GAE frontends to reduce latency
and load on the database.

Even though these caches do invalidation, there are rare race conditions
that can cause a somewhat stale object to be retrieved from memcache and
then put into a RAM cache and used by a given GAE instance for some time.
So, we only use these caches for operations that can tolerate somewhat
stale data. For example, displaying issues in a list or displaying brief
info about related issues. We never use the cache to load objects as
part of a read-modify-save sequence because that could cause stored data
to revert to a previous state.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import logging

from protorpc import protobuf

from google.appengine.api import memcache

import settings
from framework import framework_constants


DEFAULT_MAX_SIZE = 10000


class RamCache(object):
  """An in-RAM cache with distributed invalidation."""

  def __init__(self, cache_manager, kind, max_size=None):
    self.cache_manager = cache_manager
    self.kind = kind
    self.cache = {}
    self.max_size = max_size or DEFAULT_MAX_SIZE
    cache_manager.RegisterCache(self, kind)

  def CacheItem(self, key, item):
    """Store item at key in this cache, discarding a random item if needed."""
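    # Note: dict.popitem() removes an arbitrary entry (the most recently
    # inserted one on CPython 3.7+), so this is a cheap size cap rather
    # than a true LRU eviction policy.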
    if len(self.cache) >= self.max_size:
      self.cache.popitem()

    self.cache[key] = item

  def CacheAll(self, new_item_dict):
    """Cache all items in the given dict, dropping old items if needed."""
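    # If the incoming dict alone would fill the cache, evicting old entries
    # one at a time would drop nearly everything anyway, so just reset the
    # whole dict.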
    if len(new_item_dict) >= self.max_size:
      logging.warning('Dumping the entire cache! %s', self.kind)
      self.cache = {}
    else:
      while len(self.cache) + len(new_item_dict) > self.max_size:
        self.cache.popitem()

    self.cache.update(new_item_dict)

  def GetItem(self, key):
    """Return the cached item if present, otherwise None."""
    return self.cache.get(key)

  def HasItem(self, key):
    """Return True if there is a value cached at the given key."""
    return key in self.cache

  def GetAll(self, keys):
    """Look up the given keys.

    Args:
      keys: a list of cache keys to look up.

    Returns:
      A pair: (hits_dict, misses_list) where hits_dict is a dictionary of
      all the given keys and the values that were found in the cache, and
      misses_list is a list of given keys that were not in the cache.
    """
    hits, misses = {}, []
    for key in keys:
      try:
        hits[key] = self.cache[key]
      except KeyError:
        misses.append(key)

    return hits, misses

  def LocalInvalidate(self, key):
    """Drop the given key from this cache, without distributed notification."""
    if key in self.cache:
      logging.info('Locally invalidating %r in kind=%r', key, self.kind)
    self.cache.pop(key, None)

  def Invalidate(self, cnxn, key):
    """Drop key locally, and append it to the Invalidate DB table."""
    self.InvalidateKeys(cnxn, [key])

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append them to the Invalidate DB table."""
    for key in keys:
      self.LocalInvalidate(key)
    if self.cache_manager:
      self.cache_manager.StoreInvalidateRows(cnxn, self.kind, keys)

  def LocalInvalidateAll(self):
    """Invalidate all keys locally: just start over with an empty dict."""
    logging.info('Locally invalidating all in kind=%r', self.kind)
    self.cache = {}

  def InvalidateAll(self, cnxn):
    """Invalidate all keys in this cache."""
    self.LocalInvalidateAll()
    if self.cache_manager:
      self.cache_manager.StoreInvalidateAll(cnxn, self.kind)


class ShardedRamCache(RamCache):
  """Specialized version of RamCache that stores values in parts.

  Instead of the cache keys being simple integers, they are pairs, e.g.,
  (project_id, shard_id). Invalidation will invalidate all shards for
  a given main key, e.g., invalidating project_id 16 will drop keys
  (16, 0), (16, 1), (16, 2), ... (16, 9).
  """

  def __init__(self, cache_manager, kind, max_size=None, num_shards=10):
    super(ShardedRamCache, self).__init__(
        cache_manager, kind, max_size=max_size)
    self.num_shards = num_shards

  def LocalInvalidate(self, key):
    """Drop the given key and all of its shards from the local cache."""
    logging.info('About to invalidate sharded RAM keys %r',
                 [(key, shard_id) for shard_id in range(self.num_shards)
                  if (key, shard_id) in self.cache])
    for shard_id in range(self.num_shards):
      self.cache.pop((key, shard_id), None)


class ValueCentricRamCache(RamCache):
  """Specialized version of RamCache that stores values in InvalidateTable.

  This is useful for caches that have non-integer keys.
  """
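  # For example (illustrative, not from the original comments): a cache
  # keyed by strings such as user email addresses cannot write its keys to
  # the integer-keyed Invalidate table, so it records the cached values
  # instead.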

  def LocalInvalidate(self, value):
    """Use the specified value to drop entries from the local cache."""
    keys_to_drop = []
    # Loop through and collect all keys with the specified value.
    for k, v in self.cache.items():
      if v == value:
        keys_to_drop.append(k)
    for k in keys_to_drop:
      self.cache.pop(k, None)

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append their values to the Invalidate DB table."""
    # Find values to invalidate.
    values = [self.cache[key] for key in keys if key in self.cache]
    if len(values) == len(keys):
      for value in values:
        self.LocalInvalidate(value)
      if self.cache_manager:
        self.cache_manager.StoreInvalidateRows(cnxn, self.kind, values)
    else:
      # If a value is not found in the cache then invalidate the whole cache.
      # This is done to ensure that we are not in an inconsistent state or in a
      # race condition.
      self.InvalidateAll(cnxn)


class AbstractTwoLevelCache(object):
  """Manages a RAM cache and a secondary memcache layer to retrieve objects.

  Subclasses must implement the FetchItems() method to get objects from
  the database when both caches miss.
  """

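  # Illustrative sketch (not part of the original module): a concrete cache
  # subclasses this and implements FetchItems() to load missed keys from
  # the database, roughly like:
  #
  #   class ProjectTwoLevelCache(AbstractTwoLevelCache):
  #     def __init__(self, cache_manager, project_service):
  #       super(ProjectTwoLevelCache, self).__init__(
  #           cache_manager, 'project', 'project:', project_pb2.Project)
  #       self.project_service = project_service
  #
  #     def FetchItems(self, cnxn, keys):
  #       return self.project_service.GetProjectsByID(cnxn, keys)
  #
  # The names above (project_pb2.Project, GetProjectsByID) are hypothetical
  # placeholders.
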
  # When loading a huge number of issues from the database, do it in chunks
  # so as to avoid timeouts.
  _FETCH_BATCH_SIZE = 10000

  def __init__(self, cache_manager, kind, prefix, pb_class, max_size=None):
    self.cache = self._MakeCache(cache_manager, kind, max_size=max_size)
    self.prefix = prefix
    self.pb_class = pb_class

  def _MakeCache(self, cache_manager, kind, max_size=None):
    """Make the RAM cache and register it with the cache_manager."""
    return RamCache(cache_manager, kind, max_size=max_size)

  def CacheItem(self, key, value):
    """Add the given key-value pair to RAM and L2 cache."""
    self.cache.CacheItem(key, value)
    self._WriteToMemcache({key: value})

  def HasItem(self, key):
    """Return True if the given key is in the RAM cache."""
    return self.cache.HasItem(key)

  def GetAnyOnHandItem(self, keys, start=None, end=None):
    """Try to find one of the specified items in RAM."""
    if start is None:
      start = 0
    if end is None:
      end = len(keys)
    for i in range(start, end):
      key = keys[i]
      if self.cache.HasItem(key):
        return self.cache.GetItem(key)

    # Note: We could check L2 here too, but the round-trips to L2
    # are kind of slow. And, getting too many hits from L2 actually
    # fills our RAM cache too quickly and could lead to thrashing.

    return None

  def GetAll(self, cnxn, keys, use_cache=True, **kwargs):
    """Get values for the given keys from RAM, the L2 cache, or the DB.

    Args:
      cnxn: connection to the database.
      keys: list of integer keys to look up.
      use_cache: set to False to always hit the database.
      **kwargs: any additional keywords are passed to FetchItems().

    Returns:
      A pair: hits, misses. Where hits is {key: value} and misses is
      a list of any keys that were not found anywhere.
    """
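    # Read path: RAM cache first, then memcache, then FetchItems() against
    # the database in batches of at most _FETCH_BATCH_SIZE keys.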
    if use_cache:
      result_dict, missed_keys = self.cache.GetAll(keys)
    else:
      result_dict, missed_keys = {}, list(keys)

    if missed_keys:
      if use_cache:
        cache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
        result_dict.update(cache_hits)
        self.cache.CacheAll(cache_hits)

    while missed_keys:
      missed_batch = missed_keys[:self._FETCH_BATCH_SIZE]
      missed_keys = missed_keys[self._FETCH_BATCH_SIZE:]
      retrieved_dict = self.FetchItems(cnxn, missed_batch, **kwargs)
      result_dict.update(retrieved_dict)
      if use_cache:
        self.cache.CacheAll(retrieved_dict)
        self._WriteToMemcache(retrieved_dict)

    still_missing_keys = [key for key in keys if key not in result_dict]
    return result_dict, still_missing_keys

  def LocalInvalidateAll(self):
    self.cache.LocalInvalidateAll()

  def LocalInvalidate(self, key):
    self.cache.LocalInvalidate(key)

  def InvalidateKeys(self, cnxn, keys):
    """Drop the given keys from both RAM and L2 cache."""
    self.cache.InvalidateKeys(cnxn, keys)
    self._DeleteFromMemcache(keys)

  def InvalidateAllKeys(self, cnxn, keys):
    """Drop the given keys from L2 cache and invalidate all keys in RAM.

    Useful for avoiding inserting many rows into the Invalidate table when
    invalidating a large group of keys all at once. Only use when necessary.
    """
    self.cache.InvalidateAll(cnxn)
    self._DeleteFromMemcache(keys)

  def GetAllAlreadyInRam(self, keys):
    """Look only in RAM to return {key: values}, missed_keys."""
    result_dict, missed_keys = self.cache.GetAll(keys)
    return result_dict, missed_keys

  def InvalidateAllRamEntries(self, cnxn):
    """Drop all RAM cache entries. It will refill as needed from L2 cache."""
    self.cache.InvalidateAll(cnxn)

  def FetchItems(self, cnxn, keys, **kwargs):
    """On RAM and L2 cache miss, hit the database."""
    raise NotImplementedError()

  def _ReadFromMemcache(self, keys):
    # type: (Sequence[int]) -> Mapping[int, Any], Sequence[int]
    """Read the given keys from memcache, return {key: value}, missing_keys."""
    cache_hits = {}
    cached_dict = memcache.get_multi(
        [self._KeyToStr(key) for key in keys],
        key_prefix=self.prefix,
        namespace=settings.memcache_namespace)

    for key_str, serialized_value in cached_dict.items():
      value = self._StrToValue(serialized_value)
      key = self._StrToKey(key_str)
      cache_hits[key] = value
      self.cache.CacheItem(key, value)

    still_missing_keys = [key for key in keys if key not in cache_hits]
    return cache_hits, still_missing_keys

  def _WriteToMemcache(self, retrieved_dict):
    # type: (Mapping[int, Any]) -> None
    """Write entries for each key-value pair to memcache. Encode PBs."""
    strs_to_cache = {
        self._KeyToStr(key): self._ValueToStr(value)
        for key, value in retrieved_dict.items()}

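    # memcache.add_multi() stores a value only for keys that are not already
    # present, so existing L2 entries are not overwritten here.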
    try:
      memcache.add_multi(
          strs_to_cache,
          key_prefix=self.prefix,
          time=framework_constants.CACHE_EXPIRATION,
          namespace=settings.memcache_namespace)
    except ValueError as identifier:
      # If memcache does not accept the values, ensure that no stale
      # values are left, then bail out.
      logging.error('Got memcache error: %r', identifier)
      self._DeleteFromMemcache(list(strs_to_cache.keys()))
      return

  def _DeleteFromMemcache(self, keys):
    # type: (Sequence[str]) -> None
    """Delete key-values from memcache."""
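    # The seconds=5 argument keeps the deleted keys locked against add() for
    # a few seconds, so an in-flight _WriteToMemcache() cannot immediately
    # re-add a stale value.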
    memcache.delete_multi(
        [self._KeyToStr(key) for key in keys],
        seconds=5,
        key_prefix=self.prefix,
        namespace=settings.memcache_namespace)

  def _KeyToStr(self, key):
    # type: (int) -> str
    """Convert our int IDs to strings for use as memcache keys."""
    return str(key)

  def _StrToKey(self, key_str):
    # type: (str) -> int
    """Convert memcache keys back to the ints that we use as IDs."""
    return int(key_str)

  def _ValueToStr(self, value):
    # type: (Any) -> str
    """Serialize an application object so that it can be stored in L2 cache."""
    if not self.pb_class:
      return value
    elif self.pb_class == int:
      return str(value)
    else:
      return protobuf.encode_message(value)

  def _StrToValue(self, serialized_value):
    # type: (str) -> Any
    """Deserialize L2 cache string into an application object."""
    if not self.pb_class:
      return serialized_value
    elif self.pb_class == int:
      return int(serialized_value)
    else:
      return protobuf.decode_message(self.pb_class, serialized_value)