blob: 02ad6dd54c46f69e205ae5e185730e90cd921003 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""A simple in-RAM cache with distributed invalidation.
7
8Here's how it works:
9 + Each frontend or backend job has one CacheManager which
10 owns a set of RamCache objects, which are basically dictionaries.
11 + Each job can put objects in its own local cache, and retrieve them.
12 + When an item is modified, the item at the corresponding cache key
13 is invalidated, which means two things: (a) it is dropped from the
14 local RAM cache, and (b) the key is written to the Invalidate table.
15 + On each incoming request, the job checks the Invalidate table for
16 any entries added since the last time that it checked. If it finds
17 any, it drops all RamCache entries for the corresponding key.
18 + There is also a cron task that truncates old Invalidate entries
19 when the table is too large. If a frontend job sees more than the
20 max Invalidate rows, it will drop everything from all caches,
21 because it does not know what it missed due to truncation.
22 + The special key 0 means to drop all cache entries.
23
24This approach makes jobs use cached values that are not stale at the
25time that processing of each request begins. There is no guarantee that
26an item will not be modified by some other job and that the cached entry
27could become stale during the lifetime of that same request.
28
29TODO(jrobbins): Listener hook so that client code can register its own
30handler for invalidation events. E.g., the sorting code has a cache that
31is correctly invalidated on each issue change, but needs to be completely
32dropped when a config is modified.
33
34TODO(jrobbins): If this part of the system becomes a bottleneck, consider
35some optimizations: (a) splitting the table into multiple tables by
36kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
37to avoid even hitting the DB in the frequent case where nothing has changed.
38"""
39from __future__ import print_function
40from __future__ import division
41from __future__ import absolute_import
42
43import collections
44import logging
45
46from framework import jsonfeed
47from framework import sql
48
49
50INVALIDATE_TABLE_NAME = 'Invalidate'
51INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
52# Note: *_id invalidations should happen only when there's a change
53# in one of the values used to look up the internal ID number.
54# E.g. hotlist_id_2lc should only be invalidated when the hotlist
55# name or owner changes.
56INVALIDATE_KIND_VALUES = [
57 'user', 'usergroup', 'project', 'project_id', 'issue', 'issue_id',
58 'hotlist', 'hotlist_id', 'comment', 'template'
59]
60INVALIDATE_ALL_KEYS = 0
61MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000
62
63
64class CacheManager(object):
65 """Service class to manage RAM caches and shared Invalidate table."""
66
67 def __init__(self):
68 self.cache_registry = collections.defaultdict(list)
69 self.processed_invalidations_up_to = 0
70 self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)
71
72 def RegisterCache(self, cache, kind):
73 """Register a cache to be notified of future invalidations."""
74 assert kind in INVALIDATE_KIND_VALUES
75 self.cache_registry[kind].append(cache)
76
77 def _InvalidateAllCaches(self):
78 """Invalidate all cache entries."""
79 for cache_list in self.cache_registry.values():
80 for cache in cache_list:
81 cache.LocalInvalidateAll()
82
83 def _ProcessInvalidationRows(self, rows):
84 """Invalidate cache entries indicated by database rows."""
85 already_done = set()
86 for timestep, kind, key in rows:
87 self.processed_invalidations_up_to = max(
88 self.processed_invalidations_up_to, timestep)
89 if (kind, key) in already_done:
90 continue
91 already_done.add((kind, key))
92 for cache in self.cache_registry[kind]:
93 if key == INVALIDATE_ALL_KEYS:
94 cache.LocalInvalidateAll()
95 else:
96 cache.LocalInvalidate(key)
97
98 def DoDistributedInvalidation(self, cnxn):
99 """Drop any cache entries that were invalidated by other jobs."""
100 # Only consider a reasonable number of rows so that we can never
101 # get bogged down on this step. If there are too many rows to
102 # process, just invalidate all caches, and process the last group
103 # of rows to update processed_invalidations_up_to.
104 rows = self.invalidate_tbl.Select(
105 cnxn, cols=INVALIDATE_COLS,
106 where=[('timestep > %s', [self.processed_invalidations_up_to])],
107 order_by=[('timestep DESC', [])],
108 limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
109
110 cnxn.Commit()
111
112 if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
113 logging.info('Invaliditing all caches: there are too many invalidations')
114 self._InvalidateAllCaches()
115
116 logging.info('Saw %d invalidation rows', len(rows))
117 self._ProcessInvalidationRows(rows)
118
119 def StoreInvalidateRows(self, cnxn, kind, keys):
120 """Store rows to let all jobs know to invalidate the given keys."""
121 assert kind in INVALIDATE_KIND_VALUES
122 self.invalidate_tbl.InsertRows(
123 cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])
124
125 def StoreInvalidateAll(self, cnxn, kind):
126 """Store a value to tell all jobs to invalidate all items of this kind."""
127 last_timestep = self.invalidate_tbl.InsertRow(
128 cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
129 self.invalidate_tbl.Delete(
130 cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
131
132
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +0200133# TODO: change to FlaskInternalTask when convert to Flask
Copybara854996b2021-09-07 19:36:02 +0000134class RamCacheConsolidate(jsonfeed.InternalTask):
135 """Drop old Invalidate rows when there are too many of them."""
136
137 def HandleRequest(self, mr):
138 """Drop excessive rows in the Invalidate table and return some stats.
139
140 Args:
141 mr: common information parsed from the HTTP request.
142
143 Returns:
144 Results dictionary in JSON format. The stats are just for debugging,
145 they are not used by any other part of the system.
146 """
147 tbl = self.services.cache_manager.invalidate_tbl
148 old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
149
150 # Delete anything other than the last 1000 rows because we won't
151 # look at them anyway. If a job gets a request and sees 1000 new
152 # rows, it will drop all caches of all types, so it is as if there
153 # were INVALIDATE_ALL_KEYS entries.
154 if old_count > MAX_INVALIDATE_ROWS_TO_CONSIDER:
155 kept_timesteps = tbl.Select(
156 mr.cnxn, ['timestep'],
157 order_by=[('timestep DESC', [])],
158 limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
159 earliest_kept = kept_timesteps[-1][0]
160 tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])
161
162 new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
163
164 return {
165 'old_count': old_count,
166 'new_count': new_count,
167 }
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +0200168
169 # def GetRamCacheConsolidate(self, **kwargs):
170 # return self.handler(**kwargs)
171
172 # def PostRamCacheConsolidate(self, **kwargs):
173 # return self.handler(**kwargs)