blob: 753bffa949b00bc834628d3a9b0b64ae70f8ea28 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""A simple in-RAM cache with distributed invalidation.
7
8Here's how it works:
9 + Each frontend or backend job has one CacheManager which
10 owns a set of RamCache objects, which are basically dictionaries.
11 + Each job can put objects in its own local cache, and retrieve them.
12 + When an item is modified, the item at the corresponding cache key
13 is invalidated, which means two things: (a) it is dropped from the
14 local RAM cache, and (b) the key is written to the Invalidate table.
15 + On each incoming request, the job checks the Invalidate table for
16 any entries added since the last time that it checked. If it finds
17 any, it drops all RamCache entries for the corresponding key.
18 + There is also a cron task that truncates old Invalidate entries
19 when the table is too large. If a frontend job sees more than the
20 max Invalidate rows, it will drop everything from all caches,
21 because it does not know what it missed due to truncation.
22 + The special key 0 means to drop all cache entries.
23
24This approach makes jobs use cached values that are not stale at the
time that processing of each request begins. However, an item could still
be modified by some other job while the request is being processed, so a
cached entry can become stale during the lifetime of that same request.
28
29TODO(jrobbins): Listener hook so that client code can register its own
30handler for invalidation events. E.g., the sorting code has a cache that
31is correctly invalidated on each issue change, but needs to be completely
32dropped when a config is modified.
33
34TODO(jrobbins): If this part of the system becomes a bottleneck, consider
35some optimizations: (a) splitting the table into multiple tables by
36kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
37to avoid even hitting the DB in the frequent case where nothing has changed.
38"""
39from __future__ import print_function
40from __future__ import division
41from __future__ import absolute_import
42
43import collections
44import logging
45
46from framework import jsonfeed
Adrià Vilanova Martínez9f9ade52022-10-10 23:20:11 +020047from framework import logger
Copybara854996b2021-09-07 19:36:02 +000048from framework import sql
49
50
INVALIDATE_TABLE_NAME = 'Invalidate'
INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
# Note: *_id invalidations should happen only when there's a change
# in one of the values used to look up the internal ID number.
# E.g. hotlist_id_2lc should only be invalidated when the hotlist
# name or owner changes.
INVALIDATE_KIND_VALUES = [
    'user', 'usergroup', 'project', 'project_id', 'issue', 'issue_id',
    'hotlist', 'hotlist_id', 'comment', 'template'
]
# Sentinel cache_key meaning "drop every cached entry of this kind".
INVALIDATE_ALL_KEYS = 0
# Upper bound on Invalidate rows examined per request.  Seeing exactly this
# many rows means some invalidations may have been missed, so every cache
# must be dropped to stay safe.
MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000


class CacheManager(object):
  """Service class to manage RAM caches and shared Invalidate table."""

  def __init__(self):
    # Maps kind string -> list of caches registered for that kind.
    self.cache_registry = collections.defaultdict(list)
    # Highest Invalidate.timestep already applied by this job.
    self.processed_invalidations_up_to = 0
    self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)

  def RegisterCache(self, cache, kind):
    """Register a cache to be notified of future invalidations.

    Args:
      cache: a RamCache-like object that implements LocalInvalidate(key)
        and LocalInvalidateAll().
      kind: string cache kind; must be one of INVALIDATE_KIND_VALUES.
    """
    assert kind in INVALIDATE_KIND_VALUES
    self.cache_registry[kind].append(cache)

  def _InvalidateAllCaches(self):
    """Invalidate all cache entries in every registered cache."""
    for cache_list in self.cache_registry.values():
      for cache in cache_list:
        cache.LocalInvalidateAll()

  def _ProcessInvalidationRows(self, rows):
    """Invalidate cache entries indicated by database rows.

    Args:
      rows: list of (timestep, kind, cache_key) tuples read from the
        Invalidate table.
    """
    already_done = set()
    for timestep, kind, key in rows:
      # Track the newest timestep seen so the next poll starts after it.
      self.processed_invalidations_up_to = max(
          self.processed_invalidations_up_to, timestep)
      if (kind, key) in already_done:
        continue  # Duplicate invalidation of the same item; skip it.
      already_done.add((kind, key))
      for cache in self.cache_registry[kind]:
        if key == INVALIDATE_ALL_KEYS:
          cache.LocalInvalidateAll()
        else:
          cache.LocalInvalidate(key)

  def DoDistributedInvalidation(self, cnxn):
    """Drop any cache entries that were invalidated by other jobs.

    Args:
      cnxn: connection to the SQL database.
    """
    # Only consider a reasonable number of rows so that we can never
    # get bogged down on this step.  If there are too many rows to
    # process, just invalidate all caches, and process the last group
    # of rows to update processed_invalidations_up_to.
    rows = self.invalidate_tbl.Select(
        cnxn, cols=INVALIDATE_COLS,
        where=[('timestep > %s', [self.processed_invalidations_up_to])],
        order_by=[('timestep DESC', [])],
        limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)

    cnxn.Commit()

    if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
      # Hitting the limit means older rows may have gone unseen, so we do
      # not know exactly what to drop; drop everything to be safe.
      logging.info('Invalidating all caches: there are too many invalidations')
      self._InvalidateAllCaches()

    logging.info('Saw %d invalidation rows', len(rows))
    self._ProcessInvalidationRows(rows)

  def StoreInvalidateRows(self, cnxn, kind, keys):
    """Store rows to let all jobs know to invalidate the given keys.

    Args:
      cnxn: connection to the SQL database.
      kind: string cache kind; must be one of INVALIDATE_KIND_VALUES.
      keys: iterable of cache keys to invalidate.
    """
    assert kind in INVALIDATE_KIND_VALUES
    logger.log(
        {
            'log_type': 'cache/invalidate/rows',
            'kind': kind,
            'count': len(keys),
            'keys': str(keys),
        })
    self.invalidate_tbl.InsertRows(
        cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])

  def StoreInvalidateAll(self, cnxn, kind):
    """Store a value to tell all jobs to invalidate all items of this kind.

    Args:
      cnxn: connection to the SQL database.
      kind: string cache kind.
    """
    logger.log({'log_type': 'cache/invalidate/all', 'kind': kind})
    last_timestep = self.invalidate_tbl.InsertRow(
        cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
    # Older rows of this kind are made redundant by the new row; trim them.
    self.invalidate_tbl.Delete(
        cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
140
141
class RamCacheConsolidate(jsonfeed.FlaskInternalTask):
  """Cron task handler that trims the Invalidate table when it grows large."""

  def HandleRequest(self, mr):
    """Drop excessive rows in the Invalidate table and return some stats.

    Args:
      mr: common information parsed from the HTTP request.

    Returns:
      Results dictionary in JSON format with row counts before and after
      trimming.  The stats are only for debugging; nothing else consumes
      them.
    """
    invalidate_tbl = self.services.cache_manager.invalidate_tbl
    count_before = invalidate_tbl.SelectValue(mr.cnxn, 'COUNT(*)')

    # Rows older than the newest MAX_INVALIDATE_ROWS_TO_CONSIDER are never
    # examined: any job that sees that many new rows drops every cache of
    # every type anyway, exactly as if it had seen INVALIDATE_ALL_KEYS.
    if count_before > MAX_INVALIDATE_ROWS_TO_CONSIDER:
      newest_rows = invalidate_tbl.Select(
          mr.cnxn, ['timestep'],
          order_by=[('timestep DESC', [])],
          limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
      oldest_kept_timestep = newest_rows[-1][0]
      invalidate_tbl.Delete(
          mr.cnxn, where=[('timestep < %s', [oldest_kept_timestep])])

    count_after = invalidate_tbl.SelectValue(mr.cnxn, 'COUNT(*)')

    return {
        'old_count': count_before,
        'new_count': count_after,
    }

  def GetRamCacheConsolidate(self, **kwargs):
    """Flask GET entry point; delegates to the shared request handler."""
    return self.handler(**kwargs)