Project import generated by Copybara.
GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/framework/redis_utils.py b/framework/redis_utils.py
new file mode 100644
index 0000000..440603b
--- /dev/null
+++ b/framework/redis_utils.py
@@ -0,0 +1,125 @@
+# Copyright 2020 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""A utility module for interfacing with Redis conveniently. """
+import json
+import logging
+import threading
+
+import redis
+
+import settings
+from protorpc import protobuf
+
+connection_pool = None  # Module-wide pool; built lazily by CreateRedisClient.
+
+def CreateRedisClient():
+  # type: () -> redis.Redis
+  """Creates a Redis object which implements Redis protocol and connection.
+
+  Returns:
+    redis.Redis object backed by the module-level connection pool. The pool
+    is built on the first call and shared by every client created afterwards.
+  """
+  global connection_pool
+  if not connection_pool:
+    connection_pool = redis.BlockingConnectionPool(
+        host=settings.redis_host,
+        port=settings.redis_port,
+        max_connections=1,
+        # When Redis is not available, calls hang indefinitely without these.
+        socket_connect_timeout=2,
+        socket_timeout=2,
+    )
+  return redis.Redis(connection_pool=connection_pool)
+
+
+def AsyncVerifyRedisConnection():
+  # type: () -> None
+  """Verifies the redis connection in a separate thread.
+
+  Note that although an exception in the thread won't kill the main thread,
+  it is not risk free.
+
+  AppEngine joins with any running threads before finishing the request.
+  If this thread were to hang indefinitely, then it would cause the request
+  to hit DeadlineExceeded, thus still causing a user facing failure.
+
+  We mitigate this risk by setting socket timeouts on our connection pool.
+
+  TODO(crbug/monorail/8221): Remove this code during this milestone.
+  """
+
+  def _AsyncVerifyRedisConnection():
+    logging.info('AsyncVerifyRedisConnection thread started.')
+    redis_client = CreateRedisClient()
+    VerifyRedisConnection(redis_client, msg='AsyncVerifyRedisConnection')
+
+  logging.info('Starting thread for AsyncVerifyRedisConnection.')
+  threading.Thread(target=_AsyncVerifyRedisConnection).start()
+
+
+def FormatRedisKey(key, prefix=None):
+  # type: (int, Optional[str]) -> str
+  """Converts key to string and prepends the prefix, if one is given.
+
+  Args:
+    key: Integer key.
+    prefix: Optional string to prepend; a ':' separator is appended to it
+      unless it already ends with one.
+
+  Returns:
+    "prefix:key" when a non-empty prefix is given, otherwise just "key".
+  """
+  if not prefix:
+    return str(key)
+  if not prefix.endswith(':'):
+    prefix += ':'
+  return prefix + str(key)
+
+def VerifyRedisConnection(redis_client, msg=None):
+  # type: (redis.Redis, Optional[str]) -> bool
+  """Checks the connection to Redis to ensure a connection can be established.
+
+  Args:
+    redis_client: client to connect and ping redis server. This can be a redis
+      or fakeRedis object.
+    msg: string interpolated into the log messages ("... in <msg>").
+
+  Returns:
+    True when connection to server is valid.
+    False when an error occurs or redis_client is None.
+  """
+  if not redis_client:
+    logging.info('Redis client is set to None on connect in %s', msg)
+    return False
+  try:
+    redis_client.ping()
+  except redis.RedisError as err:
+    # TODO(crbug/monorail/8224): We can downgrade this to warning once we are
+    # done with the switchover from memcache. Before that, log it to ensure we
+    # see it.
+    logging.exception(
+        'Redis error occurred while connecting to server in %s: %s', msg,
+        err)
+    return False
+  logging.info('Redis client successfully connected to Redis in %s', msg)
+  return True
+
+
+def SerializeValue(value, pb_class=None):
+  # type: (Any, Optional[type|classobj]) -> str
+  """Serialize object for storage in Redis. """
+  # A falsy pb_class or int means a plain value, which is stored as JSON.
+  if pb_class and pb_class is not int:
+    return protobuf.encode_message(value)
+  return json.dumps(value)
+
+
+def DeserializeValue(value, pb_class=None):
+  # type: (str, Optional[type|classobj]) -> Any
+  """Deserialize a string to create a python object. """
+  # A falsy pb_class or int means the value was stored as JSON.
+  if pb_class and pb_class is not int:
+    return protobuf.decode_message(pb_class, value)
+  return json.loads(value)