# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
| 4 | """A utility module for interfacing with Redis conveniently. """ |
| 5 | import json |
| 6 | import logging |
| 7 | import threading |
| 8 | |
| 9 | import redis |
| 10 | |
| 11 | import settings |
| 12 | from protorpc import protobuf |
| 13 | |
# Connection pool shared by every client returned from CreateRedisClient().
# It stays None until the first call creates it.
connection_pool = None


def CreateRedisClient():
  # type: () -> redis.Redis
  """Creates a Redis client backed by the module-level connection pool.

  The pool is built on the first call from settings.redis_host and
  settings.redis_port, stored in the module-level `connection_pool`, and
  reused by every later call.

  Returns:
    redis.Redis object initialized with the shared connection pool. This
    function never returns None and does not ping the server itself; use
    VerifyRedisConnection() to check that the server is reachable.
  """
  global connection_pool
  if connection_pool is None:
    connection_pool = redis.BlockingConnectionPool(
        host=settings.redis_host,
        port=settings.redis_port,
        max_connections=1,
        # When Redis is not available, calls hang indefinitely without these.
        socket_connect_timeout=2,
        socket_timeout=2,
    )
  return redis.Redis(connection_pool=connection_pool)
| 35 | |
| 36 | |
def AsyncVerifyRedisConnection():
  # type: () -> None
  """Kicks off a background thread that verifies the Redis connection.

  The thread creates a client with CreateRedisClient() and passes it to
  VerifyRedisConnection(); this function returns right after starting it.

  An exception inside the thread will not kill the main thread, but running
  the check this way is still not risk free: AppEngine joins with any running
  threads before finishing the request, so a thread that hung indefinitely
  would push the request into DeadlineExceeded and cause a user facing
  failure anyway.

  That risk is mitigated by the socket timeouts set on the connection pool.

  TODO(crbug/monorail/8221): Remove this code during this milestone.
  """

  def _AsyncVerifyRedisConnection():
    # Thread body: build a client and check that the server responds.
    logging.info('AsyncVerifyRedisConnection thread started.')
    VerifyRedisConnection(CreateRedisClient())

  logging.info('Starting thread for AsyncVerifyRedisConnection.')
  verifier_thread = threading.Thread(target=_AsyncVerifyRedisConnection)
  verifier_thread.start()
| 60 | |
| 61 | |
def FormatRedisKey(key, prefix=None):
  # type: (int, Optional[str]) -> str
  """Converts key to a string and prepends the optional prefix.

  Args:
    key: Integer key; it is converted with str().
    prefix: Optional string to prepend to the key. A ':' separator is added
      when the prefix does not already end with one. None or an empty string
      means no prefix.

  Returns:
    The formatted key: "prefix:key" when a prefix is given, otherwise just
    the key as a string.
  """
  if not prefix:
    return str(key)
  # Avoid doubling the separator when the caller already supplied it.
  if not prefix.endswith(':'):
    prefix += ':'
  return prefix + str(key)
| 79 | |
def VerifyRedisConnection(redis_client, msg=None):
  # type: (redis.Redis, Optional[str]) -> bool
  """Pings Redis through redis_client and reports whether it answered.

  Args:
    redis_client: client used to ping the redis server. This can be a redis
      or fakeRedis object.
    msg: string included in the log lines to identify the caller.

  Returns:
    True when the ping succeeds.
    False when redis_client is None or the ping raises redis.RedisError.
  """
  if redis_client:
    try:
      redis_client.ping()
    except redis.RedisError as error:
      # TODO(crbug/monorail/8224): We can downgrade this to warning once we
      # are done with the switchover from memcache. Before that, log it to
      # ensure we see it.
      logging.exception(
          'Redis error occurred while connecting to server in %s: %s', msg,
          error)
      return False
    logging.info('Redis client successfully connected to Redis in %s', msg)
    return True

  logging.info('Redis client is set to None on connect in %s', msg)
  return False
| 108 | |
| 109 | |
def SerializeValue(value, pb_class=None):
  # type: (Any, Optional[type|classobj]) -> str
  """Serializes an object for storage in Redis.

  Args:
    value: object to serialize.
    pb_class: optional class of value. When it is unset or is int, the value
      is dumped as JSON; otherwise it is encoded with
      protobuf.encode_message.

  Returns:
    The serialized form of value.
  """
  store_as_json = not pb_class or pb_class is int
  if store_as_json:
    return json.dumps(value)
  return protobuf.encode_message(value)
| 117 | |
| 118 | |
def DeserializeValue(value, pb_class=None):
  # type: (str, Optional[type|classobj]) -> Any
  """Deserializes a string read from Redis into a python object.

  Args:
    value: serialized string to decode.
    pb_class: optional class of the stored object. When it is unset or is
      int, the value is parsed as JSON; otherwise it is decoded with
      protobuf.decode_message into pb_class.

  Returns:
    The decoded python object.
  """
  stored_as_json = not pb_class or pb_class is int
  if stored_as_json:
    return json.loads(value)
  return protobuf.decode_message(pb_class, value)