blob: 89954e3f9dba59513108622ae0eddd29c34d799f [file] [log] [blame]
# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A helper module for interfacing with google cloud tasks.
This module wraps Gooogle Cloud Tasks, link to its documentation:
https://googleapis.dev/python/cloudtasks/1.3.0/gapic/v2/api.html
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import six
from six.moves import urllib
from google.api_core import exceptions
from google.api_core import retry
import settings
if not settings.unit_test_mode:
import grpc
from google.cloud import tasks_v2
from google.cloud.tasks_v2.services import cloud_tasks
_client = None
# Default exponential backoff retry config for enqueueing, not to be confused
# with retry config for dispatching, which exists per queue.
_DEFAULT_RETRY = retry.Retry(initial=.1, maximum=1.6, multiplier=2, deadline=10)
def _get_client():
# type: () -> tasks.CloudTasksClient
"""Returns a cloud tasks client."""
global _client
if not _client:
if settings.local_mode:
transport = cloud_tasks.transports.CloudTasksGrpcTransport(
channel=grpc.insecure_channel(settings.CLOUD_TASKS_EMULATOR_ADDRESS))
_client = tasks_v2.CloudTasksClient(transport=transport)
else:
_client = tasks_v2.CloudTasksClient()
return _client
def create_task(task, queue='default', **kwargs):
# type: (Union[dict, tasks.types.Task], str, **Any) ->
# tasks.types.Task
"""Tries and catches creating a cloud task.
This exposes a simplied task creation interface by wrapping
tasks.CloudTasksClient.create_task; see its documentation:
https://googleapis.dev/python/cloudtasks/1.5.0/gapic/v2/api.html#google.cloud.tasks_v2.CloudTasksClient.create_task
Args:
task: A dict or Task describing the task to add.
queue: A string indicating name of the queue to add task to.
kwargs: Additional arguments to pass to cloud task client's create_task
Returns:
Successfully created Task object.
Raises:
AttributeError: If input task is malformed or missing attributes.
google.api_core.exceptions.GoogleAPICallError: If the request failed for any
reason.
google.api_core.exceptions.RetryError: If the request failed due to a
retryable error and retry attempts failed.
ValueError: If the parameters are invalid.
"""
client = _get_client()
parent = client.queue_path(
settings.app_id, settings.CLOUD_TASKS_REGION, queue)
target = task.get('app_engine_http_request').get('relative_uri')
kwargs.setdefault('retry', _DEFAULT_RETRY)
logging.info('Enqueueing %s task to %s', target, parent)
return client.create_task(parent=parent, task=task, **kwargs)
def generate_simple_task(url, params):
# type: (str, dict) -> dict
"""Construct a basic cloud tasks Task for an appengine handler.
Args:
url: Url path that handles the task.
params: Url query parameters dict.
Returns:
Dict representing a cloud tasks Task object.
"""
return {
'app_engine_http_request':
{
'relative_uri': url,
'body': six.ensure_binary(urllib.parse.urlencode(params)),
'headers': {
'Content-type': 'application/x-www-form-urlencoded'
}
}
}