blob: 0c23ac52fff4115f8d5cb148288ca0910775b8d7 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2019 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file.
4
"""Cron and task handlers for syncing with wipeout-lite and deleting users."""
6
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import json
12import logging
13import httplib2
14
15from google.appengine.api import app_identity
16
17from businesslogic import work_env
18from framework import cloud_tasks_helpers
19from framework import framework_constants
20from framework import jsonfeed
21from framework import urls
22from oauth2client.client import GoogleCredentials
23
# URL template for the wipeout-lite API; the handlers below fill in the
# App Engine application id before appending a resource path.
WIPEOUT_ENDPOINT = 'https://emporia-pa.googleapis.com/v1/apps/%s'
# Default and upper bound for the number of users covered by one
# send-user-lists task.
MAX_BATCH_SIZE = 10000
# Default and upper bound for the number of emails put into one
# delete-users task; DeleteUsersTask asserts it is never exceeded.
MAX_DELETE_USERS_SIZE = 1000
27
28
def authorize():
  """Return an httplib2 client authorized with application-default credentials.

  The credentials are scoped to framework_constants.OAUTH_SCOPE and the
  underlying httplib2.Http object is created with a 60-second timeout.

  Returns:
    Whatever credentials.authorize() returns for that Http object.
  """
  default_credentials = GoogleCredentials.get_application_default()
  scoped_credentials = default_credentials.create_scoped(
      framework_constants.OAUTH_SCOPE)
  http_client = httplib2.Http(timeout=60)
  return scoped_credentials.authorize(http_client)
33
34
class WipeoutSyncCron(jsonfeed.InternalTask):
  """Enqueue tasks for sending user lists to wipeout-lite and deleting deleted
  users fetched from wipeout-lite."""

  def HandleRequest(self, mr):
    """Enqueue one send task per batch of users, then one delete-fetch task.

    Nothing is enqueued when the user count is zero.

    Args:
      mr: request object. Its optional 'batchsize' int param sets how many
          users each send task covers, capped at MAX_BATCH_SIZE.
    """
    batch_param = mr.GetIntParam('batchsize', default_value=MAX_BATCH_SIZE)
    # Use batch_param as batch_size unless it is None, 0, or negative. A zero
    # size cannot split the users into batches and a negative one would yield
    # no batches at all, so fall back to the maximum in those cases.
    if batch_param and batch_param > 0:
      batch_size = min(batch_param, MAX_BATCH_SIZE)
    else:
      batch_size = MAX_BATCH_SIZE

    total_users = self.services.user.TotalUsersCount(mr.cnxn)
    if not total_users:
      logging.info('No users to report.')
      return

    # Stepping by batch_size visits the offset of every batch, including the
    # final partial one, using integer arithmetic only.
    for offset in range(0, total_users, batch_size):
      params = dict(limit=batch_size, offset=offset)
      task = cloud_tasks_helpers.generate_simple_task(
          urls.SEND_WIPEOUT_USER_LISTS_TASK + '.do', params)
      cloud_tasks_helpers.create_task(
          task, queue=framework_constants.QUEUE_SEND_WIPEOUT_USER_LISTS)

    # A single follow-up task fetches the deleted accounts and fans out the
    # actual deletions.
    task = cloud_tasks_helpers.generate_simple_task(
        urls.DELETE_WIPEOUT_USERS_TASK + '.do', {})
    cloud_tasks_helpers.create_task(
        task, queue=framework_constants.QUEUE_FETCH_WIPEOUT_DELETED_USERS)
