Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 1 | # Copyright 2019 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style |
| 3 | # license that can be found in the LICENSE file. |
| 4 | |
| 5 | """Cron and task handlers for syncing with wipeout-lite and deleting users.""" |
| 6 | |
| 7 | from __future__ import print_function |
| 8 | from __future__ import division |
| 9 | from __future__ import absolute_import |
| 10 | |
| 11 | import json |
| 12 | import logging |
| 13 | import httplib2 |
| 14 | |
| 15 | from google.appengine.api import app_identity |
| 16 | |
| 17 | from businesslogic import work_env |
| 18 | from framework import cloud_tasks_helpers |
| 19 | from framework import framework_constants |
| 20 | from framework import jsonfeed |
| 21 | from framework import urls |
| 22 | from oauth2client.client import GoogleCredentials |
| 23 | |
# URL template for this app's wipeout-lite resource; the '%s' placeholder is
# filled in with the App Engine application id before each request.
WIPEOUT_ENDPOINT = 'https://emporia-pa.googleapis.com/v1/apps/%s'
# Upper bound on how many users a single send-user-list task may cover.
MAX_BATCH_SIZE = 10000
# Upper bound on how many emails a single delete-users task may receive.
MAX_DELETE_USERS_SIZE = 1000
| 27 | |
| 28 | |
def authorize():
  """Build an HTTP client authorized with the application default credentials.

  The default credentials are scoped to framework_constants.OAUTH_SCOPE and
  then applied to a fresh httplib2.Http client with a 60 second timeout.

  Returns:
    Whatever credentials.authorize() returns for that client; callers in this
    module use it via its request() method.
  """
  default_creds = GoogleCredentials.get_application_default()
  scoped_creds = default_creds.create_scoped(framework_constants.OAUTH_SCOPE)
  http_client = httplib2.Http(timeout=60)
  return scoped_creds.authorize(http_client)
| 33 | |
| 34 | |
class WipeoutSyncCron(jsonfeed.InternalTask):
  """Enqueue tasks for sending user lists to wipeout-lite and deleting deleted
  users fetched from wipeout-lite."""

  def HandleRequest(self, mr):
    """Fan out one send-user-list task per batch, then one delete task.

    Args:
      mr: request object. Its optional int param 'batchsize' sets how many
        users each send task covers; it is capped at MAX_BATCH_SIZE and
        ignored when missing, zero or negative.

    Returns:
      None. When there are no users, nothing is enqueued at all.
    """
    batch_param = mr.GetIntParam('batchsize', default_value=MAX_BATCH_SIZE)
    # Use batch_param as batch_size unless it is None, 0 or negative. A
    # non-positive size would otherwise raise ZeroDivisionError below or
    # silently produce no batches.
    if batch_param and batch_param > 0:
      batch_size = min(batch_param, MAX_BATCH_SIZE)
    else:
      batch_size = MAX_BATCH_SIZE

    total_users = self.services.user.TotalUsersCount(mr.cnxn)
    # Integer ceiling division: a non-zero remainder needs one extra batch to
    # process the leftover user emails.
    total_batches, remainder = divmod(total_users, batch_size)
    if remainder:
      total_batches += 1
    if not total_batches:
      logging.info('No users to report.')
      return

    for i in range(total_batches):
      params = dict(limit=batch_size, offset=i * batch_size)
      task = cloud_tasks_helpers.generate_simple_task(
          urls.SEND_WIPEOUT_USER_LISTS_TASK + '.do', params)
      cloud_tasks_helpers.create_task(
          task, queue=framework_constants.QUEUE_SEND_WIPEOUT_USER_LISTS)

    # A single follow-up task fetches the deleted accounts from wipeout-lite.
    task = cloud_tasks_helpers.generate_simple_task(
        urls.DELETE_WIPEOUT_USERS_TASK + '.do', {})
    cloud_tasks_helpers.create_task(
        task, queue=framework_constants.QUEUE_FETCH_WIPEOUT_DELETED_USERS)
