Merge branch 'main' into avm99963-monorail

Merged commit cd4b3b336f1f14afa02990fdc2eec5d9467a827e

GitOrigin-RevId: e67bbf185d5538e1472bb42e0abb2a141f88bac1
diff --git a/services/caches.py b/services/caches.py
index 07702bf..35276a0 100644
--- a/services/caches.py
+++ b/services/caches.py
@@ -20,7 +20,6 @@
 from __future__ import absolute_import
 
 import logging
-import redis
 
 from protorpc import protobuf
 
@@ -28,8 +27,6 @@
 
 import settings
 from framework import framework_constants
-from framework import redis_utils
-from proto import tracker_pb2
 
 
 DEFAULT_MAX_SIZE = 10000
@@ -186,28 +183,12 @@
   # so as to avoid timeouts.
   _FETCH_BATCH_SIZE = 10000
 
-  def __init__(
-      self,
-      cache_manager,
-      kind,
-      prefix,
-      pb_class,
-      max_size=None,
-      use_redis=False,
-      redis_client=None):
+  def __init__(self, cache_manager, kind, prefix, pb_class, max_size=None):
 
     self.cache = self._MakeCache(cache_manager, kind, max_size=max_size)
     self.prefix = prefix
     self.pb_class = pb_class
 
-    if use_redis:
-      self.redis_client = redis_client or redis_utils.CreateRedisClient()
-      self.use_redis = redis_utils.VerifyRedisConnection(
-          self.redis_client, msg=kind)
-    else:
-      self.redis_client = None
-      self.use_redis = False
-
   def _MakeCache(self, cache_manager, kind, max_size=None):
     """Make the RAM cache and register it with the cache_manager."""
     return RamCache(cache_manager, kind, max_size=max_size)
@@ -215,7 +196,7 @@
   def CacheItem(self, key, value):
     """Add the given key-value pair to RAM and L2 cache."""
     self.cache.CacheItem(key, value)
-    self._WriteToCache({key: value})
+    self._WriteToMemcache({key: value})
 
   def HasItem(self, key):
     """Return True if the given key is in the RAM cache."""
@@ -258,7 +239,7 @@
 
     if missed_keys:
       if use_cache:
-        cache_hits, missed_keys = self._ReadFromCache(missed_keys)
+        cache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
         result_dict.update(cache_hits)
         self.cache.CacheAll(cache_hits)
 
@@ -269,7 +250,7 @@
       result_dict.update(retrieved_dict)
       if use_cache:
         self.cache.CacheAll(retrieved_dict)
-        self._WriteToCache(retrieved_dict)
+        self._WriteToMemcache(retrieved_dict)
 
     still_missing_keys = [key for key in keys if key not in result_dict]
     return result_dict, still_missing_keys
@@ -283,7 +264,7 @@
   def InvalidateKeys(self, cnxn, keys):
     """Drop the given keys from both RAM and L2 cache."""
     self.cache.InvalidateKeys(cnxn, keys)
-    self._DeleteFromCache(keys)
+    self._DeleteFromMemcache(keys)
 
   def InvalidateAllKeys(self, cnxn, keys):
     """Drop the given keys from L2 cache and invalidate all keys in RAM.
@@ -292,7 +273,7 @@
     invalidating a large group of keys all at once. Only use when necessary.
     """
     self.cache.InvalidateAll(cnxn)
-    self._DeleteFromCache(keys)
+    self._DeleteFromMemcache(keys)
 
   def GetAllAlreadyInRam(self, keys):
     """Look only in RAM to return {key: values}, missed_keys."""
@@ -307,55 +288,6 @@
     """On RAM and L2 cache miss, hit the database."""
     raise NotImplementedError()
 
-  def _ReadFromCache(self, keys):
-    # type: (Sequence[int]) -> Mapping[str, Any], Sequence[int]
-    """Reads a list of keys from secondary caching service.
-
-    Redis will be used if Redis is enabled and connection is valid;
-    otherwise, memcache will be used.
-
-    Args:
-      keys: List of integer keys to look up in L2 cache.
-
-    Returns:
-      A pair: hits, misses.  Where hits is {key: value} and misses is
-        a list of any keys that were not found anywhere.
-    """
-    if self.use_redis:
-      return self._ReadFromRedis(keys)
-    else:
-      return self._ReadFromMemcache(keys)
-
-  def _WriteToCache(self, retrieved_dict):
-    # type: (Mapping[int, Any]) -> None
-    """Writes a set of key-value pairs to secondary caching service.
-
-    Redis will be used if Redis is enabled and connection is valid;
-    otherwise, memcache will be used.
-
-    Args:
-      retrieved_dict: Dictionary contains pairs of key-values to write to cache.
-    """
-    if self.use_redis:
-      return self._WriteToRedis(retrieved_dict)
-    else:
-      return self._WriteToMemcache(retrieved_dict)
-
-  def _DeleteFromCache(self, keys):
-    # type: (Sequence[int]) -> None
-    """Selects which cache to delete from.
-
-    Redis will be used if Redis is enabled and connection is valid;
-    otherwise, memcache will be used.
-
-    Args:
-      keys: List of integer keys to delete from cache.
-    """
-    if self.use_redis:
-      return self._DeleteFromRedis(keys)
-    else:
-      return self._DeleteFromMemcache(keys)
-
   def _ReadFromMemcache(self, keys):
     # type: (Sequence[int]) -> Mapping[str, Any], Sequence[int]
     """Read the given keys from memcache, return {key: value}, missing_keys."""
