blob: 5c16189d275a4e492d9e7107a3ef9c1299e8cf40 [file] [log] [blame]
# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Cron and task handlers for syncing with wipeoute-lite and deleting users."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import json
import logging
import httplib2
from google.appengine.api import app_identity
from businesslogic import work_env
from framework import cloud_tasks_helpers
from framework import framework_constants
from framework import jsonfeed
from framework import urls
from oauth2client.client import GoogleCredentials
WIPEOUT_ENDPOINT = 'https://emporia-pa.googleapis.com/v1/apps/%s'
MAX_BATCH_SIZE = 10000
MAX_DELETE_USERS_SIZE = 1000
def authorize():
credentials = GoogleCredentials.get_application_default()
credentials = credentials.create_scoped(framework_constants.OAUTH_SCOPE)
return credentials.authorize(httplib2.Http(timeout=60))
class WipeoutSyncCron(jsonfeed.InternalTask):
"""Enqueue tasks for sending user lists to wipeout-lite and deleting deleted
users fetched from wipeout-lite."""
def HandleRequest(self, mr):
batch_param = mr.GetIntParam('batchsize', default_value=MAX_BATCH_SIZE)
# Use batch_param as batch_size unless it is None or 0.
batch_size = min(batch_param, MAX_BATCH_SIZE)
total_users = self.services.user.TotalUsersCount(mr.cnxn)
total_batches = int(total_users / batch_size)
# Add an extra batch to process remainder user emails.
if total_users % batch_size:
total_batches += 1
if not total_batches:
logging.info('No users to report.')
return
for i in range(total_batches):
params = dict(limit=batch_size, offset=i * batch_size)
task = cloud_tasks_helpers.generate_simple_task(
urls.SEND_WIPEOUT_USER_LISTS_TASK + '.do', params)
cloud_tasks_helpers.create_task(
task, queue=framework_constants.QUEUE_SEND_WIPEOUT_USER_LISTS)
task = cloud_tasks_helpers.generate_simple_task(
urls.DELETE_WIPEOUT_USERS_TASK + '.do', {})
cloud_tasks_helpers.create_task(
task, queue=framework_constants.QUEUE_FETCH_WIPEOUT_DELETED_USERS)
def GetWipeoutSyncCron(self, **kwargs):
return self.handler(**kwargs)
class SendWipeoutUserListsTask(jsonfeed.InternalTask):
"""Sends a batch of monorail users to wipeout-lite."""
def HandleRequest(self, mr):
limit = mr.GetIntParam('limit')
assert limit != None, 'Missing param limit'
offset = mr.GetIntParam('offset')
assert offset != None, 'Missing param offset'
emails = self.services.user.GetAllUserEmailsBatch(
mr.cnxn, limit=limit, offset=offset)
accounts = [{'id': email} for email in emails]
service = authorize()
self.sendUserLists(service, accounts)
def sendUserLists(self, service, accounts):
app_id = app_identity.get_application_id()
endpoint = WIPEOUT_ENDPOINT % app_id
resp, data = service.request(
'%s/verifiedaccounts' % endpoint,
method='POST',
headers={'Content-Type': 'application/json; charset=UTF-8'},
body=json.dumps(accounts))
logging.info(
'Received response, %s with contents, %s', resp, data)
def PostSendWipeoutUserListsTask(self, **kwargs):
return self.handler(**kwargs)
class DeleteWipeoutUsersTask(jsonfeed.InternalTask):
"""Fetches deleted users from wipeout-lite and enqueues tasks to delete
those users from Monorail's DB."""
def HandleRequest(self, mr):
limit = mr.GetIntParam('limit', MAX_DELETE_USERS_SIZE)
limit = min(limit, MAX_DELETE_USERS_SIZE)
service = authorize()
deleted_user_data = self.fetchDeletedUsers(service)
deleted_emails = [user_object['id'] for user_object in deleted_user_data]
total_batches = int(len(deleted_emails) / limit)
if len(deleted_emails) % limit:
total_batches += 1
for i in range(total_batches):
start = i * limit
end = start + limit
params = dict(emails=','.join(deleted_emails[start:end]))
task = cloud_tasks_helpers.generate_simple_task(
urls.DELETE_USERS_TASK + '.do', params)
cloud_tasks_helpers.create_task(
task, queue=framework_constants.QUEUE_DELETE_USERS)
def fetchDeletedUsers(self, service):
app_id = app_identity.get_application_id()
endpoint = WIPEOUT_ENDPOINT % app_id
resp, data = service.request(
'%s/deletedaccounts' % endpoint,
method='GET',
headers={'Content-Type': 'application/json; charset=UTF-8'})
logging.info(
'Received response, %s with contents, %s', resp, data)
return json.loads(data)
def PostDeleteWipeoutUsersTask(self, **kwargs):
return self.handler(**kwargs)
class DeleteUsersTask(jsonfeed.InternalTask):
"""Deletes users from Monorail's DB."""
def HandleRequest(self, mr):
"""Delete users with the emails given in the 'emails' param."""
emails = mr.GetListParam('emails', default_value=[])
assert len(emails) <= MAX_DELETE_USERS_SIZE, (
'We cannot delete more than %d users at once, current users: %d' %
(MAX_DELETE_USERS_SIZE, len(emails)))
if len(emails) == 0:
logging.info("No user emails found in deletion request")
return
with work_env.WorkEnv(mr, self.services) as we:
we.ExpungeUsers(emails, check_perms=False)
def PostDeleteUsersTask(self, **kwargs):
return self.handler(**kwargs)