Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/services/issue_svc.py b/services/issue_svc.py
new file mode 100644
index 0000000..eab85ab
--- /dev/null
+++ b/services/issue_svc.py
@@ -0,0 +1,2901 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""A set of functions that provide persistence for Monorail issue tracking.
+
+This module provides functions to get, update, create, and (in some
+cases) delete each type of business object.  It provides a logical
+persistence layer on top of an SQL database.
+
+Business objects are described in tracker_pb2.py and tracker_bizobj.py.
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import collections
+import json
+import logging
+import os
+import time
+import uuid
+
+from google.appengine.api import app_identity
+from google.appengine.api import images
+from third_party import cloudstorage
+
+import settings
+from features import filterrules_helpers
+from framework import authdata
+from framework import exceptions
+from framework import framework_bizobj
+from framework import framework_constants
+from framework import framework_helpers
+from framework import gcs_helpers
+from framework import permissions
+from framework import sql
+from infra_libs import ts_mon
+from proto import project_pb2
+from proto import tracker_pb2
+from services import caches
+from services import tracker_fulltext
+from tracker import tracker_bizobj
+from tracker import tracker_helpers
+
+# TODO(jojwang): monorail:4693, remove this after all 'stable-full'
+# gates have been renamed to 'stable'.
+FLT_EQUIVALENT_GATES = {'stable-full': 'stable',
+                        'stable': 'stable-full'}
+
+ISSUE_TABLE_NAME = 'Issue'
+ISSUESUMMARY_TABLE_NAME = 'IssueSummary'
+ISSUE2LABEL_TABLE_NAME = 'Issue2Label'
+ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component'
+ISSUE2CC_TABLE_NAME = 'Issue2Cc'
+ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify'
+ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue'
+COMMENT_TABLE_NAME = 'Comment'
+COMMENTCONTENT_TABLE_NAME = 'CommentContent'
+COMMENTIMPORTER_TABLE_NAME = 'CommentImporter'
+ATTACHMENT_TABLE_NAME = 'Attachment'
+ISSUERELATION_TABLE_NAME = 'IssueRelation'
+DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation'
+ISSUEUPDATE_TABLE_NAME = 'IssueUpdate'
+ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations'
+REINDEXQUEUE_TABLE_NAME = 'ReindexQueue'
+LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter'
+ISSUESNAPSHOT_TABLE_NAME = 'IssueSnapshot'
+ISSUESNAPSHOT2CC_TABLE_NAME = 'IssueSnapshot2Cc'
+ISSUESNAPSHOT2COMPONENT_TABLE_NAME = 'IssueSnapshot2Component'
+ISSUESNAPSHOT2LABEL_TABLE_NAME = 'IssueSnapshot2Label'
+ISSUEPHASEDEF_TABLE_NAME = 'IssuePhaseDef'
+ISSUE2APPROVALVALUE_TABLE_NAME = 'Issue2ApprovalValue'
+ISSUEAPPROVAL2APPROVER_TABLE_NAME = 'IssueApproval2Approver'
+ISSUEAPPROVAL2COMMENT_TABLE_NAME = 'IssueApproval2Comment'
+
+
+ISSUE_COLS = [
+    'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id',
+    'opened', 'closed', 'modified',
+    'owner_modified', 'status_modified', 'component_modified',
+    'derived_owner_id', 'derived_status_id',
+    'deleted', 'star_count', 'attachment_count', 'is_spam']
+ISSUESUMMARY_COLS = ['issue_id', 'summary']
+ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived']
+ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived']
+ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived']
+ISSUE2NOTIFY_COLS = ['issue_id', 'email']
+ISSUE2FIELDVALUE_COLS = [
+    'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'date_value',
+    'url_value', 'derived', 'phase_id']
+# Explicitly specify column 'Comment.id' to allow joins on other tables that
+# have an 'id' column.
+COMMENT_COLS = [
+    'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id',
+    'deleted_by', 'Comment.is_spam', 'is_description',
+    'commentcontent_id']  # Note: commentcontent_id must be last.
+COMMENTCONTENT_COLS = [
+    'CommentContent.id', 'content', 'inbound_message']
+COMMENTIMPORTER_COLS = ['comment_id', 'importer_id']
+ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by',
+    'is_description']
+ATTACHMENT_COLS = [
+    'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype',
+    'deleted', 'gcs_object_id']
+ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind', 'rank']
+ABBR_ISSUERELATION_COLS = ['dst_issue_id', 'rank']
+DANGLINGRELATION_COLS = [
+    'issue_id', 'dst_issue_project', 'dst_issue_local_id',
+    'ext_issue_identifier', 'kind']
+ISSUEUPDATE_COLS = [
+    'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value',
+    'added_user_id', 'removed_user_id', 'custom_field_name']
+ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id']
+REINDEXQUEUE_COLS = ['issue_id', 'created']
+ISSUESNAPSHOT_COLS = ['id', 'issue_id', 'shard', 'project_id', 'local_id',
+    'reporter_id', 'owner_id', 'status_id', 'period_start', 'period_end',
+    'is_open']
+ISSUESNAPSHOT2CC_COLS = ['issuesnapshot_id', 'cc_id']
+ISSUESNAPSHOT2COMPONENT_COLS = ['issuesnapshot_id', 'component_id']
+ISSUESNAPSHOT2LABEL_COLS = ['issuesnapshot_id', 'label_id']
+ISSUEPHASEDEF_COLS = ['id', 'name', 'rank']
+ISSUE2APPROVALVALUE_COLS = ['approval_id', 'issue_id', 'phase_id',
+                            'status', 'setter_id', 'set_on']
+ISSUEAPPROVAL2APPROVER_COLS = ['approval_id', 'approver_id', 'issue_id']
+ISSUEAPPROVAL2COMMENT_COLS = ['approval_id', 'comment_id']
+
+CHUNK_SIZE = 1000
+
+
+class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache):
+  """Class to manage RAM and memcache for Issue IDs."""
+
+  def __init__(self, cache_manager, issue_service):
+    super(IssueIDTwoLevelCache, self).__init__(
+        cache_manager, 'issue_id', 'issue_id:', int,
+        max_size=settings.issue_cache_max_size)
+    self.issue_service = issue_service
+
+  def _MakeCache(self, cache_manager, kind, max_size=None):
+    """Override normal RamCache creation with ValueCentricRamCache."""
+    return caches.ValueCentricRamCache(cache_manager, kind, max_size=max_size)
+
+  def _DeserializeIssueIDs(self, project_local_issue_ids):
+    """Convert database rows into a dict {(project_id, local_id): issue_id}."""
+    return {(project_id, local_id): issue_id
+            for (project_id, local_id, issue_id) in project_local_issue_ids}
+
+  def FetchItems(self, cnxn, keys):
+    """On RAM and memcache miss, hit the database."""
+    local_ids_by_pid = collections.defaultdict(list)
+    for project_id, local_id in keys:
+      local_ids_by_pid[project_id].append(local_id)
+
+    where = []  # We OR per-project pairs of conditions together.
+    for project_id, local_ids_in_project in local_ids_by_pid.items():
+      term_str = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' %
+                  sql.PlaceHolders(local_ids_in_project))
+      where.append((term_str, [project_id] + local_ids_in_project))
+
+    rows = self.issue_service.issue_tbl.Select(
+        cnxn, cols=['project_id', 'local_id', 'id'],
+        where=where, or_where_conds=True)
+    return self._DeserializeIssueIDs(rows)
+
+  def _KeyToStr(self, key):
+    """This cache uses pairs of ints as keys. Convert them to strings."""
+    return '%d,%d' % key
+
+  def _StrToKey(self, key_str):
+    """This cache uses pairs of ints as keys. Convert them from strings."""
+    project_id_str, local_id_str = key_str.split(',')
+    return int(project_id_str), int(local_id_str)
+
+
+class IssueTwoLevelCache(caches.AbstractTwoLevelCache):
+  """Class to manage RAM and memcache for Issue PBs."""
+
+  def __init__(
+      self, cache_manager, issue_service, project_service, config_service):
+    super(IssueTwoLevelCache, self).__init__(
+        cache_manager, 'issue', 'issue:', tracker_pb2.Issue,
+        max_size=settings.issue_cache_max_size)
+    self.issue_service = issue_service
+    self.project_service = project_service
+    self.config_service = config_service
+
+  def _UnpackIssue(self, cnxn, issue_row):
+    """Partially construct an issue object using info from a DB row."""
+    (issue_id, project_id, local_id, status_id, owner_id, reporter_id,
+     opened, closed, modified, owner_modified, status_modified,
+     component_modified, derived_owner_id, derived_status_id,
+     deleted, star_count, attachment_count, is_spam) = issue_row
+
+    issue = tracker_pb2.Issue()
+    project = self.project_service.GetProject(cnxn, project_id)
+    issue.project_name = project.project_name
+    issue.issue_id = issue_id
+    issue.project_id = project_id
+    issue.local_id = local_id
+    if status_id is not None:
+      status = self.config_service.LookupStatus(cnxn, project_id, status_id)
+      issue.status = status
+    issue.owner_id = owner_id or 0
+    issue.reporter_id = reporter_id or 0
+    issue.derived_owner_id = derived_owner_id or 0
+    if derived_status_id is not None:
+      derived_status = self.config_service.LookupStatus(
+          cnxn, project_id, derived_status_id)
+      issue.derived_status = derived_status
+    issue.deleted = bool(deleted)
+    if opened:
+      issue.opened_timestamp = opened
+    if closed:
+      issue.closed_timestamp = closed
+    if modified:
+      issue.modified_timestamp = modified
+    if owner_modified:
+      issue.owner_modified_timestamp = owner_modified
+    if status_modified:
+      issue.status_modified_timestamp = status_modified
+    if component_modified:
+      issue.component_modified_timestamp = component_modified
+    issue.star_count = star_count
+    issue.attachment_count = attachment_count
+    issue.is_spam = bool(is_spam)
+    return issue
+
+  def _UnpackFieldValue(self, fv_row):
+    """Construct a field value object from a DB row."""
+    (issue_id, field_id, int_value, str_value, user_id, date_value, url_value,
+     derived, phase_id) = fv_row
+    fv = tracker_bizobj.MakeFieldValue(
+        field_id, int_value, str_value, user_id, date_value, url_value,
+        bool(derived), phase_id=phase_id)
+    return fv, issue_id
+
+  def _UnpackApprovalValue(self, av_row):
+    """Contruct an ApprovalValue PB from a DB row."""
+    (approval_id, issue_id, phase_id, status, setter_id, set_on) = av_row
+    if status:
+      status_enum = tracker_pb2.ApprovalStatus(status.upper())
+    else:
+      status_enum = tracker_pb2.ApprovalStatus.NOT_SET
+    av = tracker_pb2.ApprovalValue(
+        approval_id=approval_id, setter_id=setter_id, set_on=set_on,
+        status=status_enum, phase_id=phase_id)
+    return av, issue_id
+
+  def _UnpackPhase(self, phase_row):
+    """Construct a Phase PB from a DB row."""
+    (phase_id, name, rank) = phase_row
+    phase = tracker_pb2.Phase(
+        phase_id=phase_id, name=name, rank=rank)
+    return phase
+
+  def _DeserializeIssues(
+      self, cnxn, issue_rows, summary_rows, label_rows, component_rows,
+      cc_rows, notify_rows, fieldvalue_rows, relation_rows,
+      dangling_relation_rows, phase_rows, approvalvalue_rows,
+      av_approver_rows):
+    """Convert the given DB rows into a dict of Issue PBs."""
+    results_dict = {}
+    for issue_row in issue_rows:
+      issue = self._UnpackIssue(cnxn, issue_row)
+      results_dict[issue.issue_id] = issue
+
+    for issue_id, summary in summary_rows:
+      results_dict[issue_id].summary = summary
+
+    # TODO(jrobbins): it would be nice to order labels by rank and name.
+    for issue_id, label_id, derived in label_rows:
+      issue = results_dict.get(issue_id)
+      if not issue:
+        logging.info('Got label for an unknown issue: %r %r',
+                     label_rows, issue_rows)
+        continue
+      label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id)
+      assert label, ('Label ID %r on IID %r not found in project %r' %
+                     (label_id, issue_id, issue.project_id))
+      if derived:
+        results_dict[issue_id].derived_labels.append(label)
+      else:
+        results_dict[issue_id].labels.append(label)
+
+    for issue_id, component_id, derived in component_rows:
+      if derived:
+        results_dict[issue_id].derived_component_ids.append(component_id)
+      else:
+        results_dict[issue_id].component_ids.append(component_id)
+
+    for issue_id, user_id, derived in cc_rows:
+      if derived:
+        results_dict[issue_id].derived_cc_ids.append(user_id)
+      else:
+        results_dict[issue_id].cc_ids.append(user_id)
+
+    for issue_id, email in notify_rows:
+      results_dict[issue_id].derived_notify_addrs.append(email)
+
+    for fv_row in fieldvalue_rows:
+      fv, issue_id = self._UnpackFieldValue(fv_row)
+      results_dict[issue_id].field_values.append(fv)
+
+    phases_by_id = {}
+    for phase_row in phase_rows:
+      phase = self._UnpackPhase(phase_row)
+      phases_by_id[phase.phase_id] = phase
+
+    approvers_dict = collections.defaultdict(list)
+    for approver_row in av_approver_rows:
+      approval_id, approver_id, issue_id = approver_row
+      approvers_dict[approval_id, issue_id].append(approver_id)
+
+    for av_row in approvalvalue_rows:
+      av, issue_id = self._UnpackApprovalValue(av_row)
+      av.approver_ids = approvers_dict[av.approval_id, issue_id]
+      results_dict[issue_id].approval_values.append(av)
+      if av.phase_id:
+        phase = phases_by_id[av.phase_id]
+        issue_phases = results_dict[issue_id].phases
+        if phase not in issue_phases:
+          issue_phases.append(phase)
+    # Order issue phases
+    for issue in results_dict.values():
+      if issue.phases:
+        issue.phases.sort(key=lambda phase: phase.rank)
+
+    for issue_id, dst_issue_id, kind, rank in relation_rows:
+      src_issue = results_dict.get(issue_id)
+      dst_issue = results_dict.get(dst_issue_id)
+      assert src_issue or dst_issue, (
+          'Neither source issue %r nor dest issue %r was found' %
+          (issue_id, dst_issue_id))
+      if src_issue:
+        if kind == 'blockedon':
+          src_issue.blocked_on_iids.append(dst_issue_id)
+          src_issue.blocked_on_ranks.append(rank)
+        elif kind == 'mergedinto':
+          src_issue.merged_into = dst_issue_id
+        else:
+          logging.info('unknown relation kind %r', kind)
+          continue
+
+      if dst_issue:
+        if kind == 'blockedon':
+          dst_issue.blocking_iids.append(issue_id)
+
+    for row in dangling_relation_rows:
+      issue_id, dst_issue_proj, dst_issue_id, ext_id, kind = row
+      src_issue = results_dict.get(issue_id)
+      if kind == 'blockedon':
+        src_issue.dangling_blocked_on_refs.append(
+            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj,
+                dst_issue_id, ext_id))
+      elif kind == 'blocking':
+        src_issue.dangling_blocking_refs.append(
+            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id,
+                ext_id))
+      elif kind == 'mergedinto':
+        src_issue.merged_into_external = ext_id
+      else:
+        logging.warn('unhandled danging relation kind %r', kind)
+        continue
+
+    return results_dict
+
+  # Note: sharding is used to here to allow us to load issues from the replicas
+  # without placing load on the primary DB. Writes are not sharded.
+  # pylint: disable=arguments-differ
+  def FetchItems(self, cnxn, issue_ids, shard_id=None):
+    """Retrieve and deserialize issues."""
+    issue_rows = self.issue_service.issue_tbl.Select(
+        cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id)
+
+    summary_rows = self.issue_service.issuesummary_tbl.Select(
+        cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids)
+    label_rows = self.issue_service.issue2label_tbl.Select(
+        cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids)
+    component_rows = self.issue_service.issue2component_tbl.Select(
+        cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids)
+    cc_rows = self.issue_service.issue2cc_tbl.Select(
+        cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids)
+    notify_rows = self.issue_service.issue2notify_tbl.Select(
+        cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids)
+    fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select(
+        cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id,
+        issue_id=issue_ids)
+    approvalvalue_rows = self.issue_service.issue2approvalvalue_tbl.Select(
+        cnxn, cols=ISSUE2APPROVALVALUE_COLS, issue_id=issue_ids)
+    phase_ids = [av_row[2] for av_row in approvalvalue_rows]
+    phase_rows = []
+    if phase_ids:
+      phase_rows = self.issue_service.issuephasedef_tbl.Select(
+          cnxn, cols=ISSUEPHASEDEF_COLS, id=list(set(phase_ids)))
+    av_approver_rows = self.issue_service.issueapproval2approver_tbl.Select(
+        cnxn, cols=ISSUEAPPROVAL2APPROVER_COLS, issue_id=issue_ids)
+    if issue_ids:
+      ph = sql.PlaceHolders(issue_ids)
+      blocked_on_rows = self.issue_service.issuerelation_tbl.Select(
+          cnxn, cols=ISSUERELATION_COLS, issue_id=issue_ids, kind='blockedon',
+          order_by=[('issue_id', []), ('rank DESC', []), ('dst_issue_id', [])])
+      blocking_rows = self.issue_service.issuerelation_tbl.Select(
+          cnxn, cols=ISSUERELATION_COLS, dst_issue_id=issue_ids,
+          kind='blockedon', order_by=[('issue_id', []), ('dst_issue_id', [])])
+      unique_blocking = tuple(
+          row for row in blocking_rows if row not in blocked_on_rows)
+      merge_rows = self.issue_service.issuerelation_tbl.Select(
+          cnxn, cols=ISSUERELATION_COLS,
+          where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph),
+                  issue_ids + issue_ids),
+                 ('kind != %s', ['blockedon'])])
+      relation_rows = blocked_on_rows + unique_blocking + merge_rows
+      dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select(
+          cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids)
+    else:
+      relation_rows = []
+      dangling_relation_rows = []
+
+    issue_dict = self._DeserializeIssues(
+        cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows,
+        notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows,
+        phase_rows, approvalvalue_rows, av_approver_rows)
+    logging.info('IssueTwoLevelCache.FetchItems returning: %r', issue_dict)
+    return issue_dict
+
+
+class CommentTwoLevelCache(caches.AbstractTwoLevelCache):
+  """Class to manage RAM and memcache for IssueComment PBs."""
+
+  def __init__(self, cache_manager, issue_svc):
+    super(CommentTwoLevelCache, self).__init__(
+        cache_manager, 'comment', 'comment:', tracker_pb2.IssueComment,
+        max_size=settings.comment_cache_max_size)
+    self.issue_svc = issue_svc
+
+  # pylint: disable=arguments-differ
+  def FetchItems(self, cnxn, keys, shard_id=None):
+    comment_rows = self.issue_svc.comment_tbl.Select(cnxn,
+        cols=COMMENT_COLS, id=keys, shard_id=shard_id)
+
+    if len(comment_rows) < len(keys):
+      self.issue_svc.replication_lag_retries.increment()
+      logging.info('issue3755: expected %d, but got %d rows from shard %d',
+                   len(keys), len(comment_rows), shard_id)
+      shard_id = None  # Will use Primary DB.
+      comment_rows = self.issue_svc.comment_tbl.Select(
+          cnxn, cols=COMMENT_COLS, id=keys, shard_id=None)
+      logging.info(
+          'Retry got %d comment rows from the primary DB', len(comment_rows))
+
+    cids = [row[0] for row in comment_rows]
+    commentcontent_ids = [row[-1] for row in comment_rows]
+    content_rows = self.issue_svc.commentcontent_tbl.Select(
+        cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids,
+        shard_id=shard_id)
+    approval_rows = self.issue_svc.issueapproval2comment_tbl.Select(
+        cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids)
+    amendment_rows = self.issue_svc.issueupdate_tbl.Select(
+        cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id)
+    attachment_rows = self.issue_svc.attachment_tbl.Select(
+        cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id)
+    importer_rows = self.issue_svc.commentimporter_tbl.Select(
+        cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id)
+
+    comments = self.issue_svc._DeserializeComments(
+        comment_rows, content_rows, amendment_rows, attachment_rows,
+        approval_rows, importer_rows)
+
+    comments_dict = {}
+    for comment in comments:
+      comments_dict[comment.id] = comment
+
+    return comments_dict
+
+
+class IssueService(object):
+  """The persistence layer for Monorail's issues, comments, and attachments."""
+  spam_labels = ts_mon.CounterMetric(
+      'monorail/issue_svc/spam_label',
+      'Issues created, broken down by spam label.',
+      [ts_mon.StringField('type')])
+  replication_lag_retries = ts_mon.CounterMetric(
+      'monorail/issue_svc/replication_lag_retries',
+      'Counts times that loading comments from a replica failed',
+      [])
+  issue_creations = ts_mon.CounterMetric(
+      'monorail/issue_svc/issue_creations',
+      'Counts times that issues were created',
+      [])
+  comment_creations = ts_mon.CounterMetric(
+      'monorail/issue_svc/comment_creations',
+      'Counts times that comments were created',
+      [])
+
+  def __init__(self, project_service, config_service, cache_manager,
+      chart_service):
+    """Initialize this object so that it is ready to use.
+
+    Args:
+      project_service: services object for project info.
+      config_service: services object for tracker configuration info.
+      cache_manager: local cache with distributed invalidation.
+      chart_service (ChartService): An instance of ChartService.
+    """
+    # Tables that represent issue data.
+    self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME)
+    self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME)
+    self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME)
+    self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME)
+    self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME)
+    self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME)
+    self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME)
+    self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME)
+    self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME)
+    self.issueformerlocations_tbl = sql.SQLTableManager(
+        ISSUEFORMERLOCATIONS_TABLE_NAME)
+    self.issuesnapshot_tbl = sql.SQLTableManager(ISSUESNAPSHOT_TABLE_NAME)
+    self.issuesnapshot2cc_tbl = sql.SQLTableManager(
+        ISSUESNAPSHOT2CC_TABLE_NAME)
+    self.issuesnapshot2component_tbl = sql.SQLTableManager(
+        ISSUESNAPSHOT2COMPONENT_TABLE_NAME)
+    self.issuesnapshot2label_tbl = sql.SQLTableManager(
+        ISSUESNAPSHOT2LABEL_TABLE_NAME)
+    self.issuephasedef_tbl = sql.SQLTableManager(ISSUEPHASEDEF_TABLE_NAME)
+    self.issue2approvalvalue_tbl = sql.SQLTableManager(
+        ISSUE2APPROVALVALUE_TABLE_NAME)
+    self.issueapproval2approver_tbl = sql.SQLTableManager(
+        ISSUEAPPROVAL2APPROVER_TABLE_NAME)
+    self.issueapproval2comment_tbl = sql.SQLTableManager(
+        ISSUEAPPROVAL2COMMENT_TABLE_NAME)
+
+    # Tables that represent comments.
+    self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME)
+    self.commentcontent_tbl = sql.SQLTableManager(COMMENTCONTENT_TABLE_NAME)
+    self.commentimporter_tbl = sql.SQLTableManager(COMMENTIMPORTER_TABLE_NAME)
+    self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME)
+    self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME)
+
+    # Tables for cron tasks.
+    self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME)
+
+    # Tables for generating sequences of local IDs.
+    self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME)
+
+    # Like a dictionary {(project_id, local_id): issue_id}
+    # Use value centric cache here because we cannot store a tuple in the
+    # Invalidate table.
+    self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self)
+    # Like a dictionary {issue_id: issue}
+    self.issue_2lc = IssueTwoLevelCache(
+        cache_manager, self, project_service, config_service)
+
+    # Like a dictionary {comment_id: comment)
+    self.comment_2lc = CommentTwoLevelCache(
+        cache_manager, self)
+
+    self._config_service = config_service
+    self.chart_service = chart_service
+
+  ### Issue ID lookups
+
+  def LookupIssueIDsFollowMoves(self, cnxn, project_local_id_pairs):
+    # type: (MonorailConnection, Sequence[Tuple(int, int)]) ->
+    #     (Sequence[int], Sequence[Tuple(int, int)])
+    """Find the global issue IDs given the project ID and local ID of each.
+
+    If any (project_id, local_id) pairs refer to an issue that has been moved,
+    the issue ID will still be returned.
+
+    Args:
+      cnxn: Monorail connection.
+      project_local_id_pairs: (project_id, local_id) pairs to look up.
+
+    Returns:
+      A tuple of two items.
+      1. A sequence of global issue IDs in the `project_local_id_pairs` order.
+      2. A sequence of (project_id, local_id) containing each pair provided
+         for which no matching issue is found.
+    """
+
+    issue_id_dict, misses = self.issue_id_2lc.GetAll(
+        cnxn, project_local_id_pairs)
+    for miss in misses:
+      project_id, local_id = miss
+      issue_id = int(
+          self.issueformerlocations_tbl.SelectValue(
+              cnxn,
+              'issue_id',
+              default=0,
+              project_id=project_id,
+              local_id=local_id))
+      if issue_id:
+        misses.remove(miss)
+        issue_id_dict[miss] = issue_id
+    # Put the Issue IDs in the order specified by project_local_id_pairs
+    issue_ids = [
+        issue_id_dict[pair]
+        for pair in project_local_id_pairs
+        if pair in issue_id_dict
+    ]
+
+    return issue_ids, misses
+
+  def LookupIssueIDs(self, cnxn, project_local_id_pairs):
+    """Find the global issue IDs given the project ID and local ID of each."""
+    issue_id_dict, misses = self.issue_id_2lc.GetAll(
+        cnxn, project_local_id_pairs)
+
+    # Put the Issue IDs in the order specified by project_local_id_pairs
+    issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs
+                 if pair in issue_id_dict]
+
+    return issue_ids, misses
+
+  def LookupIssueID(self, cnxn, project_id, local_id):
+    """Find the global issue ID given the project ID and local ID."""
+    issue_ids, _misses = self.LookupIssueIDs(cnxn, [(project_id, local_id)])
+    try:
+      return issue_ids[0]
+    except IndexError:
+      raise exceptions.NoSuchIssueException()
+
+  def ResolveIssueRefs(
+      self, cnxn, ref_projects, default_project_name, refs):
+    """Look up all the referenced issues and return their issue_ids.
+
+    Args:
+      cnxn: connection to SQL database.
+      ref_projects: pre-fetched dict {project_name: project} of all projects
+          mentioned in the refs as well as the default project.
+      default_project_name: string name of the current project, this is used
+          when the project_name in a ref is None.
+      refs: list of (project_name, local_id) pairs.  These are parsed from
+          textual references in issue descriptions, comments, and the input
+          in the blocked-on field.
+
+    Returns:
+      A list of issue_ids for all the referenced issues.  References to issues
+      in deleted projects and any issues not found are simply ignored.
+    """
+    if not refs:
+      return [], []
+
+    project_local_id_pairs = []
+    for project_name, local_id in refs:
+      project = ref_projects.get(project_name or default_project_name)
+      if not project or project.state == project_pb2.ProjectState.DELETABLE:
+        continue  # ignore any refs to issues in deleted projects
+      project_local_id_pairs.append((project.project_id, local_id))
+
+    return self.LookupIssueIDs(cnxn, project_local_id_pairs)  # tuple
+
+  def LookupIssueRefs(self, cnxn, issue_ids):
+    """Return {issue_id: (project_name, local_id)} for each issue_id."""
+    issue_dict, _misses = self.GetIssuesDict(cnxn, issue_ids)
+    return {
+      issue_id: (issue.project_name, issue.local_id)
+      for issue_id, issue in issue_dict.items()}
+
+  ### Issue objects
+
+  def CreateIssue(
+      self,
+      cnxn,
+      services,
+      issue,
+      marked_description,
+      attachments=None,
+      index_now=False,
+      importer_id=None):
+    """Create and store a new issue with all the given information.
+
+    Args:
+      cnxn: connection to SQL database.
+      services: persistence layer for users, issues, and projects.
+      issue: Issue PB to create.
+      marked_description: issue description with initial HTML markup.
+      attachments: [(filename, contents, mimetype),...] attachments uploaded at
+          the time the comment was made.
+      index_now: True if the issue should be updated in the full text index.
+      importer_id: optional user ID of API client importing issues for users.
+
+    Returns:
+      A tuple (the newly created Issue PB and Comment PB for the
+      issue description).
+    """
+    project_id = issue.project_id
+    reporter_id = issue.reporter_id
+    timestamp = issue.opened_timestamp
+    config = self._config_service.GetProjectConfig(cnxn, project_id)
+
+    iids_to_invalidate = set()
+    if len(issue.blocked_on_iids) != 0:
+      iids_to_invalidate.update(issue.blocked_on_iids)
+    if len(issue.blocking_iids) != 0:
+      iids_to_invalidate.update(issue.blocking_iids)
+
+    comment = self._MakeIssueComment(
+        project_id, reporter_id, marked_description,
+        attachments=attachments, timestamp=timestamp,
+        is_description=True, importer_id=importer_id)
+
+    reporter = services.user.GetUser(cnxn, reporter_id)
+    project = services.project.GetProject(cnxn, project_id)
+    reporter_auth = authdata.AuthData.FromUserID(cnxn, reporter_id, services)
+    is_project_member = framework_bizobj.UserIsInProject(
+        project, reporter_auth.effective_ids)
+    classification = services.spam.ClassifyIssue(
+        issue, comment, reporter, is_project_member)
+
+    if classification['confidence_is_spam'] > settings.classifier_spam_thresh:
+      issue.is_spam = True
+      predicted_label = 'spam'
+    else:
+      predicted_label = 'ham'
+
+    logging.info('classified new issue as %s' % predicted_label)
+    self.spam_labels.increment({'type': predicted_label})
+
+    # Create approval surveys
+    approval_comments = []
+    if len(issue.approval_values) != 0:
+      approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
+      for av in issue.approval_values:
+        ad = approval_defs_by_id.get(av.approval_id)
+        if ad:
+          survey = ''
+          if ad.survey:
+            questions = ad.survey.split('\n')
+            survey = '\n'.join(['<b>' + q + '</b>' for q in questions])
+          approval_comments.append(self._MakeIssueComment(
+              project_id, reporter_id, survey, timestamp=timestamp,
+              is_description=True, approval_id=ad.approval_id))
+        else:
+          logging.info('Could not find ApprovalDef with approval_id %r',
+              av.approval_id)
+
+    issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
+    self.issue_creations.increment()
+    issue_id = self.InsertIssue(cnxn, issue)
+    comment.issue_id = issue_id
+    self.InsertComment(cnxn, comment)
+    for approval_comment in approval_comments:
+      approval_comment.issue_id = issue_id
+      self.InsertComment(cnxn, approval_comment)
+
+    issue.issue_id = issue_id
+
+    # ClassifyIssue only returns confidence_is_spam, but
+    # RecordClassifierIssueVerdict records confidence of
+    # ham or spam. Therefore if ham, invert score.
+    confidence = classification['confidence_is_spam']
+    if not issue.is_spam:
+      confidence = 1.0 - confidence
+
+    services.spam.RecordClassifierIssueVerdict(
+      cnxn, issue, predicted_label=='spam',
+      confidence, classification['failed_open'])
+
+    if permissions.HasRestrictions(issue, 'view'):
+      self._config_service.InvalidateMemcache(
+          [issue], key_prefix='nonviewable:')
+
+    # Add a comment to existing issues saying they are now blocking or
+    # blocked on this issue.
+    blocked_add_issues = self.GetIssues(cnxn, issue.blocked_on_iids)
+    for add_issue in blocked_add_issues:
+      self.CreateIssueComment(
+          cnxn, add_issue, reporter_id, content='',
+          amendments=[tracker_bizobj.MakeBlockingAmendment(
+              [(issue.project_name, issue.local_id)], [],
+              default_project_name=add_issue.project_name)])
+    blocking_add_issues = self.GetIssues(cnxn, issue.blocking_iids)
+    for add_issue in blocking_add_issues:
+      self.CreateIssueComment(
+          cnxn, add_issue, reporter_id, content='',
+          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
+              [(issue.project_name, issue.local_id)], [],
+              default_project_name=add_issue.project_name)])
+
+    self._UpdateIssuesModified(
+        cnxn, iids_to_invalidate, modified_timestamp=timestamp)
+
+    if index_now:
+      tracker_fulltext.IndexIssues(
+          cnxn, [issue], services.user, self, self._config_service)
+    else:
+      self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
+
+    return issue, comment
+
+  def AllocateNewLocalIDs(self, cnxn, issues):
+    # Filter to just the issues that need new local IDs.
+    issues = [issue for issue in issues if issue.local_id < 0]
+
+    for issue in issues:
+      if issue.local_id < 0:
+        issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id)
+
+    self.UpdateIssues(cnxn, issues)
+
+    logging.info("AllocateNewLocalIDs")
+
+  def GetAllIssuesInProject(
+      self, cnxn, project_id, min_local_id=None, use_cache=True):
+    """Special query to efficiently get ALL issues in a project.
+
+    This is not done while the user is waiting, only by backround tasks.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: the ID of the project.
+      min_local_id: optional int to start at.
+      use_cache: optional boolean to turn off using the cache.
+
+    Returns:
+      A list of Issue protocol buffers for all issues.
+    """
+    all_local_ids = self.GetAllLocalIDsInProject(
+        cnxn, project_id, min_local_id=min_local_id)
+    return self.GetIssuesByLocalIDs(
+        cnxn, project_id, all_local_ids, use_cache=use_cache)
+
+  def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
+    """Get any one issue from RAM or memcache, otherwise return None."""
+    return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end)
+
+  def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None):
+    # type: (MonorailConnection, Collection[int], Optional[Boolean],
+    #     Optional[int]) -> (Dict[int, Issue], Sequence[int])
+    """Get a dict {iid: issue} from the DB or cache.
+
+    Returns:
+      A dict {iid: issue} from the DB or cache.
+      A sequence of iid that could not be found.
+    """
+    issue_dict, missed_iids = self.issue_2lc.GetAll(
+        cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
+    if not use_cache:
+      for issue in issue_dict.values():
+        issue.assume_stale = False
+    return issue_dict, missed_iids
+
+  def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None):
+    # type: (MonorailConnection, Sequence[int], Optional[Boolean],
+    #     Optional[int]) -> (Sequence[int])
+    """Get a list of Issue PBs from the DB or cache.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue_ids: integer global issue IDs of the issues.
+      use_cache: optional boolean to turn off using the cache.
+      shard_id: optional int shard_id to limit retrieval.
+
+    Returns:
+      A list of Issue PBs in the same order as the given issue_ids.
+    """
+    issue_dict, _misses = self.GetIssuesDict(
+        cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
+
+    # Return a list that is ordered the same as the given issue_ids.
+    issue_list = [issue_dict[issue_id] for issue_id in issue_ids
+                  if issue_id in issue_dict]
+
+    return issue_list
+
+  def GetIssue(self, cnxn, issue_id, use_cache=True):
+    """Get one Issue PB from the DB.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue_id: integer global issue ID of the issue.
+      use_cache: optional boolean to turn off using the cache.
+
+    Returns:
+      The requested Issue protocol buffer.
+
+    Raises:
+      NoSuchIssueException: the issue was not found.
+    """
+    issues = self.GetIssues(cnxn, [issue_id], use_cache=use_cache)
+    try:
+      return issues[0]
+    except IndexError:
+      raise exceptions.NoSuchIssueException()
+
+  def GetIssuesByLocalIDs(
+      self, cnxn, project_id, local_id_list, use_cache=True, shard_id=None):
+    """Get all the requested issues.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: int ID of the project to which the issues belong.
+      local_id_list: list of integer local IDs for the requested issues.
+      use_cache: optional boolean to turn off using the cache.
+      shard_id: optional int shard_id to choose a replica.
+
+    Returns:
+      List of Issue PBs for the requested issues.  The result Issues
+      will be ordered in the same order as local_id_list.
+    """
+    issue_ids_to_fetch, _misses = self.LookupIssueIDs(
+        cnxn, [(project_id, local_id) for local_id in local_id_list])
+    issues = self.GetIssues(
+        cnxn, issue_ids_to_fetch, use_cache=use_cache, shard_id=shard_id)
+    return issues
+
+  def GetIssueByLocalID(self, cnxn, project_id, local_id, use_cache=True):
+    """Get one Issue PB from the DB.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: the ID of the project to which the issue belongs.
+      local_id: integer local ID of the issue.
+      use_cache: optional boolean to turn off using the cache.
+
+    Returns:
+      The requested Issue protocol buffer.
+    """
+    issues = self.GetIssuesByLocalIDs(
+        cnxn, project_id, [local_id], use_cache=use_cache)
+    try:
+      return issues[0]
+    except IndexError:
+      raise exceptions.NoSuchIssueException(
+          'The issue %s:%d does not exist.' % (project_id, local_id))
+
+  def GetOpenAndClosedIssues(self, cnxn, issue_ids):
+    """Return the requested issues in separate open and closed lists.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue_ids: list of int issue issue_ids.
+
+    Returns:
+      A pair of lists, the first with open issues, second with closed issues.
+    """
+    if not issue_ids:
+      return [], []  # make one common case efficient
+
+    issues = self.GetIssues(cnxn, issue_ids)
+    project_ids = {issue.project_id for issue in issues}
+    configs = self._config_service.GetProjectConfigs(cnxn, project_ids)
+    open_issues = []
+    closed_issues = []
+    for issue in issues:
+      config = configs[issue.project_id]
+      if tracker_helpers.MeansOpenInProject(
+          tracker_bizobj.GetStatus(issue), config):
+        open_issues.append(issue)
+      else:
+        closed_issues.append(issue)
+
+    return open_issues, closed_issues
+
+  # TODO(crbug.com/monorail/7822): Delete this method when V0 API retired.
+  def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id):
+    """Return the current location of a moved issue based on old location."""
+    issue_id = int(self.issueformerlocations_tbl.SelectValue(
+        cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id))
+    if not issue_id:
+      return None, None
+    project_id, local_id = self.issue_tbl.SelectRow(
+        cnxn, cols=['project_id', 'local_id'], id=issue_id)
+    return project_id, local_id
+
+  def GetPreviousLocations(self, cnxn, issue):
+    """Get all the previous locations of an issue."""
+    location_rows = self.issueformerlocations_tbl.Select(
+        cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id)
+    locations = [(pid, local_id) for (pid, local_id) in location_rows
+                 if pid != issue.project_id or local_id != issue.local_id]
+    return locations
+
+  def GetCommentsByUser(self, cnxn, user_id):
+    """Get all comments created by a user"""
+    comments = self.GetComments(cnxn, commenter_id=user_id,
+        is_description=False, limit=10000)
+    return comments
+
+  def GetIssueActivity(self, cnxn, num=50, before=None, after=None,
+      project_ids=None, user_ids=None, ascending=False):
+
+    if project_ids:
+      use_clause = (
+        'USE INDEX (project_id) USE INDEX FOR ORDER BY (project_id)')
+    elif user_ids:
+      use_clause = (
+        'USE INDEX (commenter_id) USE INDEX FOR ORDER BY (commenter_id)')
+    else:
+      use_clause = ''
+
+    # TODO(jrobbins): make this into a persist method.
+    # TODO(jrobbins): this really needs permission checking in SQL, which
+    # will be slow.
+    where_conds = [('Issue.id = Comment.issue_id', [])]
+    if project_ids is not None:
+      cond_str = 'Comment.project_id IN (%s)' % sql.PlaceHolders(project_ids)
+      where_conds.append((cond_str, project_ids))
+    if user_ids is not None:
+      cond_str = 'Comment.commenter_id IN (%s)' % sql.PlaceHolders(user_ids)
+      where_conds.append((cond_str, user_ids))
+
+    if before:
+      where_conds.append(('created < %s', [before]))
+    if after:
+      where_conds.append(('created > %s', [after]))
+    if ascending:
+      order_by = [('created', [])]
+    else:
+      order_by = [('created DESC', [])]
+
+    comments = self.GetComments(
+      cnxn, joins=[('Issue', [])], deleted_by=None, where=where_conds,
+      use_clause=use_clause, order_by=order_by, limit=num + 1)
+    return comments
+
+  def GetIssueIDsReportedByUser(self, cnxn, user_id):
+    """Get all issue IDs created by a user"""
+    rows = self.issue_tbl.Select(cnxn, cols=['id'], reporter_id=user_id,
+        limit=10000)
+    return [row[0] for row in rows]
+
+  def InsertIssue(self, cnxn, issue):
+    """Store the given issue in SQL.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue: Issue PB to insert into the database.
+
+    Returns:
+      The int issue_id of the newly created issue.
+    """
+    status_id = self._config_service.LookupStatusID(
+        cnxn, issue.project_id, issue.status)
+    row = (issue.project_id, issue.local_id, status_id,
+           issue.owner_id or None,
+           issue.reporter_id,
+           issue.opened_timestamp,
+           issue.closed_timestamp,
+           issue.modified_timestamp,
+           issue.owner_modified_timestamp,
+           issue.status_modified_timestamp,
+           issue.component_modified_timestamp,
+           issue.derived_owner_id or None,
+           self._config_service.LookupStatusID(
+               cnxn, issue.project_id, issue.derived_status),
+           bool(issue.deleted),
+           issue.star_count, issue.attachment_count,
+           issue.is_spam)
+    # ISSUE_COLs[1:] to skip setting the ID
+    # Insert into the Primary DB.
+    generated_ids = self.issue_tbl.InsertRows(
+        cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True)
+    issue_id = generated_ids[0]
+    issue.issue_id = issue_id
+    self.issue_tbl.Update(
+      cnxn, {'shard': issue_id % settings.num_logical_shards},
+      id=issue.issue_id, commit=False)
+
+    self._UpdateIssuesSummary(cnxn, [issue], commit=False)
+    self._UpdateIssuesLabels(cnxn, [issue], commit=False)
+    self._UpdateIssuesFields(cnxn, [issue], commit=False)
+    self._UpdateIssuesComponents(cnxn, [issue], commit=False)
+    self._UpdateIssuesCc(cnxn, [issue], commit=False)
+    self._UpdateIssuesNotify(cnxn, [issue], commit=False)
+    self._UpdateIssuesRelation(cnxn, [issue], commit=False)
+    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
+    self.chart_service.StoreIssueSnapshots(cnxn, [issue], commit=False)
+    cnxn.Commit()
+    self._config_service.InvalidateMemcache([issue])
+
+    return issue_id
+
+  def UpdateIssues(
+      self, cnxn, issues, update_cols=None, just_derived=False, commit=True,
+      invalidate=True):
+    """Update the given issues in SQL.
+
+    Args:
+      cnxn: connection to SQL database.
+      issues: list of issues to update, these must have been loaded with
+          use_cache=False so that issue.assume_stale is False.
+      update_cols: optional list of just the field names to update.
+      just_derived: set to True when only updating derived fields.
+      commit: set to False to skip the DB commit and do it in the caller.
+      invalidate: set to False to leave cache invalidatation to the caller.
+    """
+    if not issues:
+      return
+
+    for issue in issues:  # slow, but mysql will not allow REPLACE rows.
+      assert not issue.assume_stale, (
+          'issue2514: Storing issue that might be stale: %r' % issue)
+      delta = {
+          'project_id': issue.project_id,
+          'local_id': issue.local_id,
+          'owner_id': issue.owner_id or None,
+          'status_id': self._config_service.LookupStatusID(
+              cnxn, issue.project_id, issue.status) or None,
+          'opened': issue.opened_timestamp,
+          'closed': issue.closed_timestamp,
+          'modified': issue.modified_timestamp,
+          'owner_modified': issue.owner_modified_timestamp,
+          'status_modified': issue.status_modified_timestamp,
+          'component_modified': issue.component_modified_timestamp,
+          'derived_owner_id': issue.derived_owner_id or None,
+          'derived_status_id': self._config_service.LookupStatusID(
+              cnxn, issue.project_id, issue.derived_status) or None,
+          'deleted': bool(issue.deleted),
+          'star_count': issue.star_count,
+          'attachment_count': issue.attachment_count,
+          'is_spam': issue.is_spam,
+          }
+      if update_cols is not None:
+        delta = {key: val for key, val in delta.items()
+                 if key in update_cols}
+      self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False)
+
+    if not update_cols:
+      self._UpdateIssuesLabels(cnxn, issues, commit=False)
+      self._UpdateIssuesCc(cnxn, issues, commit=False)
+      self._UpdateIssuesFields(cnxn, issues, commit=False)
+      self._UpdateIssuesComponents(cnxn, issues, commit=False)
+      self._UpdateIssuesNotify(cnxn, issues, commit=False)
+      if not just_derived:
+        self._UpdateIssuesSummary(cnxn, issues, commit=False)
+        self._UpdateIssuesRelation(cnxn, issues, commit=False)
+
+    self.chart_service.StoreIssueSnapshots(cnxn, issues, commit=False)
+
+    iids_to_invalidate = [issue.issue_id for issue in issues]
+    if just_derived and invalidate:
+      self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate)
+    elif invalidate:
+      self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
+    if commit:
+      cnxn.Commit()
+    if invalidate:
+      self._config_service.InvalidateMemcache(issues)
+
+  def UpdateIssue(
+      self, cnxn, issue, update_cols=None, just_derived=False, commit=True,
+      invalidate=True):
+    """Update the given issue in SQL.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue: the issue to update.
+      update_cols: optional list of just the field names to update.
+      just_derived: set to True when only updating derived fields.
+      commit: set to False to skip the DB commit and do it in the caller.
+      invalidate: set to False to leave cache invalidatation to the caller.
+    """
+    self.UpdateIssues(
+        cnxn, [issue], update_cols=update_cols, just_derived=just_derived,
+        commit=commit, invalidate=invalidate)
+
+  def _UpdateIssuesSummary(self, cnxn, issues, commit=True):
+    """Update the IssueSummary table rows for the given issues."""
+    self.issuesummary_tbl.InsertRows(
+        cnxn, ISSUESUMMARY_COLS,
+        [(issue.issue_id, issue.summary) for issue in issues],
+        replace=True, commit=commit)
+
+  def _UpdateIssuesLabels(self, cnxn, issues, commit=True):
+    """Update the Issue2Label table rows for the given issues."""
+    label_rows = []
+    for issue in issues:
+      issue_shard = issue.issue_id % settings.num_logical_shards
+      # TODO(jrobbins): If the user adds many novel labels in one issue update,
+      # that could be slow. Solution is to add all new labels in a batch first.
+      label_rows.extend(
+          (issue.issue_id,
+           self._config_service.LookupLabelID(cnxn, issue.project_id, label),
+           False,
+           issue_shard)
+          for label in issue.labels)
+      label_rows.extend(
+          (issue.issue_id,
+           self._config_service.LookupLabelID(cnxn, issue.project_id, label),
+           True,
+           issue_shard)
+          for label in issue.derived_labels)
+
+    self.issue2label_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues],
+        commit=False)
+    self.issue2label_tbl.InsertRows(
+        cnxn, ISSUE2LABEL_COLS + ['issue_shard'],
+        label_rows, ignore=True, commit=commit)
+
+  def _UpdateIssuesFields(self, cnxn, issues, commit=True):
+    """Update the Issue2FieldValue table rows for the given issues."""
+    fieldvalue_rows = []
+    for issue in issues:
+      issue_shard = issue.issue_id % settings.num_logical_shards
+      for fv in issue.field_values:
+        fieldvalue_rows.append(
+            (issue.issue_id, fv.field_id, fv.int_value, fv.str_value,
+             fv.user_id or None, fv.date_value, fv.url_value, fv.derived,
+             fv.phase_id or None, issue_shard))
+
+    self.issue2fieldvalue_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
+    self.issue2fieldvalue_tbl.InsertRows(
+        cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'],
+        fieldvalue_rows, commit=commit)
+
+  def _UpdateIssuesComponents(self, cnxn, issues, commit=True):
+    """Update the Issue2Component table rows for the given issues."""
+    issue2component_rows = []
+    for issue in issues:
+      issue_shard = issue.issue_id % settings.num_logical_shards
+      issue2component_rows.extend(
+          (issue.issue_id, component_id, False, issue_shard)
+          for component_id in issue.component_ids)
+      issue2component_rows.extend(
+          (issue.issue_id, component_id, True, issue_shard)
+          for component_id in issue.derived_component_ids)
+
+    self.issue2component_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
+    self.issue2component_tbl.InsertRows(
+        cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'],
+        issue2component_rows, ignore=True, commit=commit)
+
+  def _UpdateIssuesCc(self, cnxn, issues, commit=True):
+    """Update the Issue2Cc table rows for the given issues."""
+    cc_rows = []
+    for issue in issues:
+      issue_shard = issue.issue_id % settings.num_logical_shards
+      cc_rows.extend(
+          (issue.issue_id, cc_id, False, issue_shard)
+          for cc_id in issue.cc_ids)
+      cc_rows.extend(
+          (issue.issue_id, cc_id, True, issue_shard)
+          for cc_id in issue.derived_cc_ids)
+
+    self.issue2cc_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
+    self.issue2cc_tbl.InsertRows(
+        cnxn, ISSUE2CC_COLS + ['issue_shard'],
+        cc_rows, ignore=True, commit=commit)
+
+  def _UpdateIssuesNotify(self, cnxn, issues, commit=True):
+    """Update the Issue2Notify table rows for the given issues."""
+    notify_rows = []
+    for issue in issues:
+      derived_rows = [[issue.issue_id, email]
+                      for email in issue.derived_notify_addrs]
+      notify_rows.extend(derived_rows)
+
+    self.issue2notify_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
+    self.issue2notify_tbl.InsertRows(
+        cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit)
+
+  def _UpdateIssuesRelation(self, cnxn, issues, commit=True):
+    """Update the IssueRelation table rows for the given issues."""
+    relation_rows = []
+    blocking_rows = []
+    dangling_relation_rows = []
+    for issue in issues:
+      for i, dst_issue_id in enumerate(issue.blocked_on_iids):
+        rank = issue.blocked_on_ranks[i]
+        relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon', rank))
+      for dst_issue_id in issue.blocking_iids:
+        blocking_rows.append((dst_issue_id, issue.issue_id, 'blockedon'))
+      for dst_ref in issue.dangling_blocked_on_refs:
+        if dst_ref.ext_issue_identifier:
+          dangling_relation_rows.append((
+              issue.issue_id, None, None,
+              dst_ref.ext_issue_identifier, 'blockedon'))
+        else:
+          dangling_relation_rows.append((
+              issue.issue_id, dst_ref.project, dst_ref.issue_id,
+              None, 'blockedon'))
+      for dst_ref in issue.dangling_blocking_refs:
+        if dst_ref.ext_issue_identifier:
+          dangling_relation_rows.append((
+              issue.issue_id, None, None,
+              dst_ref.ext_issue_identifier, 'blocking'))
+        else:
+          dangling_relation_rows.append((
+              issue.issue_id, dst_ref.project, dst_ref.issue_id,
+              dst_ref.ext_issue_identifier, 'blocking'))
+      if issue.merged_into:
+        relation_rows.append((
+            issue.issue_id, issue.merged_into, 'mergedinto', None))
+      if issue.merged_into_external:
+        dangling_relation_rows.append((
+            issue.issue_id, None, None,
+            issue.merged_into_external, 'mergedinto'))
+
+    old_blocking = self.issuerelation_tbl.Select(
+        cnxn, cols=ISSUERELATION_COLS[:-1],
+        dst_issue_id=[issue.issue_id for issue in issues], kind='blockedon')
+    relation_rows.extend([
+      (row + (0,)) for row in blocking_rows if row not in old_blocking])
+    delete_rows = [row for row in old_blocking if row not in blocking_rows]
+
+    for issue_id, dst_issue_id, kind in delete_rows:
+      self.issuerelation_tbl.Delete(cnxn, issue_id=issue_id,
+          dst_issue_id=dst_issue_id, kind=kind, commit=False)
+    self.issuerelation_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
+    self.issuerelation_tbl.InsertRows(
+        cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
+    self.danglingrelation_tbl.Delete(
+        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
+    self.danglingrelation_tbl.InsertRows(
+        cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True,
+        commit=commit)
+
+  def _UpdateIssuesModified(
+      self, cnxn, iids, modified_timestamp=None, invalidate=True):
+    """Store a modified timestamp for each of the specified issues."""
+    if not iids:
+      return
+    delta = {'modified': modified_timestamp or int(time.time())}
+    self.issue_tbl.Update(cnxn, delta, id=iids, commit=False)
+    if invalidate:
+      self.InvalidateIIDs(cnxn, iids)
+
+  def _UpdateIssuesApprovals(self, cnxn, issue, commit=True):
+    """Update the Issue2ApprovalValue table rows for the given issue."""
+    self.issue2approvalvalue_tbl.Delete(
+        cnxn, issue_id=issue.issue_id, commit=commit)
+    av_rows = [(av.approval_id, issue.issue_id, av.phase_id,
+                av.status.name.lower(), av.setter_id, av.set_on) for
+               av in issue.approval_values]
+    self.issue2approvalvalue_tbl.InsertRows(
+        cnxn, ISSUE2APPROVALVALUE_COLS, av_rows, commit=commit)
+
+    approver_rows = []
+    for av in issue.approval_values:
+      approver_rows.extend([(av.approval_id, approver_id, issue.issue_id)
+                            for approver_id in av.approver_ids])
+    self.issueapproval2approver_tbl.Delete(
+        cnxn, issue_id=issue.issue_id, commit=commit)
+    self.issueapproval2approver_tbl.InsertRows(
+        cnxn, ISSUEAPPROVAL2APPROVER_COLS, approver_rows, commit=commit)
+
+  def UpdateIssueStructure(self, cnxn, config, issue, template, reporter_id,
+                            comment_content, commit=True, invalidate=True):
+    """Converts the phases and approvals structure of the issue into the
+       structure of the given template."""
+    # TODO(jojwang): Remove Field defs that belong to any removed approvals.
+    approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
+    issue_avs_by_id = {av.approval_id: av for av in issue.approval_values}
+
+    new_approval_surveys = []
+    new_issue_approvals = []
+
+    for template_av in template.approval_values:
+      existing_issue_av = issue_avs_by_id.get(template_av.approval_id)
+
+      # Update all approval surveys so latest ApprovalDef survey changes
+      # appear in the converted issue's approval values.
+      ad = approval_defs_by_id.get(template_av.approval_id)
+      new_av_approver_ids = []
+      if ad:
+        new_av_approver_ids = ad.approver_ids
+        new_approval_surveys.append(
+            self._MakeIssueComment(
+                issue.project_id, reporter_id, ad.survey,
+                is_description=True, approval_id=ad.approval_id))
+      else:
+        logging.info('ApprovalDef not found for approval %r', template_av)
+
+      # Keep approval values as-is if it exists in issue and template
+      if existing_issue_av:
+        new_av = tracker_bizobj.MakeApprovalValue(
+            existing_issue_av.approval_id,
+            approver_ids=existing_issue_av.approver_ids,
+            status=existing_issue_av.status,
+            setter_id=existing_issue_av.setter_id,
+            set_on=existing_issue_av.set_on,
+            phase_id=template_av.phase_id)
+        new_issue_approvals.append(new_av)
+      else:
+        new_av = tracker_bizobj.MakeApprovalValue(
+            template_av.approval_id, approver_ids=new_av_approver_ids,
+            status=template_av.status, phase_id=template_av.phase_id)
+        new_issue_approvals.append(new_av)
+
+    template_phase_by_name = {
+        phase.name.lower(): phase for phase in template.phases}
+    issue_phase_by_id = {phase.phase_id: phase for phase in issue.phases}
+    updated_fvs = []
+    # Trim issue FieldValues or update FieldValue phase_ids
+    for fv in issue.field_values:
+      # If a fv's phase has the same name as a template's phase, update
+      # the fv's phase_id to that of the template phase's. Otherwise,
+      # remove the fv.
+      if fv.phase_id:
+        issue_phase = issue_phase_by_id.get(fv.phase_id)
+        if issue_phase and issue_phase.name:
+          template_phase = template_phase_by_name.get(issue_phase.name.lower())
+          # TODO(jojwang): monorail:4693, remove this after all 'stable-full'
+          # gates have been renamed to 'stable'.
+          if not template_phase:
+            template_phase = template_phase_by_name.get(
+                FLT_EQUIVALENT_GATES.get(issue_phase.name.lower()))
+          if template_phase:
+            fv.phase_id = template_phase.phase_id
+            updated_fvs.append(fv)
+      # keep all fvs that do not belong to phases.
+      else:
+        updated_fvs.append(fv)
+
+    fd_names_by_id = {fd.field_id: fd.field_name for fd in config.field_defs}
+    amendment = tracker_bizobj.MakeApprovalStructureAmendment(
+        [fd_names_by_id.get(av.approval_id) for av in new_issue_approvals],
+        [fd_names_by_id.get(av.approval_id) for av in issue.approval_values])
+
+    # Update issue structure in RAM.
+    issue.approval_values = new_issue_approvals
+    issue.phases = template.phases
+    issue.field_values = updated_fvs
+
+    # Update issue structure in DB.
+    for survey in new_approval_surveys:
+      survey.issue_id = issue.issue_id
+      self.InsertComment(cnxn, survey, commit=False)
+    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
+    self._UpdateIssuesFields(cnxn, [issue], commit=False)
+    comment_pb = self.CreateIssueComment(
+        cnxn, issue, reporter_id, comment_content,
+        amendments=[amendment], commit=False)
+
+    if commit:
+      cnxn.Commit()
+
+    if invalidate:
+      self.InvalidateIIDs(cnxn, [issue.issue_id])
+
+    return comment_pb
+
+  def DeltaUpdateIssue(
+      self, cnxn, services, reporter_id, project_id,
+      config, issue, delta, index_now=False, comment=None, attachments=None,
+      iids_to_invalidate=None, rules=None, predicate_asts=None,
+      is_description=False, timestamp=None, kept_attachments=None,
+      importer_id=None, inbound_message=None):
+    """Update the issue in the database and return a set of update tuples.
+
+    Args:
+      cnxn: connection to SQL database.
+      services: connections to persistence layer.
+      reporter_id: user ID of the user making this change.
+      project_id: int ID for the current project.
+      config: ProjectIssueConfig PB for this project.
+      issue: Issue PB of issue to update.
+      delta: IssueDelta object of fields to update.
+      index_now: True if the issue should be updated in the full text index.
+      comment: This should be the content of the comment
+          corresponding to this change.
+      attachments: List [(filename, contents, mimetype),...] of attachments.
+      iids_to_invalidate: optional set of issue IDs that need to be invalidated.
+          If provided, affected issues will be accumulated here and, the caller
+          must call InvalidateIIDs() afterwards.
+      rules: optional list of preloaded FilterRule PBs for this project.
+      predicate_asts: optional list of QueryASTs for the rules.  If rules are
+          provided, then predicate_asts should also be provided.
+      is_description: True if the comment is a new description for the issue.
+      timestamp: int timestamp set during testing, otherwise defaults to
+          int(time.time()).
+      kept_attachments: This should be a list of int attachment ids for
+          attachments kept from previous descriptions, if the comment is
+          a change to the issue description
+      importer_id: optional ID of user ID for an API client that is importing
+          issues and attributing them to other users.
+      inbound_message: optional string full text of an email that caused
+          this comment to be added.
+
+    Returns:
+      A tuple (amendments, comment_pb) with a list of Amendment PBs that
+      describe the set of metadata updates that the user made, and the
+      resulting IssueComment (or None if no comment was created).
+    """
+    timestamp = timestamp or int(time.time())
+    old_effective_owner = tracker_bizobj.GetOwnerId(issue)
+    old_effective_status = tracker_bizobj.GetStatus(issue)
+    old_components = set(issue.component_ids)
+
+    logging.info(
+        'Bulk edit to project_id %s issue.local_id %s, comment %r',
+        project_id, issue.local_id, comment)
+    if iids_to_invalidate is None:
+      iids_to_invalidate = set([issue.issue_id])
+      invalidate = True
+    else:
+      iids_to_invalidate.add(issue.issue_id)
+      invalidate = False  # Caller will do it.
+
+    # Store each updated value in the issue PB, and compute Update PBs
+    amendments, impacted_iids = tracker_bizobj.ApplyIssueDelta(
+        cnxn, self, issue, delta, config)
+    iids_to_invalidate.update(impacted_iids)
+
+    # If this was a no-op with no comment, bail out and don't save,
+    # invalidate, or re-index anything.
+    if (not amendments and (not comment or not comment.strip()) and
+        not attachments):
+      logging.info('No amendments, comment, attachments: this is a no-op.')
+      return [], None
+
+    # Note: no need to check for collisions when the user is doing a delta.
+
+    # update the modified_timestamp for any comment added, even if it was
+    # just a text comment with no issue fields changed.
+    issue.modified_timestamp = timestamp
+
+    # Update the closed timestamp before filter rules so that rules
+    # can test for closed_timestamp, and also after filter rules
+    # so that closed_timestamp will be set if the issue is closed by the rule.
+    tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
+    if rules is None:
+      logging.info('Rules were not given')
+      rules = services.features.GetFilterRules(cnxn, config.project_id)
+      predicate_asts = filterrules_helpers.ParsePredicateASTs(
+          rules, config, [])
+
+    filterrules_helpers.ApplyGivenRules(
+        cnxn, services, issue, config, rules, predicate_asts)
+    tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
+    if old_effective_owner != tracker_bizobj.GetOwnerId(issue):
+      issue.owner_modified_timestamp = timestamp
+    if old_effective_status != tracker_bizobj.GetStatus(issue):
+      issue.status_modified_timestamp = timestamp
+    if old_components != set(issue.component_ids):
+      issue.component_modified_timestamp = timestamp
+
+    # Store the issue in SQL.
+    self.UpdateIssue(cnxn, issue, commit=False, invalidate=False)
+
+    comment_pb = self.CreateIssueComment(
+        cnxn, issue, reporter_id, comment, amendments=amendments,
+        is_description=is_description, attachments=attachments, commit=False,
+        kept_attachments=kept_attachments, timestamp=timestamp,
+        importer_id=importer_id, inbound_message=inbound_message)
+    self._UpdateIssuesModified(
+        cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp,
+        invalidate=invalidate)
+
+    # Add a comment to the newly added issues saying they are now blocking
+    # this issue.
+    for add_issue in self.GetIssues(cnxn, delta.blocked_on_add):
+      self.CreateIssueComment(
+          cnxn, add_issue, reporter_id, content='',
+          amendments=[tracker_bizobj.MakeBlockingAmendment(
+              [(issue.project_name, issue.local_id)], [],
+              default_project_name=add_issue.project_name)],
+          timestamp=timestamp, importer_id=importer_id)
+    # Add a comment to the newly removed issues saying they are no longer
+    # blocking this issue.
+    for remove_issue in self.GetIssues(cnxn, delta.blocked_on_remove):
+      self.CreateIssueComment(
+          cnxn, remove_issue, reporter_id, content='',
+          amendments=[tracker_bizobj.MakeBlockingAmendment(
+              [], [(issue.project_name, issue.local_id)],
+              default_project_name=remove_issue.project_name)],
+           timestamp=timestamp, importer_id=importer_id)
+
+    # Add a comment to the newly added issues saying they are now blocked on
+    # this issue.
+    for add_issue in self.GetIssues(cnxn, delta.blocking_add):
+      self.CreateIssueComment(
+          cnxn, add_issue, reporter_id, content='',
+          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
+              [(issue.project_name, issue.local_id)], [],
+              default_project_name=add_issue.project_name)],
+          timestamp=timestamp, importer_id=importer_id)
+    # Add a comment to the newly removed issues saying they are no longer
+    # blocked on this issue.
+    for remove_issue in self.GetIssues(cnxn, delta.blocking_remove):
+      self.CreateIssueComment(
+          cnxn, remove_issue, reporter_id, content='',
+          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
+              [], [(issue.project_name, issue.local_id)],
+              default_project_name=remove_issue.project_name)],
+          timestamp=timestamp, importer_id=importer_id)
+
+    if not invalidate:
+      cnxn.Commit()
+
+    if index_now:
+      tracker_fulltext.IndexIssues(
+          cnxn, [issue], services.user_service, self, self._config_service)
+    else:
+      self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
+
+    return amendments, comment_pb
+
+  def InvalidateIIDs(self, cnxn, iids_to_invalidate):
+    """Invalidate the specified issues in the Invalidate table and memcache."""
+    issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate)
+    self.InvalidateIssues(cnxn, issues_to_invalidate)
+
+  def InvalidateIssues(self, cnxn, issues):
+    """Invalidate the specified issues in the Invalidate table and memcache."""
+    iids = [issue.issue_id for issue in issues]
+    self.issue_2lc.InvalidateKeys(cnxn, iids)
+    self._config_service.InvalidateMemcache(issues)
+
+  def RelateIssues(self, cnxn, issue_relation_dict, commit=True):
+    """Update the IssueRelation table rows for the given relationships.
+
+    issue_relation_dict is a mapping of 'source' issues to 'destination' issues,
+    paired with the kind of relationship connecting the two.
+    """
+    relation_rows = []
+    for src_iid, dests in issue_relation_dict.items():
+      for dst_iid, kind in dests:
+        if kind == 'blocking':
+          relation_rows.append((dst_iid, src_iid, 'blockedon', 0))
+        elif kind == 'blockedon':
+          relation_rows.append((src_iid, dst_iid, 'blockedon', 0))
+        elif kind == 'mergedinto':
+          relation_rows.append((src_iid, dst_iid, 'mergedinto', None))
+
+    self.issuerelation_tbl.InsertRows(
+        cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
+
+  def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id):
+    """Copy the given issues into the destination project."""
+    created_issues = []
+    iids_to_invalidate = set()
+
+    for target_issue in issues:
+      assert not target_issue.assume_stale, (
+          'issue2514: Copying issue that might be stale: %r' % target_issue)
+      new_issue = tracker_pb2.Issue()
+      new_issue.project_id = dest_project.project_id
+      new_issue.project_name = dest_project.project_name
+      new_issue.summary = target_issue.summary
+      new_issue.labels.extend(target_issue.labels)
+      new_issue.field_values.extend(target_issue.field_values)
+      new_issue.reporter_id = copier_id
+
+      timestamp = int(time.time())
+      new_issue.opened_timestamp = timestamp
+      new_issue.modified_timestamp = timestamp
+
+      target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id)
+      initial_summary_comment = target_comments[0]
+
+      # Note that blocking and merge_into are not copied.
+      if target_issue.blocked_on_iids:
+        blocked_on = target_issue.blocked_on_iids
+        iids_to_invalidate.update(blocked_on)
+        new_issue.blocked_on_iids = blocked_on
+
+      # Gather list of attachments from the target issue's summary comment.
+      # MakeIssueComments expects a list of [(filename, contents, mimetype),...]
+      attachments = []
+      for attachment in initial_summary_comment.attachments:
+        object_path = ('/' + app_identity.get_default_gcs_bucket_name() +
+                       attachment.gcs_object_id)
+        with cloudstorage.open(object_path, 'r') as f:
+          content = f.read()
+          attachments.append(
+              [attachment.filename, content, attachment.mimetype])
+
+      if attachments:
+        new_issue.attachment_count = len(attachments)
+
+      # Create the same summary comment as the target issue.
+      comment = self._MakeIssueComment(
+          dest_project.project_id, copier_id, initial_summary_comment.content,
+          attachments=attachments, timestamp=timestamp, is_description=True)
+
+      new_issue.local_id = self.AllocateNextLocalID(
+          cnxn, dest_project.project_id)
+      issue_id = self.InsertIssue(cnxn, new_issue)
+      comment.issue_id = issue_id
+      self.InsertComment(cnxn, comment)
+
+      if permissions.HasRestrictions(new_issue, 'view'):
+        self._config_service.InvalidateMemcache(
+            [new_issue], key_prefix='nonviewable:')
+
+      tracker_fulltext.IndexIssues(
+          cnxn, [new_issue], user_service, self, self._config_service)
+      created_issues.append(new_issue)
+
+    # The referenced issues are all modified when the relationship is added.
+    self._UpdateIssuesModified(
+      cnxn, iids_to_invalidate, modified_timestamp=timestamp)
+
+    return created_issues
+
+  def MoveIssues(self, cnxn, dest_project, issues, user_service):
+    """Move the given issues into the destination project."""
+    old_location_rows = [
+        (issue.issue_id, issue.project_id, issue.local_id)
+        for issue in issues]
+    moved_back_iids = set()
+
+    former_locations_in_project = self.issueformerlocations_tbl.Select(
+        cnxn, cols=ISSUEFORMERLOCATIONS_COLS,
+        project_id=dest_project.project_id,
+        issue_id=[issue.issue_id for issue in issues])
+    former_locations = {
+        issue_id: local_id
+        for issue_id, project_id, local_id in former_locations_in_project}
+
+    # Remove the issue id from issue_id_2lc so that it does not stay
+    # around in cache and memcache.
+    # The Key of IssueIDTwoLevelCache is (project_id, local_id).
+    self.issue_id_2lc.InvalidateKeys(
+        cnxn, [(issue.project_id, issue.local_id) for issue in issues])
+    self.InvalidateIssues(cnxn, issues)
+
+    for issue in issues:
+      if issue.issue_id in former_locations:
+        dest_id = former_locations[issue.issue_id]
+        moved_back_iids.add(issue.issue_id)
+      else:
+        dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id)
+
+      issue.local_id = dest_id
+      issue.project_id = dest_project.project_id
+      issue.project_name = dest_project.project_name
+
+    # Rewrite each whole issue so that status and label IDs are looked up
+    # in the context of the destination project.
+    self.UpdateIssues(cnxn, issues)
+
+    # Comments also have the project_id because it is needed for an index.
+    self.comment_tbl.Update(
+        cnxn, {'project_id': dest_project.project_id},
+        issue_id=[issue.issue_id for issue in issues], commit=False)
+
+    # Record old locations so that we can offer links if the user looks there.
+    self.issueformerlocations_tbl.InsertRows(
+        cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True,
+        commit=False)
+    cnxn.Commit()
+
+    tracker_fulltext.IndexIssues(
+        cnxn, issues, user_service, self, self._config_service)
+
+    return moved_back_iids
+
+  def ExpungeFormerLocations(self, cnxn, project_id):
+    """Delete history of issues that were in this project but moved out."""
+    self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id)
+
+  def ExpungeIssues(self, cnxn, issue_ids):
+    """Completely delete the specified issues from the database."""
+    logging.info('expunging the issues %r', issue_ids)
+    tracker_fulltext.UnindexIssues(issue_ids)
+
+    remaining_iids = issue_ids[:]
+
+    # Note: these are purposely not done in a transaction to allow
+    # incremental progress in what might be a very large change.
+    # We are not concerned about non-atomic deletes because all
+    # this data will be gone eventually anyway.
+    while remaining_iids:
+      iids_in_chunk = remaining_iids[:CHUNK_SIZE]
+      remaining_iids = remaining_iids[CHUNK_SIZE:]
+      self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk)
+      self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk)
+      self.issue_tbl.Delete(cnxn, id=iids_in_chunk)
+
+  def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service):
+    """Set the deleted boolean on the indicated issue and store it.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: int project ID for the current project.
+      local_id: int local ID of the issue to freeze/unfreeze.
+      deleted: boolean, True to soft-delete, False to undelete.
+      user_service: persistence layer for users, used to lookup user IDs.
+    """
+    issue = self.GetIssueByLocalID(cnxn, project_id, local_id, use_cache=False)
+    issue.deleted = deleted
+    self.UpdateIssue(cnxn, issue, update_cols=['deleted'])
+    tracker_fulltext.IndexIssues(
+        cnxn, [issue], user_service, self, self._config_service)
+
+  def DeleteComponentReferences(self, cnxn, component_id):
+    """Delete any references to the specified component."""
+    # TODO(jrobbins): add tasks to re-index any affected issues.
+    # Note: if this call fails, some data could be left
+    # behind, but it would not be displayed, and it could always be
+    # GC'd from the DB later.
+    self.issue2component_tbl.Delete(cnxn, component_id=component_id)
+
+  ### Local ID generation
+
+  def InitializeLocalID(self, cnxn, project_id):
+    """Initialize the local ID counter for the specified project to zero.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: int ID of the project.
+    """
+    self.localidcounter_tbl.InsertRow(
+        cnxn, project_id=project_id, used_local_id=0, used_spam_id=0)
+
+  def SetUsedLocalID(self, cnxn, project_id):
+    """Set the local ID counter based on existing issues.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: int ID of the project.
+    """
+    highest_id = self.GetHighestLocalID(cnxn, project_id)
+    self.localidcounter_tbl.InsertRow(
+        cnxn, replace=True, used_local_id=highest_id, project_id=project_id)
+    return highest_id
+
+  def AllocateNextLocalID(self, cnxn, project_id):
+    """Return the next available issue ID in the specified project.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: int ID of the project.
+
+    Returns:
+      The next local ID.
+    """
+    try:
+      next_local_id = self.localidcounter_tbl.IncrementCounterValue(
+          cnxn, 'used_local_id', project_id=project_id)
+    except AssertionError as e:
+      logging.info('exception incrementing local_id counter: %s', e)
+      next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1
+    return next_local_id
+
+  def GetHighestLocalID(self, cnxn, project_id):
+    """Return the highest used issue ID in the specified project.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: int ID of the project.
+
+    Returns:
+      The highest local ID for an active or moved issues.
+    """
+    highest = self.issue_tbl.SelectValue(
+        cnxn, 'MAX(local_id)', project_id=project_id)
+    highest = highest or 0  # It will be None if the project has no issues.
+    highest_former = self.issueformerlocations_tbl.SelectValue(
+        cnxn, 'MAX(local_id)', project_id=project_id)
+    highest_former = highest_former or 0
+    return max(highest, highest_former)
+
+  def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None):
+    """Return the list of local IDs only, not the actual issues.
+
+    Args:
+      cnxn: connection to SQL database.
+      project_id: the ID of the project to which the issue belongs.
+      min_local_id: point to start at.
+
+    Returns:
+      A range object of local IDs from 1 to N, or from min_local_id to N.  It
+      may be the case that some of those local IDs are no longer used, e.g.,
+      if some issues were moved out of this project.
+    """
+    if not min_local_id:
+      min_local_id = 1
+    highest_local_id = self.GetHighestLocalID(cnxn, project_id)
+    return list(range(min_local_id, highest_local_id + 1))
+
+  def ExpungeLocalIDCounters(self, cnxn, project_id):
+    """Delete history of local ids that were in this project."""
+    self.localidcounter_tbl.Delete(cnxn, project_id=project_id)
+
+  ### Comments
+
+  def _UnpackComment(
+      self, comment_row, content_dict, inbound_message_dict, approval_dict,
+      importer_dict):
+    """Partially construct a Comment PB from a DB row."""
+    (comment_id, issue_id, created, project_id, commenter_id,
+     deleted_by, is_spam, is_description, commentcontent_id) = comment_row
+    comment = tracker_pb2.IssueComment()
+    comment.id = comment_id
+    comment.issue_id = issue_id
+    comment.timestamp = created
+    comment.project_id = project_id
+    comment.user_id = commenter_id
+    comment.content = content_dict.get(commentcontent_id, '')
+    comment.inbound_message = inbound_message_dict.get(commentcontent_id, '')
+    comment.deleted_by = deleted_by or 0
+    comment.is_spam = bool(is_spam)
+    comment.is_description = bool(is_description)
+    comment.approval_id = approval_dict.get(comment_id)
+    comment.importer_id = importer_dict.get(comment_id)
+    return comment
+
+  def _UnpackAmendment(self, amendment_row):
+    """Construct an Amendment PB from a DB row."""
+    (_id, _issue_id, comment_id, field_name,
+     old_value, new_value, added_user_id, removed_user_id,
+     custom_field_name) = amendment_row
+    amendment = tracker_pb2.Amendment()
+    field_enum = tracker_pb2.FieldID(field_name.upper())
+    amendment.field = field_enum
+
+    # TODO(jrobbins): display old values in more cases.
+    if new_value is not None:
+      amendment.newvalue = new_value
+    if old_value is not None:
+      amendment.oldvalue = old_value
+    if added_user_id:
+      amendment.added_user_ids.append(added_user_id)
+    if removed_user_id:
+      amendment.removed_user_ids.append(removed_user_id)
+    if custom_field_name:
+      amendment.custom_field_name = custom_field_name
+    return amendment, comment_id
+
+  def _ConsolidateAmendments(self, amendments):
+    """Consoliodate amendments of the same field in one comment into one
+    amendment PB."""
+
+    fields_dict = {}
+    result = []
+
+    for amendment in amendments:
+      key = amendment.field, amendment.custom_field_name
+      fields_dict.setdefault(key, []).append(amendment)
+    for (field, _custom_name), sorted_amendments in sorted(fields_dict.items()):
+      new_amendment = tracker_pb2.Amendment()
+      new_amendment.field = field
+      for amendment in sorted_amendments:
+        if amendment.newvalue is not None:
+          if new_amendment.newvalue is not None:
+            # NOTE: see crbug/monorail/8272. BLOCKEDON and BLOCKING changes
+            # are all stored in newvalue e.g. (newvalue = -b/123 b/124) and
+            # external bugs and monorail bugs are stored in separate amendments.
+            # Without this, the values of external bug amendments and monorail
+            # blocker bug amendments may overwrite each other.
+            new_amendment.newvalue += (' ' + amendment.newvalue)
+          else:
+            new_amendment.newvalue = amendment.newvalue
+        if amendment.oldvalue is not None:
+          new_amendment.oldvalue = amendment.oldvalue
+        if amendment.added_user_ids:
+          new_amendment.added_user_ids.extend(amendment.added_user_ids)
+        if amendment.removed_user_ids:
+          new_amendment.removed_user_ids.extend(amendment.removed_user_ids)
+        if amendment.custom_field_name:
+          new_amendment.custom_field_name = amendment.custom_field_name
+      result.append(new_amendment)
+    return result
+
+  def _UnpackAttachment(self, attachment_row):
+    """Construct an Attachment PB from a DB row."""
+    (attachment_id, _issue_id, comment_id, filename, filesize, mimetype,
+     deleted, gcs_object_id) = attachment_row
+    attach = tracker_pb2.Attachment()
+    attach.attachment_id = attachment_id
+    attach.filename = filename
+    attach.filesize = filesize
+    attach.mimetype = mimetype
+    attach.deleted = bool(deleted)
+    attach.gcs_object_id = gcs_object_id
+    return attach, comment_id
+
+  def _DeserializeComments(
+      self, comment_rows, commentcontent_rows, amendment_rows, attachment_rows,
+      approval_rows, importer_rows):
+    """Turn rows into IssueComment PBs."""
+    results = []  # keep objects in the same order as the rows
+    results_dict = {}  # for fast access when joining.
+
+    content_dict = dict(
+        (commentcontent_id, content) for
+        commentcontent_id, content, _ in commentcontent_rows)
+    inbound_message_dict = dict(
+        (commentcontent_id, inbound_message) for
+        commentcontent_id, _, inbound_message in commentcontent_rows)
+    approval_dict = dict(
+        (comment_id, approval_id) for approval_id, comment_id in
+        approval_rows)
+    importer_dict = dict(importer_rows)
+
+    for comment_row in comment_rows:
+      comment = self._UnpackComment(
+          comment_row, content_dict, inbound_message_dict, approval_dict,
+          importer_dict)
+      results.append(comment)
+      results_dict[comment.id] = comment
+
+    for amendment_row in amendment_rows:
+      amendment, comment_id = self._UnpackAmendment(amendment_row)
+      try:
+        results_dict[comment_id].amendments.extend([amendment])
+      except KeyError:
+        logging.error('Found amendment for missing comment: %r', comment_id)
+
+    for attachment_row in attachment_rows:
+      attach, comment_id = self._UnpackAttachment(attachment_row)
+      try:
+        results_dict[comment_id].attachments.append(attach)
+      except KeyError:
+        logging.error('Found attachment for missing comment: %r', comment_id)
+
+    for c in results:
+      c.amendments = self._ConsolidateAmendments(c.amendments)
+
+    return results
+
+  # TODO(jrobbins): make this a private method and expose just the interface
+  # needed by activities.py.
+  def GetComments(
+      self, cnxn, where=None, order_by=None, content_only=False, **kwargs):
+    """Retrieve comments from SQL."""
+    shard_id = sql.RandomShardID()
+    order_by = order_by or [('created', [])]
+    comment_rows = self.comment_tbl.Select(
+        cnxn, cols=COMMENT_COLS, where=where,
+        order_by=order_by, shard_id=shard_id, **kwargs)
+    cids = [row[0] for row in comment_rows]
+    commentcontent_ids = [row[-1] for row in comment_rows]
+    content_rows = self.commentcontent_tbl.Select(
+        cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids,
+        shard_id=shard_id)
+    approval_rows = self.issueapproval2comment_tbl.Select(
+        cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids)
+    amendment_rows = []
+    attachment_rows = []
+    importer_rows = []
+    if not content_only:
+      amendment_rows = self.issueupdate_tbl.Select(
+          cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id)
+      attachment_rows = self.attachment_tbl.Select(
+          cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id)
+      importer_rows = self.commentimporter_tbl.Select(
+          cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id)
+
+    comments = self._DeserializeComments(
+        comment_rows, content_rows, amendment_rows, attachment_rows,
+        approval_rows, importer_rows)
+    return comments
+
+  def GetComment(self, cnxn, comment_id):
+    """Get the requested comment, or raise an exception."""
+    comments = self.GetComments(cnxn, id=comment_id)
+    try:
+      return comments[0]
+    except IndexError:
+      raise exceptions.NoSuchCommentException()
+
+  def GetCommentsForIssue(self, cnxn, issue_id):
+    """Return all IssueComment PBs for the specified issue.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue_id: int global ID of the issue.
+
+    Returns:
+      A list of the IssueComment protocol buffers for the description
+      and comments on this issue.
+    """
+    comments = self.GetComments(cnxn, issue_id=[issue_id])
+    for i, comment in enumerate(comments):
+      comment.sequence = i
+
+    return comments
+
+
+  def GetCommentsByID(self, cnxn, comment_ids, sequences, use_cache=True,
+      shard_id=None):
+    """Return all IssueComment PBs by comment ids.
+
+    Args:
+      cnxn: connection to SQL database.
+      comment_ids: a list of comment ids.
+      sequences: sequence of the comments.
+      use_cache: optional boolean to enable the cache.
+      shard_id: optional int shard_id to limit retrieval.
+
+    Returns:
+      A list of the IssueComment protocol buffers for comment_ids.
+    """
+    # Try loading issue comments from a random shard to reduce load on
+    # primary DB.
+    if shard_id is None:
+      shard_id = sql.RandomShardID()
+
+    comment_dict, _missed_comments = self.comment_2lc.GetAll(cnxn, comment_ids,
+          use_cache=use_cache, shard_id=shard_id)
+
+    comments = sorted(list(comment_dict.values()), key=lambda x: x.timestamp)
+
+    for i in range(len(comment_ids)):
+      comments[i].sequence = sequences[i]
+
+    return comments
+
+  # TODO(jrobbins): remove this method because it is too slow when an issue
+  # has a huge number of comments.
+  def GetCommentsForIssues(self, cnxn, issue_ids, content_only=False):
+    """Return all IssueComment PBs for each issue ID in the given list.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue_ids: list of integer global issue IDs.
+      content_only: optional boolean, set true for faster loading of
+          comment content without attachments and amendments.
+
+    Returns:
+      Dict {issue_id: [IssueComment, ...]} with IssueComment protocol
+      buffers for the description and comments on each issue.
+    """
+    comments = self.GetComments(
+        cnxn, issue_id=issue_ids, content_only=content_only)
+
+    comments_dict = collections.defaultdict(list)
+    for comment in comments:
+      comment.sequence = len(comments_dict[comment.issue_id])
+      comments_dict[comment.issue_id].append(comment)
+
+    return comments_dict
+
+  def InsertComment(self, cnxn, comment, commit=True):
+    """Store the given issue comment in SQL.
+
+    Args:
+      cnxn: connection to SQL database.
+      comment: IssueComment PB to insert into the database.
+      commit: set to False to avoid doing the commit for now.
+    """
+    commentcontent_id = self.commentcontent_tbl.InsertRow(
+        cnxn, content=comment.content,
+        inbound_message=comment.inbound_message, commit=False)
+    comment_id = self.comment_tbl.InsertRow(
+        cnxn, issue_id=comment.issue_id, created=comment.timestamp,
+        project_id=comment.project_id,
+        commenter_id=comment.user_id,
+        deleted_by=comment.deleted_by or None,
+        is_spam=comment.is_spam, is_description=comment.is_description,
+        commentcontent_id=commentcontent_id,
+        commit=False)
+    comment.id = comment_id
+    if comment.importer_id:
+      self.commentimporter_tbl.InsertRow(
+          cnxn, comment_id=comment_id, importer_id=comment.importer_id)
+
+    amendment_rows = []
+    for amendment in comment.amendments:
+      field_enum = str(amendment.field).lower()
+      if (amendment.get_assigned_value('newvalue') is not None and
+          not amendment.added_user_ids and not amendment.removed_user_ids):
+        amendment_rows.append((
+            comment.issue_id, comment_id, field_enum,
+            amendment.oldvalue, amendment.newvalue,
+            None, None, amendment.custom_field_name))
+      for added_user_id in amendment.added_user_ids:
+        amendment_rows.append((
+            comment.issue_id, comment_id, field_enum, None, None,
+            added_user_id, None, amendment.custom_field_name))
+      for removed_user_id in amendment.removed_user_ids:
+        amendment_rows.append((
+            comment.issue_id, comment_id, field_enum, None, None,
+            None, removed_user_id, amendment.custom_field_name))
+    # ISSUEUPDATE_COLS[1:] to skip id column.
+    self.issueupdate_tbl.InsertRows(
+        cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=False)
+
+    attachment_rows = []
+    for attach in comment.attachments:
+      attachment_rows.append([
+          comment.issue_id, comment.id, attach.filename, attach.filesize,
+          attach.mimetype, attach.deleted, attach.gcs_object_id])
+    self.attachment_tbl.InsertRows(
+        cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=False)
+
+    if comment.approval_id:
+      self.issueapproval2comment_tbl.InsertRows(
+          cnxn, ISSUEAPPROVAL2COMMENT_COLS,
+          [(comment.approval_id, comment_id)], commit=False)
+
+    if commit:
+      cnxn.Commit()
+
+  def _UpdateComment(self, cnxn, comment, update_cols=None):
+    """Update the given issue comment in SQL.
+
+    Args:
+      cnxn: connection to SQL database.
+      comment: IssueComment PB to update in the database.
+      update_cols: optional list of just the field names to update.
+    """
+    delta = {
+        'commenter_id': comment.user_id,
+        'deleted_by': comment.deleted_by or None,
+        'is_spam': comment.is_spam,
+        }
+    if update_cols is not None:
+      delta = {key: val for key, val in delta.items()
+               if key in update_cols}
+
+    self.comment_tbl.Update(cnxn, delta, id=comment.id)
+    self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
+
+  def _MakeIssueComment(
+      self, project_id, user_id, content, inbound_message=None,
+      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
+      is_spam=False, is_description=False, approval_id=None, importer_id=None):
+    """Create in IssueComment protocol buffer in RAM.
+
+    Args:
+      project_id: Project with the issue.
+      user_id: the user ID of the user who entered the comment.
+      content: string body of the comment.
+      inbound_message: optional string full text of an email that
+          caused this comment to be added.
+      amendments: list of Amendment PBs describing the
+          metadata changes that the user made along w/ comment.
+      attachments: [(filename, contents, mimetype),...] attachments uploaded at
+          the time the comment was made.
+      kept_attachments: list of Attachment PBs for attachments kept from
+          previous descriptions, if the comment is a description
+      timestamp: time at which the comment was made, defaults to now.
+      is_spam: True if the comment was classified as spam.
+      is_description: True if the comment is a description for the issue.
+      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
+          belongs to.
+      importer_id: optional User ID of script that imported the comment on
+          behalf of a user.
+
+    Returns:
+      The new IssueComment protocol buffer.
+
+    The content may have some markup done during input processing.
+
+    Any attachments are immediately stored.
+    """
+    comment = tracker_pb2.IssueComment()
+    comment.project_id = project_id
+    comment.user_id = user_id
+    comment.content = content or ''
+    comment.is_spam = is_spam
+    comment.is_description = is_description
+    if not timestamp:
+      timestamp = int(time.time())
+    comment.timestamp = int(timestamp)
+    if inbound_message:
+      comment.inbound_message = inbound_message
+    if amendments:
+      logging.info('amendments is %r', amendments)
+      comment.amendments.extend(amendments)
+    if approval_id:
+      comment.approval_id = approval_id
+
+    if attachments:
+      for filename, body, mimetype in attachments:
+        gcs_object_id = gcs_helpers.StoreObjectInGCS(
+            body, mimetype, project_id, filename=filename)
+        attach = tracker_pb2.Attachment()
+        # attachment id is determined later by the SQL DB.
+        attach.filename = filename
+        attach.filesize = len(body)
+        attach.mimetype = mimetype
+        attach.gcs_object_id = gcs_object_id
+        comment.attachments.extend([attach])
+        logging.info("Save attachment with object_id: %s" % gcs_object_id)
+
+    if kept_attachments:
+      for kept_attach in kept_attachments:
+        (filename, filesize, mimetype, deleted,
+         gcs_object_id) = kept_attach[3:]
+        new_attach = tracker_pb2.Attachment(
+            filename=filename, filesize=filesize, mimetype=mimetype,
+            deleted=bool(deleted), gcs_object_id=gcs_object_id)
+        comment.attachments.append(new_attach)
+        logging.info("Copy attachment with object_id: %s" % gcs_object_id)
+
+    if importer_id:
+      comment.importer_id = importer_id
+
+    return comment
+
+  def CreateIssueComment(
+      self, cnxn, issue, user_id, content, inbound_message=None,
+      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
+      is_spam=False, is_description=False, approval_id=None, commit=True,
+      importer_id=None):
+    """Create and store a new comment on the specified issue.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue: the issue on which to add the comment, must be loaded from
+          database with use_cache=False so that assume_stale == False.
+      user_id: the user ID of the user who entered the comment.
+      content: string body of the comment.
+      inbound_message: optional string full text of an email that caused
+          this comment to be added.
+      amendments: list of Amendment PBs describing the
+          metadata changes that the user made along w/ comment.
+      attachments: [(filename, contents, mimetype),...] attachments uploaded at
+          the time the comment was made.
+      kept_attachments: list of attachment ids for attachments kept from
+          previous descriptions, if the comment is an update to the description
+      timestamp: time at which the comment was made, defaults to now.
+      is_spam: True if the comment is classified as spam.
+      is_description: True if the comment is a description for the issue.
+      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
+          belongs to.
+      commit: set to False to not commit to DB yet.
+      importer_id: user ID of an API client that is importing issues.
+
+    Returns:
+      The new IssueComment protocol buffer.
+
+    Note that we assume that the content is safe to echo out
+    again. The content may have some markup done during input
+    processing.
+    """
+    if is_description:
+      kept_attachments = self.GetAttachmentsByID(cnxn, kept_attachments)
+    else:
+      kept_attachments = []
+
+    comment = self._MakeIssueComment(
+        issue.project_id, user_id, content, amendments=amendments,
+        inbound_message=inbound_message, attachments=attachments,
+        timestamp=timestamp, is_spam=is_spam, is_description=is_description,
+        kept_attachments=kept_attachments, approval_id=approval_id,
+        importer_id=importer_id)
+    comment.issue_id = issue.issue_id
+
+    if attachments or kept_attachments:
+      issue.attachment_count = (
+          issue.attachment_count + len(attachments) + len(kept_attachments))
+      self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
+
+    self.comment_creations.increment()
+    self.InsertComment(cnxn, comment, commit=commit)
+
+    return comment
+
+  def SoftDeleteComment(
+      self, cnxn, issue, issue_comment, deleted_by_user_id,
+      user_service, delete=True, reindex=False, is_spam=False):
+    """Mark comment as un/deleted, which shows/hides it from average users."""
+    # Update number of attachments
+    attachments = 0
+    if issue_comment.attachments:
+      for attachment in issue_comment.attachments:
+        if not attachment.deleted:
+          attachments += 1
+
+    # Delete only if it's not in deleted state
+    if delete:
+      if not issue_comment.deleted_by:
+        issue_comment.deleted_by = deleted_by_user_id
+        issue.attachment_count = issue.attachment_count - attachments
+
+    # Undelete only if it's in deleted state
+    elif issue_comment.deleted_by:
+      issue_comment.deleted_by = 0
+      issue.attachment_count = issue.attachment_count + attachments
+
+    issue_comment.is_spam = is_spam
+    self._UpdateComment(
+        cnxn, issue_comment, update_cols=['deleted_by', 'is_spam'])
+    self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
+
+    # Reindex the issue to take the comment deletion/undeletion into account.
+    if reindex:
+      tracker_fulltext.IndexIssues(
+          cnxn, [issue], user_service, self, self._config_service)
+    else:
+      self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
+
+  ### Approvals
+
+  def GetIssueApproval(self, cnxn, issue_id, approval_id, use_cache=True):
+    """Retrieve the specified approval for the specified issue."""
+    issue = self.GetIssue(cnxn, issue_id, use_cache=use_cache)
+    approval = tracker_bizobj.FindApprovalValueByID(
+        approval_id, issue.approval_values)
+    if approval:
+      return issue, approval
+    raise exceptions.NoSuchIssueApprovalException()
+
+  def DeltaUpdateIssueApproval(
+      self, cnxn, modifier_id, config, issue, approval, approval_delta,
+      comment_content=None, is_description=False, attachments=None,
+      commit=True, kept_attachments=None):
+    """Update the issue's approval in the database."""
+    amendments = []
+
+    # Update status in RAM and DB and create status amendment.
+    if approval_delta.status:
+      approval.status = approval_delta.status
+      approval.set_on = approval_delta.set_on or int(time.time())
+      approval.setter_id = modifier_id
+      status_amendment = tracker_bizobj.MakeApprovalStatusAmendment(
+          approval_delta.status)
+      amendments.append(status_amendment)
+
+      self._UpdateIssueApprovalStatus(
+        cnxn, issue.issue_id, approval.approval_id, approval.status,
+        approval.setter_id, approval.set_on)
+
+    # Update approver_ids in RAM and DB and create approver amendment.
+    approvers_add = [approver for approver in approval_delta.approver_ids_add
+                     if approver not in approval.approver_ids]
+    approvers_remove = [approver for approver in
+                        approval_delta.approver_ids_remove
+                        if approver in approval.approver_ids]
+    if approvers_add or approvers_remove:
+      approver_ids = [approver for approver in
+                      list(approval.approver_ids) + approvers_add
+                      if approver not in approvers_remove]
+      approval.approver_ids = approver_ids
+      approvers_amendment = tracker_bizobj.MakeApprovalApproversAmendment(
+          approvers_add, approvers_remove)
+      amendments.append(approvers_amendment)
+
+      self._UpdateIssueApprovalApprovers(
+          cnxn, issue.issue_id, approval.approval_id, approver_ids)
+
+    fv_amendments = tracker_bizobj.ApplyFieldValueChanges(
+        issue, config, approval_delta.subfield_vals_add,
+        approval_delta.subfield_vals_remove, approval_delta.subfields_clear)
+    amendments.extend(fv_amendments)
+    if fv_amendments:
+      self._UpdateIssuesFields(cnxn, [issue], commit=False)
+
+    label_amendment = tracker_bizobj.ApplyLabelChanges(
+        issue, config, approval_delta.labels_add, approval_delta.labels_remove)
+    if label_amendment:
+      amendments.append(label_amendment)
+      self._UpdateIssuesLabels(cnxn, [issue], commit=False)
+
+    comment_pb = self.CreateIssueComment(
+        cnxn, issue, modifier_id, comment_content, amendments=amendments,
+        approval_id=approval.approval_id, is_description=is_description,
+        attachments=attachments, commit=False,
+        kept_attachments=kept_attachments)
+
+    if commit:
+      cnxn.Commit()
+    self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id])
+
+    return comment_pb
+
+  def _UpdateIssueApprovalStatus(
+      self, cnxn, issue_id, approval_id, status, setter_id, set_on):
+    """Update the approvalvalue for the given issue_id's issue."""
+    set_on = set_on or int(time.time())
+    delta = {
+        'status': status.name.lower(),
+        'setter_id': setter_id,
+        'set_on': set_on,
+        }
+    self.issue2approvalvalue_tbl.Update(
+        cnxn, delta, approval_id=approval_id, issue_id=issue_id,
+        commit=False)
+
+  def _UpdateIssueApprovalApprovers(
+      self, cnxn, issue_id, approval_id, approver_ids):
+    """Update the list of approvers allowed to approve an issue's approval."""
+    self.issueapproval2approver_tbl.Delete(
+        cnxn, issue_id=issue_id, approval_id=approval_id, commit=False)
+    self.issueapproval2approver_tbl.InsertRows(
+        cnxn, ISSUEAPPROVAL2APPROVER_COLS, [(approval_id, approver_id, issue_id)
+                                            for approver_id in approver_ids],
+        commit=False)
+
+  ### Attachments
+
+  def GetAttachmentAndContext(self, cnxn, attachment_id):
+    """Load a IssueAttachment from database, and its comment ID and IID.
+
+    Args:
+      cnxn: connection to SQL database.
+      attachment_id: long integer unique ID of desired issue attachment.
+
+    Returns:
+      An Attachment protocol buffer that contains metadata about the attached
+      file, or None if it doesn't exist.  Also, the comment ID and issue IID
+      of the comment and issue that contain this attachment.
+
+    Raises:
+      NoSuchAttachmentException: the attachment was not found.
+    """
+    if attachment_id is None:
+      raise exceptions.NoSuchAttachmentException()
+
+    attachment_row = self.attachment_tbl.SelectRow(
+        cnxn, cols=ATTACHMENT_COLS, id=attachment_id)
+    if attachment_row:
+      (attach_id, issue_id, comment_id, filename, filesize, mimetype,
+       deleted, gcs_object_id) = attachment_row
+      if not deleted:
+        attachment = tracker_pb2.Attachment(
+            attachment_id=attach_id, filename=filename, filesize=filesize,
+            mimetype=mimetype, deleted=bool(deleted),
+            gcs_object_id=gcs_object_id)
+        return attachment, comment_id, issue_id
+
+    raise exceptions.NoSuchAttachmentException()
+
+  def GetAttachmentsByID(self, cnxn, attachment_ids):
+    """Return all Attachment PBs by attachment ids.
+
+    Args:
+      cnxn: connection to SQL database.
+      attachment_ids: a list of comment ids.
+
+    Returns:
+      A list of the Attachment protocol buffers for the attachments with
+      these ids.
+    """
+    attachment_rows = self.attachment_tbl.Select(
+        cnxn, cols=ATTACHMENT_COLS, id=attachment_ids)
+
+    return attachment_rows
+
+  def _UpdateAttachment(self, cnxn, comment, attach, update_cols=None):
+    """Update attachment metadata in the DB.
+
+    Args:
+      cnxn: connection to SQL database.
+      comment: IssueComment PB to invalidate in the cache.
+      attach: IssueAttachment PB to update in the DB.
+      update_cols: optional list of just the field names to update.
+    """
+    delta = {
+        'filename': attach.filename,
+        'filesize': attach.filesize,
+        'mimetype': attach.mimetype,
+        'deleted': bool(attach.deleted),
+        }
+    if update_cols is not None:
+      delta = {key: val for key, val in delta.items()
+               if key in update_cols}
+
+    self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id)
+    self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
+
+  def SoftDeleteAttachment(
+      self, cnxn, issue, issue_comment, attach_id, user_service, delete=True,
+      index_now=False):
+    """Mark attachment as un/deleted, which shows/hides it from avg users."""
+    attachment = None
+    for attach in issue_comment.attachments:
+      if attach.attachment_id == attach_id:
+        attachment = attach
+
+    if not attachment:
+      logging.warning(
+          'Tried to (un)delete non-existent attachment #%s in project '
+          '%s issue %s', attach_id, issue.project_id, issue.local_id)
+      return
+
+    if not issue_comment.deleted_by:
+      # Decrement attachment count only if it's not in deleted state
+      if delete:
+        if not attachment.deleted:
+          issue.attachment_count = issue.attachment_count - 1
+
+      # Increment attachment count only if it's in deleted state
+      elif attachment.deleted:
+        issue.attachment_count = issue.attachment_count + 1
+
+    logging.info('attachment.deleted was %s', attachment.deleted)
+
+    attachment.deleted = delete
+
+    logging.info('attachment.deleted is %s', attachment.deleted)
+
+    self._UpdateAttachment(
+        cnxn, issue_comment, attachment, update_cols=['deleted'])
+    self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
+
+    if index_now:
+      tracker_fulltext.IndexIssues(
+          cnxn, [issue], user_service, self, self._config_service)
+    else:
+      self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
+
+  ### Reindex queue
+
+  def EnqueueIssuesForIndexing(self, cnxn, issue_ids, commit=True):
+    # type: (MonorailConnection, Collection[int], Optional[bool]) -> None
+    """Add the given issue IDs to the ReindexQueue table."""
+    reindex_rows = [(issue_id,) for issue_id in issue_ids]
+    self.reindexqueue_tbl.InsertRows(
+        cnxn, ['issue_id'], reindex_rows, ignore=True, commit=commit)
+
+  def ReindexIssues(self, cnxn, num_to_reindex, user_service):
+    """Reindex some issues specified in the IndexQueue table."""
+    rows = self.reindexqueue_tbl.Select(
+        cnxn, order_by=[('created', [])], limit=num_to_reindex)
+    issue_ids = [row[0] for row in rows]
+
+    if issue_ids:
+      issues = self.GetIssues(cnxn, issue_ids)
+      tracker_fulltext.IndexIssues(
+          cnxn, issues, user_service, self, self._config_service)
+      self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids)
+
+    return len(issue_ids)
+
+  ### Search functions
+
+  def RunIssueQuery(
+      self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
+    """Run a SQL query to find matching issue IDs.
+
+    Args:
+      cnxn: connection to SQL database.
+      left_joins: list of SQL LEFT JOIN clauses.
+      where: list of SQL WHERE clauses.
+      order_by: list of SQL ORDER BY clauses.
+      shard_id: int shard ID to focus the search.
+      limit: int maximum number of results, defaults to
+          settings.search_limit_per_shard.
+
+    Returns:
+      (issue_ids, capped) where issue_ids is a list of the result issue IDs,
+      and capped is True if the number of results reached the limit.
+    """
+    limit = limit or settings.search_limit_per_shard
+    where = where + [('Issue.deleted = %s', [False])]
+    rows = self.issue_tbl.Select(
+        cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'],
+        left_joins=left_joins, where=where, order_by=order_by,
+        limit=limit)
+    issue_ids = [row[0] for row in rows]
+    capped = len(issue_ids) >= limit
+    return issue_ids, capped
+
+  def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
+    """Return a list of IIDs for issues with any of the given label IDs."""
+    if not label_ids:
+      return []
+    where = []
+    if shard_id is not None:
+      slice_term = ('shard = %s', [shard_id])
+      where.append(slice_term)
+
+    rows = self.issue_tbl.Select(
+        cnxn, shard_id=shard_id, cols=['id'],
+        left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])],
+        label_id=label_ids, project_id=project_id, where=where)
+    return [row[0] for row in rows]
+
+  def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
+    """Return IIDs for issues where any of the given users participate."""
+    iids = []
+    where = []
+    if shard_id is not None:
+      where.append(('shard = %s', [shard_id]))
+    if project_ids:
+      cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
+      where.append((cond_str, project_ids))
+
+    # TODO(jrobbins): Combine these 3 queries into one with ORs.   It currently
+    # is not the bottleneck.
+    rows = self.issue_tbl.Select(
+        cnxn, cols=['id'], reporter_id=user_ids,
+        where=where, shard_id=shard_id)
+    for row in rows:
+      iids.append(row[0])
+
+    rows = self.issue_tbl.Select(
+        cnxn, cols=['id'], owner_id=user_ids,
+        where=where, shard_id=shard_id)
+    for row in rows:
+      iids.append(row[0])
+
+    rows = self.issue_tbl.Select(
+        cnxn, cols=['id'], derived_owner_id=user_ids,
+        where=where, shard_id=shard_id)
+    for row in rows:
+      iids.append(row[0])
+
+    rows = self.issue_tbl.Select(
+        cnxn, cols=['id'],
+        left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])],
+        cc_id=user_ids,
+        where=where + [('cc_id IS NOT NULL', [])],
+        shard_id=shard_id)
+    for row in rows:
+      iids.append(row[0])
+
+    rows = self.issue_tbl.Select(
+        cnxn, cols=['Issue.id'],
+        left_joins=[
+            ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []),
+            ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])],
+        user_id=user_ids, grants_perm='View',
+        where=where + [('user_id IS NOT NULL', [])],
+        shard_id=shard_id)
+    for row in rows:
+      iids.append(row[0])
+
+    return iids
+
+  ### Issue Dependency Rankings
+
+  def SortBlockedOn(self, cnxn, issue, blocked_on_iids):
+    """Sort blocked_on dependencies by rank and dst_issue_id.
+
+    Args:
+      cnxn: connection to SQL database.
+      issue: the issue being blocked.
+      blocked_on_iids: the iids of all the issue's blockers
+
+    Returns:
+      a tuple (ids, ranks), where ids is the sorted list of
+      blocked_on_iids and ranks is the list of corresponding ranks
+    """
+    rows = self.issuerelation_tbl.Select(
+        cnxn, cols=ISSUERELATION_COLS, issue_id=issue.issue_id,
+        dst_issue_id=blocked_on_iids, kind='blockedon',
+        order_by=[('rank DESC', []), ('dst_issue_id', [])])
+    ids = [row[1] for row in rows]
+    ids.extend([iid for iid in blocked_on_iids if iid not in ids])
+    ranks = [row[3] for row in rows]
+    ranks.extend([0] * (len(blocked_on_iids) - len(ranks)))
+    return ids, ranks
+
+  def ApplyIssueRerank(
+      self, cnxn, parent_id, relations_to_change, commit=True, invalidate=True):
+    """Updates rankings of blocked on issue relations to new values
+
+    Args:
+      cnxn: connection to SQL database.
+      parent_id: the global ID of the blocked issue to update
+      relations_to_change: This should be a list of
+        [(blocker_id, new_rank),...] of relations that need to be changed
+      commit: set to False to skip the DB commit and do it in the caller.
+      invalidate: set to False to leave cache invalidatation to the caller.
+    """
+    blocker_ids = [blocker for (blocker, rank) in relations_to_change]
+    self.issuerelation_tbl.Delete(
+        cnxn, issue_id=parent_id, dst_issue_id=blocker_ids, commit=False)
+    insert_rows = [(parent_id, blocker, 'blockedon', rank)
+                   for (blocker, rank) in relations_to_change]
+    self.issuerelation_tbl.InsertRows(
+        cnxn, cols=ISSUERELATION_COLS, row_values=insert_rows, commit=commit)
+    if invalidate:
+      self.InvalidateIIDs(cnxn, [parent_id])
+
+  # Expunge Users from Issues system.
+  def ExpungeUsersInIssues(self, cnxn, user_ids_by_email, limit=None):
+    """Removes all references to given users from issue DB tables.
+
+    This method will not commit the operations. This method will
+    not make changes to in-memory data.
+
+    Args:
+      cnxn: connection to SQL database.
+      user_ids_by_email: dict of {email: user_id} of all users we want
+        to expunge.
+      limit: Optional, the limit for each operation.
+
+    Returns:
+      A list of issue_ids that need to be reindexed.
+    """
+    commit = False
+    user_ids = list(user_ids_by_email.values())
+    user_emails = list(user_ids_by_email.keys())
+    # Track issue_ids for issues that will have different search documents
+    # as a result of removing users.
+    affected_issue_ids = []
+
+    # Reassign commenter_id and delete inbound_messages.
+    shard_id = sql.RandomShardID()
+    comment_content_id_rows = self.comment_tbl.Select(
+        cnxn, cols=['Comment.id', 'Comment.issue_id', 'commentcontent_id'],
+        commenter_id=user_ids, shard_id=shard_id, limit=limit)
+    comment_ids = [row[0] for row in comment_content_id_rows]
+    commentcontent_ids = [row[2] for row in comment_content_id_rows]
+    if commentcontent_ids:
+      self.commentcontent_tbl.Update(
+          cnxn, {'inbound_message': None}, id=commentcontent_ids, commit=commit)
+    if comment_ids:
+      self.comment_tbl.Update(
+          cnxn, {'commenter_id': framework_constants.DELETED_USER_ID},
+          id=comment_ids,
+          commit=commit)
+    affected_issue_ids.extend([row[1] for row in comment_content_id_rows])
+
+    # Reassign deleted_by comments deleted_by.
+    self.comment_tbl.Update(
+        cnxn,
+        {'deleted_by': framework_constants.DELETED_USER_ID},
+        deleted_by=user_ids,
+        commit=commit, limit=limit)
+
+    # Remove users in field values.
+    fv_issue_id_rows = self.issue2fieldvalue_tbl.Select(
+        cnxn, cols=['issue_id'], user_id=user_ids, limit=limit)
+    fv_issue_ids = [row[0] for row in fv_issue_id_rows]
+    self.issue2fieldvalue_tbl.Delete(
+        cnxn, user_id=user_ids, limit=limit, commit=commit)
+    affected_issue_ids.extend(fv_issue_ids)
+
+    # Remove users in approval values.
+    self.issueapproval2approver_tbl.Delete(
+        cnxn, approver_id=user_ids, commit=commit, limit=limit)
+    self.issue2approvalvalue_tbl.Update(
+        cnxn,
+        {'setter_id': framework_constants.DELETED_USER_ID},
+        setter_id=user_ids,
+        commit=commit, limit=limit)
+
+    # Remove users in issue Ccs.
+    cc_issue_id_rows = self.issue2cc_tbl.Select(
+        cnxn, cols=['issue_id'], cc_id=user_ids, limit=limit)
+    cc_issue_ids = [row[0] for row in cc_issue_id_rows]
+    self.issue2cc_tbl.Delete(
+        cnxn, cc_id=user_ids, limit=limit, commit=commit)
+    affected_issue_ids.extend(cc_issue_ids)
+
+    # Remove users in issue owners.
+    owner_issue_id_rows = self.issue_tbl.Select(
+        cnxn, cols=['id'], owner_id=user_ids, limit=limit)
+    owner_issue_ids = [row[0] for row in owner_issue_id_rows]
+    if owner_issue_ids:
+      self.issue_tbl.Update(
+          cnxn, {'owner_id': None}, id=owner_issue_ids, commit=commit)
+    affected_issue_ids.extend(owner_issue_ids)
+    derived_owner_issue_id_rows = self.issue_tbl.Select(
+        cnxn, cols=['id'], derived_owner_id=user_ids, limit=limit)
+    derived_owner_issue_ids = [row[0] for row in derived_owner_issue_id_rows]
+    if derived_owner_issue_ids:
+      self.issue_tbl.Update(
+          cnxn, {'derived_owner_id': None},
+          id=derived_owner_issue_ids,
+          commit=commit)
+    affected_issue_ids.extend(derived_owner_issue_ids)
+
+    # Remove users in issue reporters.
+    reporter_issue_id_rows = self.issue_tbl.Select(
+        cnxn, cols=['id'], reporter_id=user_ids, limit=limit)
+    reporter_issue_ids = [row[0] for row in reporter_issue_id_rows]
+    if reporter_issue_ids:
+      self.issue_tbl.Update(
+          cnxn, {'reporter_id': framework_constants.DELETED_USER_ID},
+          id=reporter_issue_ids,
+          commit=commit)
+    affected_issue_ids.extend(reporter_issue_ids)
+
+    # Note: issueupdate_tbl's and issue2notify's user_id columns do not
+    # reference the User table. So all values need to updated here before
+    # User rows can be deleted safely. No limit will be applied.
+
+    # Remove users in issue updates.
+    self.issueupdate_tbl.Update(
+        cnxn,
+        {'added_user_id': framework_constants.DELETED_USER_ID},
+        added_user_id=user_ids,
+        commit=commit)
+    self.issueupdate_tbl.Update(
+        cnxn,
+        {'removed_user_id': framework_constants.DELETED_USER_ID},
+        removed_user_id=user_ids,
+        commit=commit)
+
+    # Remove users in issue notify.
+    self.issue2notify_tbl.Delete(
+        cnxn, email=user_emails, commit=commit)
+
+    # Remove users in issue snapshots.
+    self.issuesnapshot_tbl.Update(
+        cnxn,
+        {'owner_id': framework_constants.DELETED_USER_ID},
+        owner_id=user_ids,
+        commit=commit, limit=limit)
+    self.issuesnapshot_tbl.Update(
+        cnxn,
+        {'reporter_id': framework_constants.DELETED_USER_ID},
+        reporter_id=user_ids,
+        commit=commit, limit=limit)
+    self.issuesnapshot2cc_tbl.Delete(
+        cnxn, cc_id=user_ids, commit=commit, limit=limit)
+
+    return list(set(affected_issue_ids))