@@ -403,79 +335,6 @@
         key_prefix=self.prefix,
         namespace=settings.memcache_namespace)
 
-  def _WriteToRedis(self, retrieved_dict):
-    # type: (Mapping[int, Any]) -> None
-    """Write entries for each key-value pair to Redis.  Encode PBs.
-
-    Args:
-      retrieved_dict: Dictionary of key-value pairs to write to Redis.
-    """
-    try:
-      for key, value in retrieved_dict.items():
-        redis_key = redis_utils.FormatRedisKey(key, prefix=self.prefix)
-        redis_value = self._ValueToStr(value)
-
-        self.redis_client.setex(
-            redis_key, framework_constants.CACHE_EXPIRATION, redis_value)
-    except redis.RedisError as identifier:
-      logging.error(
-          'Redis error occurred during write operation: %s', identifier)
-      self._DeleteFromRedis(list(retrieved_dict.keys()))
-      return
-    logging.info(
-        'cached batch of %d values in redis %s', len(retrieved_dict),
-        self.prefix)
-
-  def _ReadFromRedis(self, keys):
-    # type: (Sequence[int]) -> Mapping[str, Any], Sequence[int]
-    """Read the given keys from Redis, return {key: value}, missing keys.
-
-    Args:
-      keys: List of integer keys to read from Redis.
-
-    Returns:
-      A pair: hits, misses.  Where hits is {key: value} and misses is
-        a list of any keys that were not found anywhere.
-    """
-    cache_hits = {}
-    missing_keys = []
-    try:
-      values_list = self.redis_client.mget(
-          [redis_utils.FormatRedisKey(key, prefix=self.prefix) for key in keys])
-    except redis.RedisError as identifier:
-      logging.error(
-          'Redis error occurred during read operation: %s', identifier)
-      values_list = [None] * len(keys)
-
-    for key, serialized_value in zip(keys, values_list):
-      if serialized_value:
-        value = self._StrToValue(serialized_value)
-        cache_hits[key] = value
-        self.cache.CacheItem(key, value)
-      else:
-        missing_keys.append(key)
-    logging.info(
-        'decoded %d values from redis %s, missing %d', len(cache_hits),
-        self.prefix, len(missing_keys))
-    return cache_hits, missing_keys
-
-  def _DeleteFromRedis(self, keys):
-    # type: (Sequence[int]) -> None
-    """Delete key-values from redis.
-
-    Args:
-      keys: List of integer keys to delete.
-    """
-    try:
-      self.redis_client.delete(
-          *[
-              redis_utils.FormatRedisKey(key, prefix=self.prefix)
-              for key in keys
-          ])
-    except redis.RedisError as identifier:
-      logging.error(
-          'Redis error occurred during delete operation %s', identifier)
-
   def _KeyToStr(self, key):
     # type: (int) -> str
     """Convert our int IDs to strings for use as memcache keys."""
@@ -489,26 +348,19 @@
   def _ValueToStr(self, value):
     # type: (Any) -> str
     """Serialize an application object so that it can be stored in L2 cache."""
-    if self.use_redis:
-      return redis_utils.SerializeValue(value, pb_class=self.pb_class)
+    if not self.pb_class:
+      return value
+    elif self.pb_class == int:
+      return str(value)
     else:
-      if not self.pb_class:
-        return value
-      elif self.pb_class == int:
-        return str(value)
-      else:
-        return protobuf.encode_message(value)
+      return protobuf.encode_message(value)
 
   def _StrToValue(self, serialized_value):
     # type: (str) -> Any
     """Deserialize L2 cache string into an application object."""
-    if self.use_redis:
-      return redis_utils.DeserializeValue(
-          serialized_value, pb_class=self.pb_class)
+    if not self.pb_class:
+      return serialized_value
+    elif self.pb_class == int:
+      return int(serialized_value)
     else:
-      if not self.pb_class:
-        return serialized_value
-      elif self.pb_class == int:
-        return int(serialized_value)
-      else:
-        return protobuf.decode_message(self.pb_class, serialized_value)
+      return protobuf.decode_message(self.pb_class, serialized_value)