blob: 8869d61572b163328727cd401ce3d915ff82e64a [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2020 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Classes to manage cached values.
5
6Monorail makes full use of the RAM of GAE frontends to reduce latency
7and load on the database.
8
9Even though these caches do invalidation, there are rare race conditions
10that can cause a somewhat stale object to be retrieved from memcache and
11then put into a RAM cache and used by a given GAE instance for some time.
12So, we only use these caches for operations that can tolerate somewhat
13stale data. For example, displaying issues in a list or displaying brief
14info about related issues. We never use the cache to load objects as
15part of a read-modify-save sequence because that could cause stored data
16to revert to a previous state.
17"""
18from __future__ import print_function
19from __future__ import division
20from __future__ import absolute_import
21
22import logging
Copybara854996b2021-09-07 19:36:02 +000023
24from protorpc import protobuf
25
26from google.appengine.api import memcache
27
28import settings
29from framework import framework_constants
Adrià Vilanova Martínez9f9ade52022-10-10 23:20:11 +020030from framework import logger
Copybara854996b2021-09-07 19:36:02 +000031
32
# Fallback cap on the number of entries held by a RamCache when the caller
# does not supply an explicit (truthy) max_size.
DEFAULT_MAX_SIZE = 10000
34
35
class RamCache(object):
  """An in-RAM cache with distributed invalidation.

  Entries are kept in a plain dict bounded (approximately) by max_size.
  Local invalidation only touches this process's dict; the non-local
  variants additionally ask the cache_manager to record the invalidation
  so that it can be propagated elsewhere.
  """

  def __init__(self, cache_manager, kind, max_size=None):
    """Create an empty cache and register it with the cache manager.

    Args:
      cache_manager: object providing RegisterCache(), StoreInvalidateRows()
          and StoreInvalidateAll().
      kind: identifier for the kind of object stored in this cache; it is
          passed back to the cache_manager on every invalidation.
      max_size: maximum number of entries to keep. Any falsy value
          (None or 0) selects DEFAULT_MAX_SIZE.
    """
    self.cache_manager = cache_manager
    self.kind = kind
    self.cache = {}
    self.max_size = max_size or DEFAULT_MAX_SIZE
    cache_manager.RegisterCache(self, kind)

  def CacheItem(self, key, item):
    """Store item at key, discarding an arbitrary other item if needed."""
    # Overwriting an existing key does not grow the dict, so only evict
    # when a brand-new key would push the cache past max_size.
    if key not in self.cache and len(self.cache) >= self.max_size:
      self.cache.popitem()

    self.cache[key] = item

  def CacheAll(self, new_item_dict):
    """Cache all items in the given dict, dropping old items if needed.

    If the incoming dict alone is at least max_size, every existing entry
    is discarded and the whole incoming dict is stored anyway, so the
    cache may temporarily hold more than max_size entries.
    """
    if len(new_item_dict) >= self.max_size:
      logging.warning('Dumping the entire cache! %s', self.kind)
      self.cache = {}
    else:
      # Evict arbitrary entries until the new items are guaranteed to fit.
      # Terminates because len(new_item_dict) < max_size in this branch.
      while len(self.cache) + len(new_item_dict) > self.max_size:
        self.cache.popitem()

    self.cache.update(new_item_dict)

  def GetItem(self, key):
    """Return the cached item if present, otherwise None."""
    return self.cache.get(key)

  def HasItem(self, key):
    """Return True if there is a value cached at the given key."""
    return key in self.cache

  def GetAll(self, keys):
    """Look up the given keys.

    Args:
      keys: a list of cache keys to look up.

    Returns:
      A pair: (hits_dict, misses_list) where hits_dict is a dictionary of
      all the given keys and the values that were found in the cache, and
      misses_list is a list of given keys that were not in the cache.
    """
    hits, misses = {}, []
    for key in keys:
      try:
        hits[key] = self.cache[key]
      except KeyError:
        misses.append(key)

    return hits, misses

  def LocalInvalidate(self, key):
    """Drop the given key from this cache, without distributed notification."""
    if key in self.cache:
      logging.info('Locally invalidating %r in kind=%r', key, self.kind)
    # pop() with a default is a no-op when the key is already absent.
    self.cache.pop(key, None)

  def Invalidate(self, cnxn, key):
    """Drop key locally, and append it to the Invalidate DB table."""
    self.InvalidateKeys(cnxn, [key])

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append them to the Invalidate DB table.

    Args:
      cnxn: connection to the database, handed to the cache_manager.
      keys: list of cache keys to drop.
    """
    for key in keys:
      self.LocalInvalidate(key)
    # A falsy cache_manager means there is nobody to notify.
    if self.cache_manager:
      self.cache_manager.StoreInvalidateRows(cnxn, self.kind, keys)

  def LocalInvalidateAll(self):
    """Invalidate all keys locally: just start over with an empty dict."""
    logging.info('Locally invalidating all in kind=%r', self.kind)
    self.cache = {}

  def InvalidateAll(self, cnxn):
    """Invalidate all keys in this cache, locally and via the cache_manager."""
    self.LocalInvalidateAll()
    if self.cache_manager:
      self.cache_manager.StoreInvalidateAll(cnxn, self.kind)
119
120
class ShardedRamCache(RamCache):
  """RamCache variant whose entries are split into numbered shards.

  Cache keys are pairs such as (project_id, shard_id) rather than bare
  integers. Invalidating a main key drops every shard stored under it,
  e.g., invalidating project_id 16 with the default shard count removes
  (16, 0), (16, 1), (16, 2), ... (16, 9).
  """

  def __init__(self, cache_manager, kind, max_size=None, num_shards=10):
    """Set up the underlying RamCache and remember the shard count."""
    super(ShardedRamCache, self).__init__(
        cache_manager, kind, max_size=max_size)
    self.num_shards = num_shards

  def LocalInvalidate(self, key):
    """Drop every (key, shard_id) entry for the given main key."""
    shard_keys = [(key, shard_id) for shard_id in range(self.num_shards)]
    # Only the shards that are actually cached are reported in the log.
    present_keys = [
        shard_key for shard_key in shard_keys if shard_key in self.cache]
    logging.info('About to invalidate shared RAM keys %r', present_keys)
    for shard_key in shard_keys:
      self.cache.pop(shard_key, None)
142
143
class ValueCentricRamCache(RamCache):
  """Specialized version of RamCache that stores values in InvalidateTable.

  This is useful for caches that have non integer keys: invalidation is
  expressed in terms of the cached values rather than the keys.
  """

  def LocalInvalidate(self, value):
    """Drop every local entry whose cached value equals the given value."""
    # Collect first, then delete, so the dict is not mutated mid-iteration.
    keys_to_drop = [k for k, v in self.cache.items() if v == value]
    for k in keys_to_drop:
      self.cache.pop(k, None)

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append their values to the Invalidate DB table.

    Args:
      cnxn: connection to the database, handed to the cache_manager.
      keys: list of cache keys whose values should be invalidated.
    """
    # Find values to invalidate. `in` is used rather than dict.has_key(),
    # which does not exist on Python 3 dicts.
    values = [self.cache[key] for key in keys if key in self.cache]
    if len(values) == len(keys):
      for value in values:
        self.LocalInvalidate(value)
      if self.cache_manager:
        self.cache_manager.StoreInvalidateRows(cnxn, self.kind, values)
    else:
      # If a value is not found in the cache then invalidate the whole cache.
      # This is done to ensure that we are not in an inconsistent state or in a
      # race condition.
      self.InvalidateAll(cnxn)
174
175
class AbstractTwoLevelCache(object):
  """A class to manage both RAM and secondary-caching layer to retrieve objects.

  Lookups try the RAM cache first, then memcache (the L2 cache), and
  finally the database. Subclasses must implement the FetchItems() method
  to get objects from the database when both caches miss. Subclasses whose
  keys are not plain ints can override _KeyToStr() and _StrToKey().
  """

  # When loading a huge number of issues from the database, do it in chunks
  # so as to avoid timeouts.
  _FETCH_BATCH_SIZE = 10000

  def __init__(self, cache_manager, kind, prefix, pb_class, max_size=None):
    """Build the RAM cache and remember how to talk to memcache.

    Args:
      cache_manager: manager that the RAM cache registers itself with.
      kind: identifier for the kind of object being cached.
      prefix: string used as the key_prefix for every memcache operation.
      pb_class: controls (de)serialization of values for memcache: a falsy
          value stores values unchanged, int stores str(value), and
          anything else is treated as a protorpc message class.
      max_size: optional maximum number of RAM cache entries.
    """
    self.cache = self._MakeCache(cache_manager, kind, max_size=max_size)
    self.prefix = prefix
    self.pb_class = pb_class

  def _MakeCache(self, cache_manager, kind, max_size=None):
    """Make the RAM cache and register it with the cache_manager."""
    return RamCache(cache_manager, kind, max_size=max_size)

  def CacheItem(self, key, value):
    """Add the given key-value pair to RAM and L2 cache."""
    self.cache.CacheItem(key, value)
    self._WriteToMemcache({key: value})

  def HasItem(self, key):
    """Return True if the given key is in the RAM cache."""
    return self.cache.HasItem(key)

  def GetAnyOnHandItem(self, keys, start=None, end=None):
    """Try to find one of the specified items in RAM.

    Args:
      keys: indexable sequence of keys to consider.
      start: optional index of the first key to check; defaults to 0.
      end: optional index one past the last key to check; defaults to
          len(keys).

    Returns:
      The cached value of the first key in keys[start:end] that is in the
      RAM cache, or None if none of them are.
    """
    if start is None:
      start = 0
    if end is None:
      end = len(keys)
    for i in range(start, end):
      key = keys[i]
      if self.cache.HasItem(key):
        return self.cache.GetItem(key)

    # Note: We could check L2 here too, but the round-trips to L2
    # are kind of slow. And, getting too many hits from L2 actually
    # fills our RAM cache too quickly and could lead to thrashing.

    return None

  def GetAll(self, cnxn, keys, use_cache=True, **kwargs):
    """Get values for the given keys from RAM, the L2 cache, or the DB.

    Args:
      cnxn: connection to the database.
      keys: list of integer keys to look up.
      use_cache: set to False to always hit the database.
      **kwargs: any additional keywords are passed to FetchItems().

    Returns:
      A pair: hits, misses. Where hits is {key: value} and misses is
      a list of any keys that were not found anywhere.
    """
    if use_cache:
      result_dict, missed_keys = self.cache.GetAll(keys)
    else:
      result_dict, missed_keys = {}, list(keys)

    if missed_keys:
      if use_cache:
        cache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
        result_dict.update(cache_hits)
        self.cache.CacheAll(cache_hits)

      # Whatever is still missing must come from the database, in batches.
      while missed_keys:
        missed_batch = missed_keys[:self._FETCH_BATCH_SIZE]
        missed_keys = missed_keys[self._FETCH_BATCH_SIZE:]
        retrieved_dict = self.FetchItems(cnxn, missed_batch, **kwargs)
        result_dict.update(retrieved_dict)
        if use_cache:
          self.cache.CacheAll(retrieved_dict)
          self._WriteToMemcache(retrieved_dict)

    still_missing_keys = [key for key in keys if key not in result_dict]
    if still_missing_keys:
      # The keys were not found in the caches or the DB.
      logger.log(
          {
              'log_type': 'database/missing_keys',
              'kind': self.cache.kind,
              'prefix': self.prefix,
              'count': len(still_missing_keys),
              'keys': str(still_missing_keys)
          })
    return result_dict, still_missing_keys

  def LocalInvalidateAll(self):
    """Drop every entry from this process's RAM cache only."""
    self.cache.LocalInvalidateAll()

  def LocalInvalidate(self, key):
    """Drop one key from this process's RAM cache only."""
    self.cache.LocalInvalidate(key)

  def InvalidateKeys(self, cnxn, keys):
    """Drop the given keys from both RAM and L2 cache."""
    self.cache.InvalidateKeys(cnxn, keys)
    self._DeleteFromMemcache(keys)

  def InvalidateAllKeys(self, cnxn, keys):
    """Drop the given keys from L2 cache and invalidate all keys in RAM.

    Useful for avoiding inserting many rows into the Invalidate table when
    invalidating a large group of keys all at once. Only use when necessary.
    """
    self.cache.InvalidateAll(cnxn)
    self._DeleteFromMemcache(keys)

  def GetAllAlreadyInRam(self, keys):
    """Look only in RAM to return {key: values}, missed_keys."""
    result_dict, missed_keys = self.cache.GetAll(keys)
    return result_dict, missed_keys

  def InvalidateAllRamEntries(self, cnxn):
    """Drop all RAM cache entries. It will refill as needed from L2 cache."""
    self.cache.InvalidateAll(cnxn)

  def FetchItems(self, cnxn, keys, **kwargs):
    """On RAM and L2 cache miss, hit the database.

    Subclasses must override this; GetAll() merges the returned mapping
    into its result as {key: value}.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()

  def _ReadFromMemcache(self, keys):
    # type: (Sequence[int]) -> Tuple[Mapping[int, Any], Sequence[int]]
    """Read the given keys from memcache, return {key: value}, missing_keys.

    Every hit is also stored in the RAM cache as a side effect.
    """
    cache_hits = {}
    cached_dict = memcache.get_multi(
        [self._KeyToStr(key) for key in keys],
        key_prefix=self.prefix,
        namespace=settings.memcache_namespace)

    for key_str, serialized_value in cached_dict.items():
      value = self._StrToValue(serialized_value)
      key = self._StrToKey(key_str)
      cache_hits[key] = value
      self.cache.CacheItem(key, value)

    still_missing_keys = [key for key in keys if key not in cache_hits]
    return cache_hits, still_missing_keys

  def _WriteToMemcache(self, retrieved_dict):
    # type: (Mapping[int, Any]) -> None
    """Write entries for each key-value pair to memcache. Encode PBs."""
    strs_to_cache = {
        self._KeyToStr(key): self._ValueToStr(value)
        for key, value in retrieved_dict.items()}

    try:
      memcache.add_multi(
          strs_to_cache,
          key_prefix=self.prefix,
          time=framework_constants.CACHE_EXPIRATION,
          namespace=settings.memcache_namespace)
    except ValueError as identifier:
      # If memcache does not accept the values, ensure that no stale
      # values are left, then bail out.
      logging.error('Got memcache error: %r', identifier)
      # Pass the original keys, not the stringified ones:
      # _DeleteFromMemcache() applies _KeyToStr() itself, and converting
      # twice breaks subclasses whose _KeyToStr() is not idempotent.
      self._DeleteFromMemcache(list(retrieved_dict.keys()))

  def _DeleteFromMemcache(self, keys):
    # type: (Sequence[int]) -> None
    """Delete key-values from memcache.

    Args:
      keys: cache keys in their application form (not yet stringified).
    """
    logger.log(
        {
            'log_type': 'cache/memcache/delete',
            'kind': self.cache.kind,
            'prefix': self.prefix,
            'count': len(keys),
            'keys': str(keys)
        })
    # NOTE(review): `seconds` is presumably the memcache delete lock time
    # that makes add operations for these keys fail briefly -- confirm
    # against the App Engine memcache documentation.
    memcache.delete_multi(
        [self._KeyToStr(key) for key in keys],
        seconds=5,
        key_prefix=self.prefix,
        namespace=settings.memcache_namespace)

  def _KeyToStr(self, key):
    # type: (int) -> str
    """Convert our int IDs to strings for use as memcache keys."""
    return str(key)

  def _StrToKey(self, key_str):
    # type: (str) -> int
    """Convert memcache keys back to the ints that we use as IDs."""
    return int(key_str)

  def _ValueToStr(self, value):
    # type: (Any) -> str
    """Serialize an application object so that it can be stored in L2 cache."""
    if not self.pb_class:
      # No serialization configured: store the value as-is.
      return value
    elif self.pb_class == int:
      return str(value)
    else:
      return protobuf.encode_message(value)

  def _StrToValue(self, serialized_value):
    # type: (str) -> Any
    """Deserialize L2 cache string into an application object."""
    if not self.pb_class:
      # No serialization configured: the stored value is the object.
      return serialized_value
    elif self.pb_class == int:
      return int(serialized_value)
    else:
      return protobuf.decode_message(self.pb_class, serialized_value)