Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 1 | # Copyright 2020 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | """Classes to manage cached values. |
| 5 | |
| 6 | Monorail makes full use of the RAM of GAE frontends to reduce latency |
| 7 | and load on the database. |
| 8 | |
| 9 | Even though these caches do invalidation, there are rare race conditions |
| 10 | that can cause a somewhat stale object to be retrieved from memcache and |
| 11 | then put into a RAM cache and used by a given GAE instance for some time. |
| 12 | So, we only use these caches for operations that can tolerate somewhat |
| 13 | stale data. For example, displaying issues in a list or displaying brief |
| 14 | info about related issues. We never use the cache to load objects as |
| 15 | part of a read-modify-save sequence because that could cause stored data |
| 16 | to revert to a previous state. |
| 17 | """ |
| 18 | from __future__ import print_function |
| 19 | from __future__ import division |
| 20 | from __future__ import absolute_import |
| 21 | |
| 22 | import logging |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 23 | |
| 24 | from protorpc import protobuf |
| 25 | |
| 26 | from google.appengine.api import memcache |
| 27 | |
| 28 | import settings |
| 29 | from framework import framework_constants |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 30 | |
| 31 | |
| 32 | DEFAULT_MAX_SIZE = 10000 |
| 33 | |
| 34 | |
| 35 | class RamCache(object): |
| 36 | """An in-RAM cache with distributed invalidation.""" |
| 37 | |
| 38 | def __init__(self, cache_manager, kind, max_size=None): |
| 39 | self.cache_manager = cache_manager |
| 40 | self.kind = kind |
| 41 | self.cache = {} |
| 42 | self.max_size = max_size or DEFAULT_MAX_SIZE |
| 43 | cache_manager.RegisterCache(self, kind) |
| 44 | |
| 45 | def CacheItem(self, key, item): |
| 46 | """Store item at key in this cache, discarding a random item if needed.""" |
| 47 | if len(self.cache) >= self.max_size: |
| 48 | self.cache.popitem() |
| 49 | |
| 50 | self.cache[key] = item |
| 51 | |
| 52 | def CacheAll(self, new_item_dict): |
| 53 | """Cache all items in the given dict, dropping old items if needed.""" |
| 54 | if len(new_item_dict) >= self.max_size: |
| 55 | logging.warn('Dumping the entire cache! %s', self.kind) |
| 56 | self.cache = {} |
| 57 | else: |
| 58 | while len(self.cache) + len(new_item_dict) > self.max_size: |
| 59 | self.cache.popitem() |
| 60 | |
| 61 | self.cache.update(new_item_dict) |
| 62 | |
| 63 | def GetItem(self, key): |
| 64 | """Return the cached item if present, otherwise None.""" |
| 65 | return self.cache.get(key) |
| 66 | |
| 67 | def HasItem(self, key): |
| 68 | """Return True if there is a value cached at the given key.""" |
| 69 | return key in self.cache |
| 70 | |
| 71 | def GetAll(self, keys): |
| 72 | """Look up the given keys. |
| 73 | |
| 74 | Args: |
| 75 | keys: a list of cache keys to look up. |
| 76 | |
| 77 | Returns: |
| 78 | A pair: (hits_dict, misses_list) where hits_dict is a dictionary of |
| 79 | all the given keys and the values that were found in the cache, and |
| 80 | misses_list is a list of given keys that were not in the cache. |
| 81 | """ |
| 82 | hits, misses = {}, [] |
| 83 | for key in keys: |
| 84 | try: |
| 85 | hits[key] = self.cache[key] |
| 86 | except KeyError: |
| 87 | misses.append(key) |
| 88 | |
| 89 | return hits, misses |
| 90 | |
| 91 | def LocalInvalidate(self, key): |
| 92 | """Drop the given key from this cache, without distributed notification.""" |
| 93 | if key in self.cache: |
| 94 | logging.info('Locally invalidating %r in kind=%r', key, self.kind) |
| 95 | self.cache.pop(key, None) |
| 96 | |
| 97 | def Invalidate(self, cnxn, key): |
| 98 | """Drop key locally, and append it to the Invalidate DB table.""" |
| 99 | self.InvalidateKeys(cnxn, [key]) |
| 100 | |
| 101 | def InvalidateKeys(self, cnxn, keys): |
| 102 | """Drop keys locally, and append them to the Invalidate DB table.""" |
| 103 | for key in keys: |
| 104 | self.LocalInvalidate(key) |
| 105 | if self.cache_manager: |
| 106 | self.cache_manager.StoreInvalidateRows(cnxn, self.kind, keys) |
| 107 | |
| 108 | def LocalInvalidateAll(self): |
| 109 | """Invalidate all keys locally: just start over with an empty dict.""" |
| 110 | logging.info('Locally invalidating all in kind=%r', self.kind) |
| 111 | self.cache = {} |
| 112 | |
| 113 | def InvalidateAll(self, cnxn): |
| 114 | """Invalidate all keys in this cache.""" |
| 115 | self.LocalInvalidateAll() |
| 116 | if self.cache_manager: |
| 117 | self.cache_manager.StoreInvalidateAll(cnxn, self.kind) |
| 118 | |
| 119 | |
| 120 | class ShardedRamCache(RamCache): |
| 121 | """Specialized version of RamCache that stores values in parts. |
| 122 | |
| 123 | Instead of the cache keys being simple integers, they are pairs, e.g., |
| 124 | (project_id, shard_id). Invalidation will invalidate all shards for |
| 125 | a given main key, e.g, invalidating project_id 16 will drop keys |
| 126 | (16, 0), (16, 1), (16, 2), ... (16, 9). |
| 127 | """ |
| 128 | |
| 129 | def __init__(self, cache_manager, kind, max_size=None, num_shards=10): |
| 130 | super(ShardedRamCache, self).__init__( |
| 131 | cache_manager, kind, max_size=max_size) |
| 132 | self.num_shards = num_shards |
| 133 | |
| 134 | def LocalInvalidate(self, key): |
| 135 | """Use the specified value to drop entries from the local cache.""" |
| 136 | logging.info('About to invalidate shared RAM keys %r', |
| 137 | [(key, shard_id) for shard_id in range(self.num_shards) |
| 138 | if (key, shard_id) in self.cache]) |
| 139 | for shard_id in range(self.num_shards): |
| 140 | self.cache.pop((key, shard_id), None) |
| 141 | |
| 142 | |
| 143 | class ValueCentricRamCache(RamCache): |
| 144 | """Specialized version of RamCache that stores values in InvalidateTable. |
| 145 | |
| 146 | This is useful for caches that have non integer keys. |
| 147 | """ |
| 148 | |
| 149 | def LocalInvalidate(self, value): |
| 150 | """Use the specified value to drop entries from the local cache.""" |
| 151 | keys_to_drop = [] |
| 152 | # Loop through and collect all keys with the specified value. |
| 153 | for k, v in self.cache.items(): |
| 154 | if v == value: |
| 155 | keys_to_drop.append(k) |
| 156 | for k in keys_to_drop: |
| 157 | self.cache.pop(k, None) |
| 158 | |
| 159 | def InvalidateKeys(self, cnxn, keys): |
| 160 | """Drop keys locally, and append their values to the Invalidate DB table.""" |
| 161 | # Find values to invalidate. |
| 162 | values = [self.cache[key] for key in keys if self.cache.has_key(key)] |
| 163 | if len(values) == len(keys): |
| 164 | for value in values: |
| 165 | self.LocalInvalidate(value) |
| 166 | if self.cache_manager: |
| 167 | self.cache_manager.StoreInvalidateRows(cnxn, self.kind, values) |
| 168 | else: |
| 169 | # If a value is not found in the cache then invalidate the whole cache. |
| 170 | # This is done to ensure that we are not in an inconsistent state or in a |
| 171 | # race condition. |
| 172 | self.InvalidateAll(cnxn) |
| 173 | |
| 174 | |
| 175 | class AbstractTwoLevelCache(object): |
| 176 | """A class to manage both RAM and secondary-caching layer to retrieve objects. |
| 177 | |
| 178 | Subclasses must implement the FetchItems() method to get objects from |
| 179 | the database when both caches miss. |
| 180 | """ |
| 181 | |
| 182 | # When loading a huge number of issues from the database, do it in chunks |
| 183 | # so as to avoid timeouts. |
| 184 | _FETCH_BATCH_SIZE = 10000 |
| 185 | |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 186 | def __init__(self, cache_manager, kind, prefix, pb_class, max_size=None): |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 187 | |
| 188 | self.cache = self._MakeCache(cache_manager, kind, max_size=max_size) |
| 189 | self.prefix = prefix |
| 190 | self.pb_class = pb_class |
| 191 | |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 192 | def _MakeCache(self, cache_manager, kind, max_size=None): |
| 193 | """Make the RAM cache and register it with the cache_manager.""" |
| 194 | return RamCache(cache_manager, kind, max_size=max_size) |
| 195 | |
| 196 | def CacheItem(self, key, value): |
| 197 | """Add the given key-value pair to RAM and L2 cache.""" |
| 198 | self.cache.CacheItem(key, value) |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 199 | self._WriteToMemcache({key: value}) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 200 | |
| 201 | def HasItem(self, key): |
| 202 | """Return True if the given key is in the RAM cache.""" |
| 203 | return self.cache.HasItem(key) |
| 204 | |
| 205 | def GetAnyOnHandItem(self, keys, start=None, end=None): |
| 206 | """Try to find one of the specified items in RAM.""" |
| 207 | if start is None: |
| 208 | start = 0 |
| 209 | if end is None: |
| 210 | end = len(keys) |
| 211 | for i in range(start, end): |
| 212 | key = keys[i] |
| 213 | if self.cache.HasItem(key): |
| 214 | return self.cache.GetItem(key) |
| 215 | |
| 216 | # Note: We could check L2 here too, but the round-trips to L2 |
| 217 | # are kind of slow. And, getting too many hits from L2 actually |
| 218 | # fills our RAM cache too quickly and could lead to thrashing. |
| 219 | |
| 220 | return None |
| 221 | |
| 222 | def GetAll(self, cnxn, keys, use_cache=True, **kwargs): |
| 223 | """Get values for the given keys from RAM, the L2 cache, or the DB. |
| 224 | |
| 225 | Args: |
| 226 | cnxn: connection to the database. |
| 227 | keys: list of integer keys to look up. |
| 228 | use_cache: set to False to always hit the database. |
| 229 | **kwargs: any additional keywords are passed to FetchItems(). |
| 230 | |
| 231 | Returns: |
| 232 | A pair: hits, misses. Where hits is {key: value} and misses is |
| 233 | a list of any keys that were not found anywhere. |
| 234 | """ |
| 235 | if use_cache: |
| 236 | result_dict, missed_keys = self.cache.GetAll(keys) |
| 237 | else: |
| 238 | result_dict, missed_keys = {}, list(keys) |
| 239 | |
| 240 | if missed_keys: |
| 241 | if use_cache: |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 242 | cache_hits, missed_keys = self._ReadFromMemcache(missed_keys) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 243 | result_dict.update(cache_hits) |
| 244 | self.cache.CacheAll(cache_hits) |
| 245 | |
| 246 | while missed_keys: |
| 247 | missed_batch = missed_keys[:self._FETCH_BATCH_SIZE] |
| 248 | missed_keys = missed_keys[self._FETCH_BATCH_SIZE:] |
| 249 | retrieved_dict = self.FetchItems(cnxn, missed_batch, **kwargs) |
| 250 | result_dict.update(retrieved_dict) |
| 251 | if use_cache: |
| 252 | self.cache.CacheAll(retrieved_dict) |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 253 | self._WriteToMemcache(retrieved_dict) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 254 | |
| 255 | still_missing_keys = [key for key in keys if key not in result_dict] |
| 256 | return result_dict, still_missing_keys |
| 257 | |
| 258 | def LocalInvalidateAll(self): |
| 259 | self.cache.LocalInvalidateAll() |
| 260 | |
| 261 | def LocalInvalidate(self, key): |
| 262 | self.cache.LocalInvalidate(key) |
| 263 | |
| 264 | def InvalidateKeys(self, cnxn, keys): |
| 265 | """Drop the given keys from both RAM and L2 cache.""" |
| 266 | self.cache.InvalidateKeys(cnxn, keys) |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 267 | self._DeleteFromMemcache(keys) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 268 | |
| 269 | def InvalidateAllKeys(self, cnxn, keys): |
| 270 | """Drop the given keys from L2 cache and invalidate all keys in RAM. |
| 271 | |
| 272 | Useful for avoiding inserting many rows into the Invalidate table when |
| 273 | invalidating a large group of keys all at once. Only use when necessary. |
| 274 | """ |
| 275 | self.cache.InvalidateAll(cnxn) |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 276 | self._DeleteFromMemcache(keys) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 277 | |
| 278 | def GetAllAlreadyInRam(self, keys): |
| 279 | """Look only in RAM to return {key: values}, missed_keys.""" |
| 280 | result_dict, missed_keys = self.cache.GetAll(keys) |
| 281 | return result_dict, missed_keys |
| 282 | |
| 283 | def InvalidateAllRamEntries(self, cnxn): |
| 284 | """Drop all RAM cache entries. It will refill as needed from L2 cache.""" |
| 285 | self.cache.InvalidateAll(cnxn) |
| 286 | |
| 287 | def FetchItems(self, cnxn, keys, **kwargs): |
| 288 | """On RAM and L2 cache miss, hit the database.""" |
| 289 | raise NotImplementedError() |
| 290 | |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 291 | def _ReadFromMemcache(self, keys): |
| 292 | # type: (Sequence[int]) -> Mapping[str, Any], Sequence[int] |
| 293 | """Read the given keys from memcache, return {key: value}, missing_keys.""" |
| 294 | cache_hits = {} |
| 295 | cached_dict = memcache.get_multi( |
| 296 | [self._KeyToStr(key) for key in keys], |
| 297 | key_prefix=self.prefix, |
| 298 | namespace=settings.memcache_namespace) |
| 299 | |
| 300 | for key_str, serialized_value in cached_dict.items(): |
| 301 | value = self._StrToValue(serialized_value) |
| 302 | key = self._StrToKey(key_str) |
| 303 | cache_hits[key] = value |
| 304 | self.cache.CacheItem(key, value) |
| 305 | |
| 306 | still_missing_keys = [key for key in keys if key not in cache_hits] |
| 307 | return cache_hits, still_missing_keys |
| 308 | |
| 309 | def _WriteToMemcache(self, retrieved_dict): |
| 310 | # type: (Mapping[int, int]) -> None |
| 311 | """Write entries for each key-value pair to memcache. Encode PBs.""" |
| 312 | strs_to_cache = { |
| 313 | self._KeyToStr(key): self._ValueToStr(value) |
| 314 | for key, value in retrieved_dict.items()} |
| 315 | |
| 316 | try: |
| 317 | memcache.add_multi( |
| 318 | strs_to_cache, |
| 319 | key_prefix=self.prefix, |
| 320 | time=framework_constants.CACHE_EXPIRATION, |
| 321 | namespace=settings.memcache_namespace) |
| 322 | except ValueError as identifier: |
| 323 | # If memcache does not accept the values, ensure that no stale |
| 324 | # values are left, then bail out. |
| 325 | logging.error('Got memcache error: %r', identifier) |
| 326 | self._DeleteFromMemcache(list(strs_to_cache.keys())) |
| 327 | return |
| 328 | |
| 329 | def _DeleteFromMemcache(self, keys): |
| 330 | # type: (Sequence[str]) -> None |
| 331 | """Delete key-values from memcache. """ |
| 332 | memcache.delete_multi( |
| 333 | [self._KeyToStr(key) for key in keys], |
| 334 | seconds=5, |
| 335 | key_prefix=self.prefix, |
| 336 | namespace=settings.memcache_namespace) |
| 337 | |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 338 | def _KeyToStr(self, key): |
| 339 | # type: (int) -> str |
| 340 | """Convert our int IDs to strings for use as memcache keys.""" |
| 341 | return str(key) |
| 342 | |
| 343 | def _StrToKey(self, key_str): |
| 344 | # type: (str) -> int |
| 345 | """Convert memcache keys back to the ints that we use as IDs.""" |
| 346 | return int(key_str) |
| 347 | |
| 348 | def _ValueToStr(self, value): |
| 349 | # type: (Any) -> str |
| 350 | """Serialize an application object so that it can be stored in L2 cache.""" |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 351 | if not self.pb_class: |
| 352 | return value |
| 353 | elif self.pb_class == int: |
| 354 | return str(value) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 355 | else: |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 356 | return protobuf.encode_message(value) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 357 | |
| 358 | def _StrToValue(self, serialized_value): |
| 359 | # type: (str) -> Any |
| 360 | """Deserialize L2 cache string into an application object.""" |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 361 | if not self.pb_class: |
| 362 | return serialized_value |
| 363 | elif self.pb_class == int: |
| 364 | return int(serialized_value) |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 365 | else: |
Adrià Vilanova Martínez | de94280 | 2022-07-15 14:06:55 +0200 | [diff] [blame^] | 366 | return protobuf.decode_message(self.pb_class, serialized_value) |