| 63 | |
| 64 | |
class SendWipeoutUserListsTask(jsonfeed.InternalTask):
  """Sends a batch of monorail users to wipeout-lite."""

  def HandleRequest(self, mr):
    """Look up one batch of user emails and post it to wipeout-lite.

    Args:
      mr: request object; the int params 'limit' and 'offset' are both
        required and select the batch of user emails to send.

    Raises:
      AssertionError: if 'limit' or 'offset' is missing.
    """
    limit = mr.GetIntParam('limit')
    assert limit is not None, 'Missing param limit'
    offset = mr.GetIntParam('offset')
    assert offset is not None, 'Missing param offset'

    batch_emails = self.services.user.GetAllUserEmailsBatch(
        mr.cnxn, limit=limit, offset=offset)
    # Each account is sent as a one-key object holding the email as its id.
    accounts = []
    for address in batch_emails:
      accounts.append({'id': address})
    http_client = authorize()
    self.sendUserLists(http_client, accounts)

  def sendUserLists(self, service, accounts):
    """POST the given accounts as JSON to the verifiedaccounts endpoint.

    Args:
      service: client exposing request(), as produced by authorize().
      accounts: JSON-serializable list of account dicts.

    The response and its contents are only logged; nothing is returned.
    """
    app_id = app_identity.get_application_id()
    base_url = WIPEOUT_ENDPOINT % app_id
    target_url = '%s/verifiedaccounts' % base_url
    request_headers = {'Content-Type': 'application/json; charset=UTF-8'}
    payload = json.dumps(accounts)
    resp, data = service.request(
        target_url, method='POST', headers=request_headers, body=payload)
    logging.info(
        'Received response, %s with contents, %s', resp, data)
| 89 | |
| 90 | |
class DeleteWipeoutUsersTask(jsonfeed.InternalTask):
  """Fetches deleted users from wipeout-lite and enqueues tasks to delete
  those users from Monorail's DB."""

  def HandleRequest(self, mr):
    """Fetch the deleted accounts and enqueue delete tasks in batches.

    Args:
      mr: request object. Its optional int param 'limit' sets how many
        emails go into each delete-users task; it is capped at
        MAX_DELETE_USERS_SIZE and ignored when missing, zero or negative.
    """
    limit = mr.GetIntParam('limit', MAX_DELETE_USERS_SIZE)
    # A non-positive limit would divide by zero or produce wrong slices, so
    # fall back to the maximum instead.
    if limit and limit > 0:
      limit = min(limit, MAX_DELETE_USERS_SIZE)
    else:
      limit = MAX_DELETE_USERS_SIZE

    service = authorize()
    deleted_user_data = self.fetchDeletedUsers(service)
    deleted_emails = [user_object['id'] for user_object in deleted_user_data]

    # Step through the emails `limit` at a time; the final slice may be
    # shorter, and no task is created when the list is empty.
    for start in range(0, len(deleted_emails), limit):
      batch = deleted_emails[start:start + limit]
      params = dict(emails=','.join(batch))
      task = cloud_tasks_helpers.generate_simple_task(
          urls.DELETE_USERS_TASK + '.do', params)
      cloud_tasks_helpers.create_task(
          task, queue=framework_constants.QUEUE_DELETE_USERS)

  def fetchDeletedUsers(self, service):
    """GET the deletedaccounts endpoint and return the decoded JSON body.

    Args:
      service: client exposing request(), as produced by authorize().

    Returns:
      The parsed JSON response; HandleRequest iterates over it and reads
      the 'id' key of each element.

    Raises:
      ValueError: if the response body is not valid JSON (from json.loads).
    """
    app_id = app_identity.get_application_id()
    endpoint = WIPEOUT_ENDPOINT % app_id
    resp, data = service.request(
        '%s/deletedaccounts' % endpoint,
        method='GET',
        headers={'Content-Type': 'application/json; charset=UTF-8'})
    logging.info(
        'Received response, %s with contents, %s', resp, data)
    return json.loads(data)
| 124 | |
| 125 | |
class DeleteUsersTask(jsonfeed.InternalTask):
  """Deletes users from Monorail's DB."""

  def HandleRequest(self, mr):
    """Expunge the users whose emails are listed in the 'emails' param.

    Args:
      mr: request object; its list param 'emails' names the users to delete
        and defaults to an empty list.

    Raises:
      AssertionError: if more than MAX_DELETE_USERS_SIZE emails are given.
    """
    emails = mr.GetListParam('emails', default_value=[])
    email_count = len(emails)
    assert email_count <= MAX_DELETE_USERS_SIZE, (
        'We cannot delete more than %d users at once, current users: %d' %
        (MAX_DELETE_USERS_SIZE, email_count))
    if not email_count:
      # Nothing to delete, so skip opening a WorkEnv altogether.
      logging.info("No user emails found in deletion request")
      return
    with work_env.WorkEnv(mr, self.services) as we:
      we.ExpungeUsers(emails, check_perms=False)