blob: 02ad6dd54c46f69e205ae5e185730e90cd921003 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""A simple in-RAM cache with distributed invalidation.
Here's how it works:
+ Each frontend or backend job has one CacheManager which
owns a set of RamCache objects, which are basically dictionaries.
+ Each job can put objects in its own local cache, and retrieve them.
+ When an item is modified, the item at the corresponding cache key
is invalidated, which means two things: (a) it is dropped from the
local RAM cache, and (b) the key is written to the Invalidate table.
+ On each incoming request, the job checks the Invalidate table for
any entries added since the last time that it checked. If it finds
any, it drops all RamCache entries for the corresponding key.
+ There is also a cron task that truncates old Invalidate entries
when the table is too large. If a frontend job sees more than the
max Invalidate rows, it will drop everything from all caches,
because it does not know what it missed due to truncation.
+ The special key 0 means to drop all cache entries.
This approach makes jobs use cached values that are not stale at the
time that processing of each request begins. There is no guarantee that
an item will not be modified by some other job and that the cached entry
could become stale during the lifetime of that same request.
TODO(jrobbins): Listener hook so that client code can register its own
handler for invalidation events. E.g., the sorting code has a cache that
is correctly invalidated on each issue change, but needs to be completely
dropped when a config is modified.
TODO(jrobbins): If this part of the system becomes a bottleneck, consider
some optimizations: (a) splitting the table into multiple tables by
kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
to avoid even hitting the DB in the frequent case where nothing has changed.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import collections
import logging
from framework import jsonfeed
from framework import sql
INVALIDATE_TABLE_NAME = 'Invalidate'
INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
# Note: *_id invalidations should happen only when there's a change
# in one of the values used to look up the internal ID number.
# E.g. hotlist_id_2lc should only be invalidated when the hotlist
# name or owner changes.
INVALIDATE_KIND_VALUES = [
'user', 'usergroup', 'project', 'project_id', 'issue', 'issue_id',
'hotlist', 'hotlist_id', 'comment', 'template'
]
INVALIDATE_ALL_KEYS = 0
MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000
class CacheManager(object):
"""Service class to manage RAM caches and shared Invalidate table."""
def __init__(self):
self.cache_registry = collections.defaultdict(list)
self.processed_invalidations_up_to = 0
self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)
def RegisterCache(self, cache, kind):
"""Register a cache to be notified of future invalidations."""
assert kind in INVALIDATE_KIND_VALUES
self.cache_registry[kind].append(cache)
def _InvalidateAllCaches(self):
"""Invalidate all cache entries."""
for cache_list in self.cache_registry.values():
for cache in cache_list:
cache.LocalInvalidateAll()
def _ProcessInvalidationRows(self, rows):
"""Invalidate cache entries indicated by database rows."""
already_done = set()
for timestep, kind, key in rows:
self.processed_invalidations_up_to = max(
self.processed_invalidations_up_to, timestep)
if (kind, key) in already_done:
continue
already_done.add((kind, key))
for cache in self.cache_registry[kind]:
if key == INVALIDATE_ALL_KEYS:
cache.LocalInvalidateAll()
else:
cache.LocalInvalidate(key)
def DoDistributedInvalidation(self, cnxn):
"""Drop any cache entries that were invalidated by other jobs."""
# Only consider a reasonable number of rows so that we can never
# get bogged down on this step. If there are too many rows to
# process, just invalidate all caches, and process the last group
# of rows to update processed_invalidations_up_to.
rows = self.invalidate_tbl.Select(
cnxn, cols=INVALIDATE_COLS,
where=[('timestep > %s', [self.processed_invalidations_up_to])],
order_by=[('timestep DESC', [])],
limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
cnxn.Commit()
if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
logging.info('Invaliditing all caches: there are too many invalidations')
self._InvalidateAllCaches()
logging.info('Saw %d invalidation rows', len(rows))
self._ProcessInvalidationRows(rows)
def StoreInvalidateRows(self, cnxn, kind, keys):
"""Store rows to let all jobs know to invalidate the given keys."""
assert kind in INVALIDATE_KIND_VALUES
self.invalidate_tbl.InsertRows(
cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])
def StoreInvalidateAll(self, cnxn, kind):
"""Store a value to tell all jobs to invalidate all items of this kind."""
last_timestep = self.invalidate_tbl.InsertRow(
cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
self.invalidate_tbl.Delete(
cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
# TODO: change to FlaskInternalTask when convert to Flask
class RamCacheConsolidate(jsonfeed.InternalTask):
"""Drop old Invalidate rows when there are too many of them."""
def HandleRequest(self, mr):
"""Drop excessive rows in the Invalidate table and return some stats.
Args:
mr: common information parsed from the HTTP request.
Returns:
Results dictionary in JSON format. The stats are just for debugging,
they are not used by any other part of the system.
"""
tbl = self.services.cache_manager.invalidate_tbl
old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
# Delete anything other than the last 1000 rows because we won't
# look at them anyway. If a job gets a request and sees 1000 new
# rows, it will drop all caches of all types, so it is as if there
# were INVALIDATE_ALL_KEYS entries.
if old_count > MAX_INVALIDATE_ROWS_TO_CONSIDER:
kept_timesteps = tbl.Select(
mr.cnxn, ['timestep'],
order_by=[('timestep DESC', [])],
limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
earliest_kept = kept_timesteps[-1][0]
tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])
new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
return {
'old_count': old_count,
'new_count': new_count,
}
# def GetRamCacheConsolidate(self, **kwargs):
# return self.handler(**kwargs)
# def PostRamCacheConsolidate(self, **kwargs):
# return self.handler(**kwargs)