63
64
class SendWipeoutUserListsTask(jsonfeed.InternalTask):
  """Sends a batch of monorail users to wipeout-lite."""

  def HandleRequest(self, mr):
    """Load one batch of user emails and send it to wipeout-lite.

    Args:
      mr: request object. Must carry the int params 'limit' and 'offset',
          which are passed through to GetAllUserEmailsBatch.

    Raises:
      AssertionError: if either 'limit' or 'offset' is missing.
    """
    limit = mr.GetIntParam('limit')
    # None is tested by identity, the idiomatic singleton check.
    assert limit is not None, 'Missing param limit'
    offset = mr.GetIntParam('offset')
    assert offset is not None, 'Missing param offset'
    emails = self.services.user.GetAllUserEmailsBatch(
        mr.cnxn, limit=limit, offset=offset)
    # Each account is sent as an object whose 'id' is the user's email.
    accounts = [{'id': email} for email in emails]
    service = authorize()
    self.sendUserLists(service, accounts)

  def sendUserLists(self, service, accounts):
    """POST the given accounts, JSON-encoded, to the verifiedaccounts URL.

    Args:
      service: object exposing request(url, method=..., headers=..., body=...)
          that returns a (response, content) pair.
      accounts: JSON-serializable list of account dicts.
    """
    app_id = app_identity.get_application_id()
    endpoint = WIPEOUT_ENDPOINT % app_id
    resp, data = service.request(
        '%s/verifiedaccounts' % endpoint,
        method='POST',
        headers={'Content-Type': 'application/json; charset=UTF-8'},
        body=json.dumps(accounts))
    # The response is only logged; it is not inspected for errors here.
    logging.info(
        'Received response, %s with contents, %s', resp, data)
89
90
class DeleteWipeoutUsersTask(jsonfeed.InternalTask):
  """Fetches deleted users from wipeout-lite and enqueues tasks to delete
  those users from Monorail's DB."""

  def HandleRequest(self, mr):
    """Fetch the deleted accounts and enqueue deletion tasks in batches.

    Args:
      mr: request object. Its optional 'limit' int param sets how many emails
          go into each deletion task, capped at MAX_DELETE_USERS_SIZE.
    """
    limit = mr.GetIntParam('limit', MAX_DELETE_USERS_SIZE)
    # A missing, zero, or negative limit cannot be used to split the emails
    # into batches (zero would divide by zero, negative would yield no
    # batches), so fall back to the maximum in those cases.
    if limit and limit > 0:
      limit = min(limit, MAX_DELETE_USERS_SIZE)
    else:
      limit = MAX_DELETE_USERS_SIZE

    service = authorize()
    deleted_user_data = self.fetchDeletedUsers(service)
    deleted_emails = [user_object['id'] for user_object in deleted_user_data]

    # Stepping by limit covers every email exactly once, including the final
    # partial batch; an empty list enqueues nothing.
    for start in range(0, len(deleted_emails), limit):
      batch = deleted_emails[start:start + limit]
      # The emails travel as one comma-separated 'emails' param.
      params = dict(emails=','.join(batch))
      task = cloud_tasks_helpers.generate_simple_task(
          urls.DELETE_USERS_TASK + '.do', params)
      cloud_tasks_helpers.create_task(
          task, queue=framework_constants.QUEUE_DELETE_USERS)

  def fetchDeletedUsers(self, service):
    """GET the deletedaccounts URL and return its JSON-decoded body.

    Args:
      service: object exposing request(url, method=..., headers=...) that
          returns a (response, content) pair.

    Returns:
      The response content parsed with json.loads. HandleRequest reads an
      'id' key from each element of it.
    """
    app_id = app_identity.get_application_id()
    endpoint = WIPEOUT_ENDPOINT % app_id
    resp, data = service.request(
        '%s/deletedaccounts' % endpoint,
        method='GET',
        headers={'Content-Type': 'application/json; charset=UTF-8'})
    logging.info(
        'Received response, %s with contents, %s', resp, data)
    return json.loads(data)
124
125
class DeleteUsersTask(jsonfeed.InternalTask):
  """Deletes users from Monorail's DB."""

  def HandleRequest(self, mr):
    """Expunge the users whose emails are listed in the 'emails' param.

    An empty or missing list is logged and otherwise ignored.

    Raises:
      AssertionError: if more than MAX_DELETE_USERS_SIZE emails are given.
    """
    emails = mr.GetListParam('emails', default_value=[])
    email_count = len(emails)
    assert email_count <= MAX_DELETE_USERS_SIZE, (
        'We cannot delete more than %d users at once, current users: %d' %
        (MAX_DELETE_USERS_SIZE, email_count))
    if email_count:
      # Permission checks are skipped for this expunge call.
      with work_env.WorkEnv(mr, self.services) as env:
        env.ExpungeUsers(emails, check_perms=False)
    else:
      logging.info("No user emails found in deletion request")