Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/framework/deleteusers.py b/framework/deleteusers.py
new file mode 100644
index 0000000..0c23ac5
--- /dev/null
+++ b/framework/deleteusers.py
@@ -0,0 +1,139 @@
+# Copyright 2019 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+"""Cron and task handlers for syncing with wipeoute-lite and deleting users."""
+
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import json
+import logging
+import httplib2
+
+from google.appengine.api import app_identity
+
+from businesslogic import work_env
+from framework import cloud_tasks_helpers
+from framework import framework_constants
+from framework import jsonfeed
+from framework import urls
+from oauth2client.client import GoogleCredentials
+
+WIPEOUT_ENDPOINT = 'https://emporia-pa.googleapis.com/v1/apps/%s'
+MAX_BATCH_SIZE = 10000
+MAX_DELETE_USERS_SIZE = 1000
+
+
+def authorize():
+  credentials = GoogleCredentials.get_application_default()
+  credentials = credentials.create_scoped(framework_constants.OAUTH_SCOPE)
+  return credentials.authorize(httplib2.Http(timeout=60))
+
+
+class WipeoutSyncCron(jsonfeed.InternalTask):
+  """Enqueue tasks for sending user lists to wipeout-lite and deleting deleted
+     users fetched from wipeout-lite."""
+
+  def HandleRequest(self, mr):
+    batch_param = mr.GetIntParam('batchsize', default_value=MAX_BATCH_SIZE)
+    # Use batch_param as batch_size unless it is None or 0.
+    batch_size = min(batch_param, MAX_BATCH_SIZE)
+    total_users = self.services.user.TotalUsersCount(mr.cnxn)
+    total_batches = int(total_users / batch_size)
+    # Add an extra batch to process remainder user emails.
+    if total_users % batch_size:
+      total_batches += 1
+    if not total_batches:
+      logging.info('No users to report.')
+      return
+
+    for i in range(total_batches):
+      params = dict(limit=batch_size, offset=i * batch_size)
+      task = cloud_tasks_helpers.generate_simple_task(
+          urls.SEND_WIPEOUT_USER_LISTS_TASK + '.do', params)
+      cloud_tasks_helpers.create_task(
+          task, queue=framework_constants.QUEUE_SEND_WIPEOUT_USER_LISTS)
+
+    task = cloud_tasks_helpers.generate_simple_task(
+        urls.DELETE_WIPEOUT_USERS_TASK + '.do', {})
+    cloud_tasks_helpers.create_task(
+        task, queue=framework_constants.QUEUE_FETCH_WIPEOUT_DELETED_USERS)
+
+
+class SendWipeoutUserListsTask(jsonfeed.InternalTask):
+  """Sends a batch of monorail users to wipeout-lite."""
+
+  def HandleRequest(self, mr):
+    limit = mr.GetIntParam('limit')
+    assert limit != None, 'Missing param limit'
+    offset = mr.GetIntParam('offset')
+    assert offset != None, 'Missing param offset'
+    emails = self.services.user.GetAllUserEmailsBatch(
+        mr.cnxn, limit=limit, offset=offset)
+    accounts = [{'id': email} for email in emails]
+    service = authorize()
+    self.sendUserLists(service, accounts)
+
+  def sendUserLists(self, service, accounts):
+    app_id = app_identity.get_application_id()
+    endpoint = WIPEOUT_ENDPOINT % app_id
+    resp, data = service.request(
+        '%s/verifiedaccounts' % endpoint,
+        method='POST',
+        headers={'Content-Type': 'application/json; charset=UTF-8'},
+        body=json.dumps(accounts))
+    logging.info(
+        'Received response, %s with contents, %s', resp, data)
+
+
+class DeleteWipeoutUsersTask(jsonfeed.InternalTask):
+  """Fetches deleted users from wipeout-lite and enqueues tasks to delete
+     those users from Monorail's DB."""
+
+  def HandleRequest(self, mr):
+    limit = mr.GetIntParam('limit', MAX_DELETE_USERS_SIZE)
+    limit = min(limit, MAX_DELETE_USERS_SIZE)
+    service = authorize()
+    deleted_user_data = self.fetchDeletedUsers(service)
+    deleted_emails = [user_object['id'] for user_object in deleted_user_data]
+    total_batches = int(len(deleted_emails) / limit)
+    if len(deleted_emails) % limit:
+      total_batches += 1
+
+    for i in range(total_batches):
+      start = i * limit
+      end = start + limit
+      params = dict(emails=','.join(deleted_emails[start:end]))
+      task = cloud_tasks_helpers.generate_simple_task(
+          urls.DELETE_USERS_TASK + '.do', params)
+      cloud_tasks_helpers.create_task(
+          task, queue=framework_constants.QUEUE_DELETE_USERS)
+
+  def fetchDeletedUsers(self, service):
+    app_id = app_identity.get_application_id()
+    endpoint = WIPEOUT_ENDPOINT % app_id
+    resp, data = service.request(
+        '%s/deletedaccounts' % endpoint,
+        method='GET',
+        headers={'Content-Type': 'application/json; charset=UTF-8'})
+    logging.info(
+        'Received response, %s with contents, %s', resp, data)
+    return json.loads(data)
+
+
+class DeleteUsersTask(jsonfeed.InternalTask):
+  """Deletes users from Monorail's DB."""
+
+  def HandleRequest(self, mr):
+    """Delete users with the emails given in the 'emails' param."""
+    emails = mr.GetListParam('emails', default_value=[])
+    assert len(emails) <= MAX_DELETE_USERS_SIZE, (
+        'We cannot delete more than %d users at once, current users: %d' %
+        (MAX_DELETE_USERS_SIZE, len(emails)))
+    if len(emails) == 0:
+      logging.info("No user emails found in deletion request")
+      return
+    with work_env.WorkEnv(mr, self.services) as we:
+      we.ExpungeUsers(emails, check_perms=False)