blob: 739782e5b5fe99afd5dfeaf057827aa8925999aa [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2019 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file.
4
"""Cron and task handlers for syncing with wipeout-lite and deleting users."""
6
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import json
12import logging
13import httplib2
14
15from google.appengine.api import app_identity
16
17from businesslogic import work_env
18from framework import cloud_tasks_helpers
19from framework import framework_constants
20from framework import jsonfeed
21from framework import urls
22from oauth2client.client import GoogleCredentials
23
24WIPEOUT_ENDPOINT = 'https://emporia-pa.googleapis.com/v1/apps/%s'
25MAX_BATCH_SIZE = 10000
26MAX_DELETE_USERS_SIZE = 1000
27
28
def authorize():
  """Return an HTTP client authorized with application-default credentials.

  The application-default Google credentials are scoped to
  framework_constants.OAUTH_SCOPE and then used to authorize an
  httplib2.Http object created with a 60 second timeout.

  Returns:
    Whatever `credentials.authorize()` returns for that Http object; callers
    in this module use its `request()` method.
  """
  scoped_credentials = GoogleCredentials.get_application_default().create_scoped(
      framework_constants.OAUTH_SCOPE)
  http_client = httplib2.Http(timeout=60)
  return scoped_credentials.authorize(http_client)
33
34
# TODO: Change to FlaskInternalTask when converting to Flask.
class WipeoutSyncCron(jsonfeed.InternalTask):
  """Enqueue tasks for sending user lists to wipeout-lite and deleting deleted
  users fetched from wipeout-lite."""

  def HandleRequest(self, mr):
    """Fan out one send-user-list task per batch, then one delete task.

    Args:
      mr: request object. The optional integer param 'batchsize' selects the
          number of users per batch; it is capped at MAX_BATCH_SIZE and
          falls back to MAX_BATCH_SIZE when missing, zero or negative.

    Returns:
      None. When there are no users, nothing is enqueued at all.
    """
    batch_param = mr.GetIntParam('batchsize', default_value=MAX_BATCH_SIZE)
    # Use batch_param as batch_size unless it is None, 0 or negative: such a
    # value would make the batch arithmetic below divide by zero or produce
    # a meaningless batch count.
    if batch_param and batch_param > 0:
      batch_size = min(batch_param, MAX_BATCH_SIZE)
    else:
      batch_size = MAX_BATCH_SIZE

    total_users = self.services.user.TotalUsersCount(mr.cnxn)
    # Integer ceiling division, so a trailing partial batch gets its own task.
    # (`/` is true division here because of `from __future__ import division`.)
    total_batches = (total_users + batch_size - 1) // batch_size
    if not total_batches:
      logging.info('No users to report.')
      return

    for i in range(total_batches):
      params = dict(limit=batch_size, offset=i * batch_size)
      task = cloud_tasks_helpers.generate_simple_task(
          urls.SEND_WIPEOUT_USER_LISTS_TASK + '.do', params)
      cloud_tasks_helpers.create_task(
          task, queue=framework_constants.QUEUE_SEND_WIPEOUT_USER_LISTS)

    # A single follow-up task fetches the deleted accounts from wipeout-lite.
    task = cloud_tasks_helpers.generate_simple_task(
        urls.DELETE_WIPEOUT_USERS_TASK + '.do', {})
    cloud_tasks_helpers.create_task(
        task, queue=framework_constants.QUEUE_FETCH_WIPEOUT_DELETED_USERS)

  # def GetWipeoutSyncCron(self, **kwargs):
  #   return self.handler(**kwargs)

  # def PostWipeoutSyncCron(self, **kwargs):
  #   return self.handler(**kwargs)
70
71
# TODO: Set to FlaskInternalTask when converting to Flask.
class SendWipeoutUserListsTask(jsonfeed.InternalTask):
  """Sends a batch of monorail users to wipeout-lite."""

  def HandleRequest(self, mr):
    """Load one page of user emails and send it to wipeout-lite.

    Args:
      mr: request object carrying the required integer params 'limit' and
          'offset', which select the page of user emails to send.

    Raises:
      AssertionError: if 'limit' or 'offset' is missing from the request.
    """
    limit = mr.GetIntParam('limit')
    # Compare against None by identity; 0 is a legitimate value (e.g. the
    # offset of the first batch) and must not be treated as missing.
    assert limit is not None, 'Missing param limit'
    offset = mr.GetIntParam('offset')
    assert offset is not None, 'Missing param offset'
    emails = self.services.user.GetAllUserEmailsBatch(
        mr.cnxn, limit=limit, offset=offset)
    accounts = [{'id': email} for email in emails]
    service = authorize()
    self.sendUserLists(service, accounts)

  def sendUserLists(self, service, accounts):
    """POST the accounts as JSON to this app's verifiedaccounts endpoint.

    Args:
      service: object exposing an httplib2-style `request()` method, as
          returned by authorize().
      accounts: list of {'id': email} dicts to report.

    The response and its contents are only logged; they are not inspected.
    """
    app_id = app_identity.get_application_id()
    endpoint = WIPEOUT_ENDPOINT % app_id
    resp, data = service.request(
        '%s/verifiedaccounts' % endpoint,
        method='POST',
        headers={'Content-Type': 'application/json; charset=UTF-8'},
        body=json.dumps(accounts))
    logging.info(
        'Received response, %s with contents, %s', resp, data)

  # def GetSendWipeoutUserListsTask(self, **kwargs):
  #   return self.handler(**kwargs)

  # def PostSendWipeoutUserListsTask(self, **kwargs):
  #   return self.handler(**kwargs)
103
104
# TODO: Set to FlaskInternalTask when converting to Flask.
class DeleteWipeoutUsersTask(jsonfeed.InternalTask):
  """Fetches deleted users from wipeout-lite and enqueues tasks to delete
  those users from Monorail's DB."""

  def HandleRequest(self, mr):
    """Fetch the deleted accounts and enqueue one delete task per batch.

    Args:
      mr: request object. The optional integer param 'limit' sets how many
          emails go into each delete task; it is capped at
          MAX_DELETE_USERS_SIZE and falls back to MAX_DELETE_USERS_SIZE when
          missing, zero or negative.
    """
    limit = mr.GetIntParam('limit', MAX_DELETE_USERS_SIZE)
    # A zero limit used to raise ZeroDivisionError and a negative one produced
    # bogus slices, so only a positive limit is honoured.
    if limit and limit > 0:
      limit = min(limit, MAX_DELETE_USERS_SIZE)
    else:
      limit = MAX_DELETE_USERS_SIZE

    service = authorize()
    deleted_user_data = self.fetchDeletedUsers(service)
    deleted_emails = [user_object['id'] for user_object in deleted_user_data]

    # Step through the emails in chunks of `limit`; the final chunk may be
    # shorter, and no task is created when there are no emails.
    for start in range(0, len(deleted_emails), limit):
      params = dict(emails=','.join(deleted_emails[start:start + limit]))
      task = cloud_tasks_helpers.generate_simple_task(
          urls.DELETE_USERS_TASK + '.do', params)
      cloud_tasks_helpers.create_task(
          task, queue=framework_constants.QUEUE_DELETE_USERS)

  def fetchDeletedUsers(self, service):
    """GET this app's deletedaccounts endpoint and return the decoded JSON.

    Args:
      service: object exposing an httplib2-style `request()` method, as
          returned by authorize().

    Returns:
      The JSON-decoded response body. HandleRequest iterates over it and
      reads an 'id' key from every element.
    """
    app_id = app_identity.get_application_id()
    endpoint = WIPEOUT_ENDPOINT % app_id
    resp, data = service.request(
        '%s/deletedaccounts' % endpoint,
        method='GET',
        headers={'Content-Type': 'application/json; charset=UTF-8'})
    logging.info(
        'Received response, %s with contents, %s', resp, data)
    return json.loads(data)

  # def GetDeleteWipeoutUsersTask(self, **kwargs):
  #   return self.handler(**kwargs)

  # def PostDeleteWipeoutUsersTask(self, **kwargs):
  #   return self.handler(**kwargs)
145
146
# TODO: Set to FlaskInternalTask when converting to Flask.
class DeleteUsersTask(jsonfeed.InternalTask):
  """Deletes users from Monorail's DB."""

  def HandleRequest(self, mr):
    """Expunge the users listed in the request's 'emails' list param.

    Args:
      mr: request object; 'emails' is read as a list param and defaults to
          an empty list.

    Raises:
      AssertionError: if more than MAX_DELETE_USERS_SIZE emails are given.
    """
    emails = mr.GetListParam('emails', default_value=[])
    num_emails = len(emails)
    assert num_emails <= MAX_DELETE_USERS_SIZE, (
        'We cannot delete more than %d users at once, current users: %d' %
        (MAX_DELETE_USERS_SIZE, num_emails))
    if num_emails:
      # Permission checks are skipped for this internal task.
      with work_env.WorkEnv(mr, self.services) as we:
        we.ExpungeUsers(emails, check_perms=False)
      return
    logging.info("No user emails found in deletion request")

  # def GetDeleteUsersTask(self, **kwargs):
  #   return self.handler(**kwargs)

  # def PostDeleteUsersTask(self, **kwargs):
  #   return self.handler(**kwargs)