Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/services/cachemanager_svc.py b/services/cachemanager_svc.py
new file mode 100644
index 0000000..8dc5753
--- /dev/null
+++ b/services/cachemanager_svc.py
@@ -0,0 +1,166 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""A simple in-RAM cache with distributed invalidation.
+
+Here's how it works:
+ + Each frontend or backend job has one CacheManager which
+   owns a set of RamCache objects, which are basically dictionaries.
+ + Each job can put objects in its own local cache, and retrieve them.
+ + When an item is modified, the item at the corresponding cache key
+   is invalidated, which means two things: (a) it is dropped from the
+   local RAM cache, and (b) the key is written to the Invalidate table.
+ + On each incoming request, the job checks the Invalidate table for
+   any entries added since the last time that it checked.  If it finds
+   any, it drops all RamCache entries for the corresponding key.
+ + There is also a cron task that truncates old Invalidate entries
+   when the table is too large.  If a job sees the maximum number of
+   new Invalidate rows, it will drop everything from all caches,
+   because it does not know what it missed due to truncation.
+ + The special key 0 means to drop all cache entries of that kind.
+
+This approach makes jobs use cached values that are not stale at the
+time that processing of each request begins.  However, there is no
+guarantee that an item will not be modified by some other job, making
+the cached entry stale, during the lifetime of that same request.
+
+TODO(jrobbins): Listener hook so that client code can register its own
+handler for invalidation events.  E.g., the sorting code has a cache that
+is correctly invalidated on each issue change, but needs to be completely
+dropped when a config is modified.
+
+TODO(jrobbins): If this part of the system becomes a bottleneck, consider
+some optimizations: (a) splitting the table into multiple tables by
+kind, or (b) sharding the table by cache_key.  Or, maybe leverage memcache
+to avoid even hitting the DB in the frequent case where nothing has changed.
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import collections
+import logging
+
+from framework import jsonfeed
+from framework import sql
+
+
+INVALIDATE_TABLE_NAME = 'Invalidate'
+INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
+# Note: *_id invalidations should happen only when there's a change
+# in one of the values used to look up the internal ID number.
+# E.g. hotlist_id_2lc should only be invalidated when the hotlist
+# name or owner changes.
+INVALIDATE_KIND_VALUES = [
+    'user', 'usergroup', 'project', 'project_id', 'issue', 'issue_id',
+    'hotlist', 'hotlist_id', 'comment', 'template'
+]
+INVALIDATE_ALL_KEYS = 0  # Special cache_key: drop all entries of that kind.
+MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000  # Most rows read in one table check.
+
+
+class CacheManager(object):
+  """Service class to manage RAM caches and shared Invalidate table."""
+
+  def __init__(self):
+    self.cache_registry = collections.defaultdict(list)
+    # Largest Invalidate timestep that this job has processed so far.
+    self.processed_invalidations_up_to = 0
+    self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)
+
+  def RegisterCache(self, cache, kind):
+    """Register a cache to be notified of future invalidations."""
+    assert kind in INVALIDATE_KIND_VALUES
+    self.cache_registry[kind].append(cache)
+
+  def _InvalidateAllCaches(self):
+    """Invalidate all cache entries."""
+    for cache_list in self.cache_registry.values():
+      for cache in cache_list:
+        cache.LocalInvalidateAll()
+
+  def _ProcessInvalidationRows(self, rows):
+    """Invalidate cache entries indicated by database rows."""
+    already_done = set()
+    for timestep, kind, key in rows:
+      self.processed_invalidations_up_to = max(
+          self.processed_invalidations_up_to, timestep)
+      if (kind, key) in already_done:
+        continue
+      already_done.add((kind, key))
+      # Use get(): indexing the defaultdict would add an entry for kind.
+      for cache in self.cache_registry.get(kind, ()):
+        if key == INVALIDATE_ALL_KEYS:
+          cache.LocalInvalidateAll()
+        else:
+          cache.LocalInvalidate(key)
+
+  def DoDistributedInvalidation(self, cnxn):
+    """Drop any cache entries that were invalidated by other jobs."""
+    # Only consider a bounded number of rows so that this step never bogs
+    # down.  If that limit is reached, invalidate all caches, but still
+    # process the rows in order to update processed_invalidations_up_to.
+    rows = self.invalidate_tbl.Select(
+        cnxn, cols=INVALIDATE_COLS, limit=MAX_INVALIDATE_ROWS_TO_CONSIDER,
+        where=[('timestep > %s', [self.processed_invalidations_up_to])],
+        order_by=[('timestep DESC', [])])
+    cnxn.Commit()
+
+    if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
+      logging.info('Invaliditing all caches: there are too many invalidations')
+      self._InvalidateAllCaches()
+
+    logging.info('Saw %d invalidation rows', len(rows))
+    self._ProcessInvalidationRows(rows)
+
+  def StoreInvalidateRows(self, cnxn, kind, keys):
+    """Store rows to let all jobs know to invalidate the given keys."""
+    assert kind in INVALIDATE_KIND_VALUES
+    self.invalidate_tbl.InsertRows(
+        cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])
+
+  def StoreInvalidateAll(self, cnxn, kind):
+    """Store a value to tell all jobs to invalidate all items of this kind."""
+    last_timestep = self.invalidate_tbl.InsertRow(
+        cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
+    # Earlier rows of this kind are superseded by the row just inserted.
+    self.invalidate_tbl.Delete(
+        cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
+
+
+class RamCacheConsolidate(jsonfeed.InternalTask):
+  """Internal task that trims surplus rows from the Invalidate table."""
+
+  def HandleRequest(self, mr):
+    """Trim the Invalidate table to its newest rows and report row counts.
+
+    Args:
+      mr: common information parsed from the HTTP request.
+
+    Returns:
+      A dictionary, to be rendered as JSON, holding the number of rows
+      before and after trimming.  These stats only aid debugging; no
+      other part of the system uses them.
+    """
+    invalidate_tbl = self.services.cache_manager.invalidate_tbl
+    count_before = invalidate_tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+
+    # Jobs read at most MAX_INVALIDATE_ROWS_TO_CONSIDER rows, and a job
+    # that sees that many new rows drops all caches of all types, as if
+    # there were INVALIDATE_ALL_KEYS entries.  So, rows older than the
+    # newest MAX_INVALIDATE_ROWS_TO_CONSIDER are never needed.
+    if count_before > MAX_INVALIDATE_ROWS_TO_CONSIDER:
+      newest_rows = invalidate_tbl.Select(
+          mr.cnxn, ['timestep'],
+          limit=MAX_INVALIDATE_ROWS_TO_CONSIDER,
+          order_by=[('timestep DESC', [])])
+      # Rows are sorted newest first, so the last one is the oldest kept.
+      cutoff_timestep = newest_rows[-1][0]
+      invalidate_tbl.Delete(
+          mr.cnxn, where=[('timestep < %s', [cutoff_timestep])])
+
+    count_after = invalidate_tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+
+    return {'old_count': count_before, 'new_count': count_after}