blob: bd9b7f96dc3d48d94837a12849d029f6c63b4a0b [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2020 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A helper module for interfacing with google cloud tasks.
5
6This module wraps Google Cloud Tasks; see its documentation:
7https://googleapis.dev/python/cloudtasks/1.3.0/gapic/v2/api.html
8"""
9
10from __future__ import absolute_import
11from __future__ import division
12from __future__ import print_function
13
14import logging
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +020015from six.moves import urllib
Copybara854996b2021-09-07 19:36:02 +000016
17from google.api_core import exceptions
18from google.api_core import retry
19
20import settings
21
22if not settings.unit_test_mode:
23 import grpc
24 from google.cloud import tasks
25
# Module-wide Cloud Tasks client. Starts out unset and is assigned by
# _get_client() the first time a client is requested.
_client = None

# Exponential backoff used when *enqueueing* a task (i.e. for the create_task
# API call itself). This is unrelated to the per-queue retry configuration
# that governs how an already-enqueued task is dispatched.
_DEFAULT_RETRY = retry.Retry(
    initial=0.1,
    maximum=1.6,
    multiplier=2,
    deadline=10,
)
30
31
def _get_client():
  # type: () -> tasks.CloudTasksClient
  """Returns the module-wide cloud tasks client, building it on first use.

  The client is stored in the module-level `_client` variable so subsequent
  calls reuse it. When settings.local_mode is set, the client is created on an
  insecure gRPC channel pointing at settings.CLOUD_TASKS_EMULATOR_ADDRESS;
  otherwise it is created with default arguments.
  """
  global _client

  # Reuse the previously built client whenever one is available.
  if _client:
    return _client

  if not settings.local_mode:
    _client = tasks.CloudTasksClient()
  else:
    emulator_channel = grpc.insecure_channel(
        settings.CLOUD_TASKS_EMULATOR_ADDRESS)
    _client = tasks.CloudTasksClient(channel=emulator_channel)
  return _client
43
44
def create_task(task, queue='default', **kwargs):
  # type: (Union[dict, tasks.types.Task], str, **Any) -> tasks.types.Task
  """Enqueues a task on the named cloud tasks queue.

  This exposes a simplified task creation interface by wrapping
  tasks.CloudTasksClient.create_task; see its documentation:
  https://googleapis.dev/python/cloudtasks/1.5.0/gapic/v2/api.html#google.cloud.tasks_v2.CloudTasksClient.create_task

  Args:
    task: A dict or Task describing the task to add. Its
        'app_engine_http_request' entry, and that entry's 'relative_uri', are
        looked up with .get() so the target can be logged.
    queue: A string naming the queue to add the task to. The full queue path
        is built from settings.app_id and settings.CLOUD_TASKS_REGION.
    kwargs: Additional arguments to pass to the cloud tasks client's
        create_task. If 'retry' is absent, _DEFAULT_RETRY is used.

  Returns:
    Successfully created Task object.

  Raises:
    AttributeError: If input task is malformed or missing attributes.
    google.api_core.exceptions.GoogleAPICallError: If the request failed for any
        reason.
    google.api_core.exceptions.RetryError: If the request failed due to a
        retryable error and retry attempts failed.
    ValueError: If the parameters are invalid.
  """
  tasks_client = _get_client()
  queue_path = tasks_client.queue_path(
      settings.app_id, settings.CLOUD_TASKS_REGION, queue)

  # The relative URI is only needed for the log line below.
  http_request = task.get('app_engine_http_request')
  relative_uri = http_request.get('relative_uri')

  # Fall back to the module's enqueue retry policy unless the caller chose one.
  if 'retry' not in kwargs:
    kwargs['retry'] = _DEFAULT_RETRY

  logging.info('Enqueueing %s task to %s', relative_uri, queue_path)
  return tasks_client.create_task(queue_path, task, **kwargs)
78
79
def generate_simple_task(url, params):
  # type: (str, dict) -> dict
  """Builds a minimal cloud tasks Task dict targeting an appengine handler.

  Args:
    url: Url path of the handler; stored as the request's 'relative_uri'.
    params: Dict of parameters; url-encoded and stored as the request 'body'.

  Returns:
    Dict representing a cloud tasks Task object, with a single
    'app_engine_http_request' entry holding the relative uri, the encoded body
    and a form-urlencoded content type header.
  """
  encoded_params = urllib.parse.urlencode(params)
  request_headers = {'Content-type': 'application/x-www-form-urlencoded'}
  http_request = {
      'relative_uri': url,
      'body': encoded_params,
      'headers': request_headers,
  }
  return {'app_engine_http_request': http_request}