# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

5"""A simple in-RAM cache with distributed invalidation.
6
7Here's how it works:
8 + Each frontend or backend job has one CacheManager which
9 owns a set of RamCache objects, which are basically dictionaries.
10 + Each job can put objects in its own local cache, and retrieve them.
11 + When an item is modified, the item at the corresponding cache key
12 is invalidated, which means two things: (a) it is dropped from the
13 local RAM cache, and (b) the key is written to the Invalidate table.
14 + On each incoming request, the job checks the Invalidate table for
15 any entries added since the last time that it checked. If it finds
16 any, it drops all RamCache entries for the corresponding key.
17 + There is also a cron task that truncates old Invalidate entries
18 when the table is too large. If a frontend job sees more than the
19 max Invalidate rows, it will drop everything from all caches,
20 because it does not know what it missed due to truncation.
21 + The special key 0 means to drop all cache entries.
22
This approach ensures that jobs use cached values that are not stale at
the time that processing of each request begins.  There is no guarantee,
however, that an item will not be modified by some other job, so a cached
entry could become stale during the lifetime of that same request.

TODO(jrobbins): Listener hook so that client code can register its own
handler for invalidation events.  E.g., the sorting code has a cache that
is correctly invalidated on each issue change, but needs to be completely
dropped when a config is modified.

TODO(jrobbins): If this part of the system becomes a bottleneck, consider
some optimizations: (a) splitting the table into multiple tables by
kind, or (b) sharding the table by cache_key.  Or, maybe leverage memcache
to avoid even hitting the DB in the frequent case where nothing has changed.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import collections
import logging

from framework import jsonfeed
from framework import logger
from framework import sql


INVALIDATE_TABLE_NAME = 'Invalidate'
INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
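# Each Invalidate row is a (timestep, kind, cache_key) tuple, e.g. a
# hypothetical row (42, 'user', 111) tells every job to drop user 111 from
# its registered 'user' caches.  timestep is a monotonically increasing
# value that each job uses as a high-water mark of processed invalidations.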
# Note: *_id invalidations should happen only when there's a change
# in one of the values used to look up the internal ID number.
# E.g. hotlist_id_2lc should only be invalidated when the hotlist
# name or owner changes.
INVALIDATE_KIND_VALUES = [
    'user', 'usergroup', 'project', 'project_id', 'issue', 'issue_id',
    'hotlist', 'hotlist_id', 'comment', 'template'
]
INVALIDATE_ALL_KEYS = 0
MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000


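# A registered cache is duck-typed: CacheManager only ever calls
# LocalInvalidate(key) and LocalInvalidateAll() on it.  The real RamCache
# class lives elsewhere in the framework package; a minimal sketch of the
# expected interface, for illustration only:
#
#   class MinimalCache(object):
#     def __init__(self):
#       self.cache = {}
#
#     def LocalInvalidate(self, key):
#       self.cache.pop(key, None)
#
#     def LocalInvalidateAll(self):
#       self.cache.clear()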
class CacheManager(object):
  """Service class to manage RAM caches and shared Invalidate table."""

  def __init__(self):
    self.cache_registry = collections.defaultdict(list)
    self.processed_invalidations_up_to = 0
    self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)

  def RegisterCache(self, cache, kind):
    """Register a cache to be notified of future invalidations."""
    assert kind in INVALIDATE_KIND_VALUES
    self.cache_registry[kind].append(cache)

  def _InvalidateAllCaches(self):
    """Invalidate all cache entries."""
    for cache_list in self.cache_registry.values():
      for cache in cache_list:
        cache.LocalInvalidateAll()

  def _ProcessInvalidationRows(self, rows):
    """Invalidate cache entries indicated by database rows."""
    already_done = set()
    for timestep, kind, key in rows:
      self.processed_invalidations_up_to = max(
          self.processed_invalidations_up_to, timestep)
      if (kind, key) in already_done:
        continue
      already_done.add((kind, key))
      for cache in self.cache_registry[kind]:
        if key == INVALIDATE_ALL_KEYS:
          cache.LocalInvalidateAll()
        else:
          cache.LocalInvalidate(key)

  def DoDistributedInvalidation(self, cnxn):
    """Drop any cache entries that were invalidated by other jobs."""
    # Only consider a reasonable number of rows so that we can never
    # get bogged down on this step.  If there are too many rows to
    # process, just invalidate all caches, and process the last group
    # of rows to update processed_invalidations_up_to.
    rows = self.invalidate_tbl.Select(
        cnxn, cols=INVALIDATE_COLS,
        where=[('timestep > %s', [self.processed_invalidations_up_to])],
        order_by=[('timestep DESC', [])],
        limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)

    cnxn.Commit()

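    # A full batch means our LIMIT may have cut off some rows, so we cannot
    # know every key that changed; dropping everything is the only safe
    # option.  The fetched rows (newest first) still advance the
    # high-water mark.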
    if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
      logging.info('Invalidating all caches: there are too many invalidations')
      self._InvalidateAllCaches()

    logging.info('Saw %d invalidation rows', len(rows))
    self._ProcessInvalidationRows(rows)

  def StoreInvalidateRows(self, cnxn, kind, keys):
    """Store rows to let all jobs know to invalidate the given keys."""
    assert kind in INVALIDATE_KIND_VALUES
    logger.log(
        {
            'log_type': 'cache/invalidate/rows',
            'kind': kind,
            'count': len(keys),
            'keys': str(keys),
        })
    self.invalidate_tbl.InsertRows(
        cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])

  def StoreInvalidateAll(self, cnxn, kind):
    """Store a value to tell all jobs to invalidate all items of this kind."""
    logger.log({'log_type': 'cache/invalidate/all', 'kind': kind})
    last_timestep = self.invalidate_tbl.InsertRow(
        cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
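    # The new INVALIDATE_ALL_KEYS row supersedes every earlier invalidation
    # of this kind, so older rows are redundant and can be pruned right away.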
    self.invalidate_tbl.Delete(
        cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])


class RamCacheConsolidate(jsonfeed.InternalTask):
  """Drop old Invalidate rows when there are too many of them."""

  def HandleRequest(self, mr):
    """Drop excessive rows in the Invalidate table and return some stats.

    Args:
      mr: common information parsed from the HTTP request.

    Returns:
      Results dictionary in JSON format.  The stats are just for debugging;
      they are not used by any other part of the system.
    """
    tbl = self.services.cache_manager.invalidate_tbl
    old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')

    # Delete anything other than the last MAX_INVALIDATE_ROWS_TO_CONSIDER
    # rows because we won't look at them anyway.  If a job gets a request
    # and sees that many new rows, it will drop all caches of all types,
    # so it is as if there were INVALIDATE_ALL_KEYS entries.
    if old_count > MAX_INVALIDATE_ROWS_TO_CONSIDER:
      kept_timesteps = tbl.Select(
          mr.cnxn, ['timestep'],
          order_by=[('timestep DESC', [])],
          limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
      earliest_kept = kept_timesteps[-1][0]
      tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])

    new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')

    return {
        'old_count': old_count,
        'new_count': new_count,
    }

  def GetRamCacheConsolidate(self, **kwargs):
    return self.handler(**kwargs)