Merge branch 'main' into avm99963-monorail
Merged commit cd4b3b336f1f14afa02990fdc2eec5d9467a827e
GitOrigin-RevId: e67bbf185d5538e1472bb42e0abb2a141f88bac1
diff --git a/services/caches.py b/services/caches.py
index 07702bf..35276a0 100644
--- a/services/caches.py
+++ b/services/caches.py
@@ -20,7 +20,6 @@
from __future__ import absolute_import
import logging
-import redis
from protorpc import protobuf
@@ -28,8 +27,6 @@
import settings
from framework import framework_constants
-from framework import redis_utils
-from proto import tracker_pb2
DEFAULT_MAX_SIZE = 10000
@@ -186,28 +183,12 @@
# so as to avoid timeouts.
_FETCH_BATCH_SIZE = 10000
- def __init__(
- self,
- cache_manager,
- kind,
- prefix,
- pb_class,
- max_size=None,
- use_redis=False,
- redis_client=None):
+ def __init__(self, cache_manager, kind, prefix, pb_class, max_size=None):
self.cache = self._MakeCache(cache_manager, kind, max_size=max_size)
self.prefix = prefix
self.pb_class = pb_class
- if use_redis:
- self.redis_client = redis_client or redis_utils.CreateRedisClient()
- self.use_redis = redis_utils.VerifyRedisConnection(
- self.redis_client, msg=kind)
- else:
- self.redis_client = None
- self.use_redis = False
-
def _MakeCache(self, cache_manager, kind, max_size=None):
"""Make the RAM cache and register it with the cache_manager."""
return RamCache(cache_manager, kind, max_size=max_size)
@@ -215,7 +196,7 @@
def CacheItem(self, key, value):
"""Add the given key-value pair to RAM and L2 cache."""
self.cache.CacheItem(key, value)
- self._WriteToCache({key: value})
+ self._WriteToMemcache({key: value})
def HasItem(self, key):
"""Return True if the given key is in the RAM cache."""
@@ -258,7 +239,7 @@
if missed_keys:
if use_cache:
- cache_hits, missed_keys = self._ReadFromCache(missed_keys)
+ cache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
result_dict.update(cache_hits)
self.cache.CacheAll(cache_hits)
@@ -269,7 +250,7 @@
result_dict.update(retrieved_dict)
if use_cache:
self.cache.CacheAll(retrieved_dict)
- self._WriteToCache(retrieved_dict)
+ self._WriteToMemcache(retrieved_dict)
still_missing_keys = [key for key in keys if key not in result_dict]
return result_dict, still_missing_keys
@@ -283,7 +264,7 @@
def InvalidateKeys(self, cnxn, keys):
"""Drop the given keys from both RAM and L2 cache."""
self.cache.InvalidateKeys(cnxn, keys)
- self._DeleteFromCache(keys)
+ self._DeleteFromMemcache(keys)
def InvalidateAllKeys(self, cnxn, keys):
"""Drop the given keys from L2 cache and invalidate all keys in RAM.
@@ -292,7 +273,7 @@
invalidating a large group of keys all at once. Only use when necessary.
"""
self.cache.InvalidateAll(cnxn)
- self._DeleteFromCache(keys)
+ self._DeleteFromMemcache(keys)
def GetAllAlreadyInRam(self, keys):
"""Look only in RAM to return {key: values}, missed_keys."""
@@ -307,55 +288,6 @@
"""On RAM and L2 cache miss, hit the database."""
raise NotImplementedError()
- def _ReadFromCache(self, keys):
- # type: (Sequence[int]) -> Mapping[str, Any], Sequence[int]
- """Reads a list of keys from secondary caching service.
-
- Redis will be used if Redis is enabled and connection is valid;
- otherwise, memcache will be used.
-
- Args:
- keys: List of integer keys to look up in L2 cache.
-
- Returns:
- A pair: hits, misses. Where hits is {key: value} and misses is
- a list of any keys that were not found anywhere.
- """
- if self.use_redis:
- return self._ReadFromRedis(keys)
- else:
- return self._ReadFromMemcache(keys)
-
- def _WriteToCache(self, retrieved_dict):
- # type: (Mapping[int, Any]) -> None
- """Writes a set of key-value pairs to secondary caching service.
-
- Redis will be used if Redis is enabled and connection is valid;
- otherwise, memcache will be used.
-
- Args:
- retrieved_dict: Dictionary contains pairs of key-values to write to cache.
- """
- if self.use_redis:
- return self._WriteToRedis(retrieved_dict)
- else:
- return self._WriteToMemcache(retrieved_dict)
-
- def _DeleteFromCache(self, keys):
- # type: (Sequence[int]) -> None
- """Selects which cache to delete from.
-
- Redis will be used if Redis is enabled and connection is valid;
- otherwise, memcache will be used.
-
- Args:
- keys: List of integer keys to delete from cache.
- """
- if self.use_redis:
- return self._DeleteFromRedis(keys)
- else:
- return self._DeleteFromMemcache(keys)
-
def _ReadFromMemcache(self, keys):
# type: (Sequence[int]) -> Mapping[str, Any], Sequence[int]
"""Read the given keys from memcache, return {key: value}, missing_keys."""
@@ -403,79 +335,6 @@
key_prefix=self.prefix,
namespace=settings.memcache_namespace)
- def _WriteToRedis(self, retrieved_dict):
- # type: (Mapping[int, Any]) -> None
- """Write entries for each key-value pair to Redis. Encode PBs.
-
- Args:
- retrieved_dict: Dictionary of key-value pairs to write to Redis.
- """
- try:
- for key, value in retrieved_dict.items():
- redis_key = redis_utils.FormatRedisKey(key, prefix=self.prefix)
- redis_value = self._ValueToStr(value)
-
- self.redis_client.setex(
- redis_key, framework_constants.CACHE_EXPIRATION, redis_value)
- except redis.RedisError as identifier:
- logging.error(
- 'Redis error occurred during write operation: %s', identifier)
- self._DeleteFromRedis(list(retrieved_dict.keys()))
- return
- logging.info(
- 'cached batch of %d values in redis %s', len(retrieved_dict),
- self.prefix)
-
- def _ReadFromRedis(self, keys):
- # type: (Sequence[int]) -> Mapping[str, Any], Sequence[int]
- """Read the given keys from Redis, return {key: value}, missing keys.
-
- Args:
- keys: List of integer keys to read from Redis.
-
- Returns:
- A pair: hits, misses. Where hits is {key: value} and misses is
- a list of any keys that were not found anywhere.
- """
- cache_hits = {}
- missing_keys = []
- try:
- values_list = self.redis_client.mget(
- [redis_utils.FormatRedisKey(key, prefix=self.prefix) for key in keys])
- except redis.RedisError as identifier:
- logging.error(
- 'Redis error occurred during read operation: %s', identifier)
- values_list = [None] * len(keys)
-
- for key, serialized_value in zip(keys, values_list):
- if serialized_value:
- value = self._StrToValue(serialized_value)
- cache_hits[key] = value
- self.cache.CacheItem(key, value)
- else:
- missing_keys.append(key)
- logging.info(
- 'decoded %d values from redis %s, missing %d', len(cache_hits),
- self.prefix, len(missing_keys))
- return cache_hits, missing_keys
-
- def _DeleteFromRedis(self, keys):
- # type: (Sequence[int]) -> None
- """Delete key-values from redis.
-
- Args:
- keys: List of integer keys to delete.
- """
- try:
- self.redis_client.delete(
- *[
- redis_utils.FormatRedisKey(key, prefix=self.prefix)
- for key in keys
- ])
- except redis.RedisError as identifier:
- logging.error(
- 'Redis error occurred during delete operation %s', identifier)
-
def _KeyToStr(self, key):
# type: (int) -> str
"""Convert our int IDs to strings for use as memcache keys."""
@@ -489,26 +348,19 @@
def _ValueToStr(self, value):
# type: (Any) -> str
"""Serialize an application object so that it can be stored in L2 cache."""
- if self.use_redis:
- return redis_utils.SerializeValue(value, pb_class=self.pb_class)
+ if not self.pb_class:
+ return value
+ elif self.pb_class == int:
+ return str(value)
else:
- if not self.pb_class:
- return value
- elif self.pb_class == int:
- return str(value)
- else:
- return protobuf.encode_message(value)
+ return protobuf.encode_message(value)
def _StrToValue(self, serialized_value):
# type: (str) -> Any
"""Deserialize L2 cache string into an application object."""
- if self.use_redis:
- return redis_utils.DeserializeValue(
- serialized_value, pb_class=self.pb_class)
+ if not self.pb_class:
+ return serialized_value
+ elif self.pb_class == int:
+ return int(serialized_value)
else:
- if not self.pb_class:
- return serialized_value
- elif self.pb_class == int:
- return int(serialized_value)
- else:
- return protobuf.decode_message(self.pb_class, serialized_value)
+ return protobuf.decode_message(self.pb_class, serialized_value)
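
For context, a minimal standalone sketch (not part of the commit) of the memcache-only serialization rules that _ValueToStr and _StrToValue implement after this change: plain values pass through unchanged, ints are stringified, and anything else is protobuf-encoded. SerializeForL2 and DeserializeFromL2 are hypothetical helper names used only for illustration; the protorpc import mirrors the module's own dependency.

# Illustrative sketch only; assumes protorpc is importable, as in caches.py.
from protorpc import protobuf


def SerializeForL2(value, pb_class=None):
  """Serialize an application object for storage in the memcache L2 layer."""
  if not pb_class:
    return value
  elif pb_class == int:
    return str(value)
  else:
    return protobuf.encode_message(value)


def DeserializeFromL2(serialized_value, pb_class=None):
  """Deserialize a memcache string back into an application object."""
  if not pb_class:
    return serialized_value
  elif pb_class == int:
    return int(serialized_value)
  else:
    return protobuf.decode_message(pb_class, serialized_value)


# Round trip for the int case:
assert DeserializeFromL2(SerializeForL2(42, pb_class=int), pb_class=int) == 42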