| # Copyright 2016 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """A set of functions that provide persistence for Monorail issue tracking. |
| |
| This module provides functions to get, update, create, and (in some |
| cases) delete each type of business object. It provides a logical |
| persistence layer on top of an SQL database. |
| |
| Business objects are described in tracker_pb2.py and tracker_bizobj.py. |
| """ |
| from __future__ import print_function |
| from __future__ import division |
| from __future__ import absolute_import |
| |
| import collections |
| import json |
| import logging |
| import os |
| import time |
| import uuid |
| |
| from google.appengine.api import app_identity |
| from google.appengine.api import images |
| from google.cloud import storage |
| |
| import settings |
| from features import filterrules_helpers |
| from framework import authdata |
| from framework import exceptions |
| from framework import framework_bizobj |
| from framework import framework_constants |
| from framework import framework_helpers |
| from framework import gcs_helpers |
| from framework import permissions |
| from framework import sql |
| from infra_libs import ts_mon |
| from mrproto import project_pb2 |
| from mrproto import tracker_pb2 |
| from services import caches |
| from services import tracker_fulltext |
| from tracker import tracker_bizobj |
| from tracker import tracker_helpers |
| |
| # TODO(jojwang): monorail:4693, remove this after all 'stable-full' |
| # gates have been renamed to 'stable'. |
| FLT_EQUIVALENT_GATES = {'stable-full': 'stable', |
| 'stable': 'stable-full'} |
| |
| ISSUE_TABLE_NAME = 'Issue' |
| ISSUESUMMARY_TABLE_NAME = 'IssueSummary' |
| ISSUE2LABEL_TABLE_NAME = 'Issue2Label' |
| ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component' |
| ISSUE2CC_TABLE_NAME = 'Issue2Cc' |
| ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify' |
| ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue' |
| COMMENT_TABLE_NAME = 'Comment' |
| COMMENTCONTENT_TABLE_NAME = 'CommentContent' |
| COMMENTIMPORTER_TABLE_NAME = 'CommentImporter' |
| ATTACHMENT_TABLE_NAME = 'Attachment' |
| ISSUERELATION_TABLE_NAME = 'IssueRelation' |
| DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation' |
| ISSUEUPDATE_TABLE_NAME = 'IssueUpdate' |
| ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations' |
| REINDEXQUEUE_TABLE_NAME = 'ReindexQueue' |
| LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter' |
| ISSUESNAPSHOT_TABLE_NAME = 'IssueSnapshot' |
| ISSUESNAPSHOT2CC_TABLE_NAME = 'IssueSnapshot2Cc' |
| ISSUESNAPSHOT2COMPONENT_TABLE_NAME = 'IssueSnapshot2Component' |
| ISSUESNAPSHOT2LABEL_TABLE_NAME = 'IssueSnapshot2Label' |
| ISSUEPHASEDEF_TABLE_NAME = 'IssuePhaseDef' |
| ISSUE2APPROVALVALUE_TABLE_NAME = 'Issue2ApprovalValue' |
| ISSUEAPPROVAL2APPROVER_TABLE_NAME = 'IssueApproval2Approver' |
| ISSUEAPPROVAL2COMMENT_TABLE_NAME = 'IssueApproval2Comment' |
| |
| |
| ISSUE_COLS = [ |
| 'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id', |
| 'opened', 'closed', 'modified', 'owner_modified', 'status_modified', |
| 'component_modified', 'migration_modified', 'derived_owner_id', |
| 'derived_status_id', 'deleted', 'star_count', 'attachment_count', 'is_spam' |
| ] |
| ISSUESUMMARY_COLS = ['issue_id', 'summary'] |
| ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived'] |
| ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived'] |
| ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived'] |
| ISSUE2NOTIFY_COLS = ['issue_id', 'email'] |
| ISSUE2FIELDVALUE_COLS = [ |
| 'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'date_value', |
| 'url_value', 'derived', 'phase_id'] |
| # Explicitly specify column 'Comment.id' to allow joins on other tables that |
| # have an 'id' column. |
| COMMENT_COLS = [ |
| 'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id', |
| 'deleted_by', 'Comment.is_spam', 'is_description', |
| 'commentcontent_id'] # Note: commentcontent_id must be last. |
| COMMENTCONTENT_COLS = [ |
| 'CommentContent.id', 'content', 'inbound_message'] |
| COMMENTIMPORTER_COLS = ['comment_id', 'importer_id'] |
| ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by', |
| 'is_description'] |
| ATTACHMENT_COLS = [ |
| 'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype', |
| 'deleted', 'gcs_object_id'] |
| ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind', 'rank'] |
| ABBR_ISSUERELATION_COLS = ['dst_issue_id', 'rank'] |
| DANGLINGRELATION_COLS = [ |
| 'issue_id', 'dst_issue_project', 'dst_issue_local_id', |
| 'ext_issue_identifier', 'kind'] |
| ISSUEUPDATE_COLS = [ |
| 'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value', |
| 'added_user_id', 'removed_user_id', 'custom_field_name', |
| 'added_component_id', 'removed_component_id' |
| ] |
| ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id'] |
| REINDEXQUEUE_COLS = ['issue_id', 'created'] |
| ISSUESNAPSHOT_COLS = ['id', 'issue_id', 'shard', 'project_id', 'local_id', |
| 'reporter_id', 'owner_id', 'status_id', 'period_start', 'period_end', |
| 'is_open'] |
| ISSUESNAPSHOT2CC_COLS = ['issuesnapshot_id', 'cc_id'] |
| ISSUESNAPSHOT2COMPONENT_COLS = ['issuesnapshot_id', 'component_id'] |
| ISSUESNAPSHOT2LABEL_COLS = ['issuesnapshot_id', 'label_id'] |
| ISSUEPHASEDEF_COLS = ['id', 'name', 'rank'] |
| ISSUE2APPROVALVALUE_COLS = ['approval_id', 'issue_id', 'phase_id', |
| 'status', 'setter_id', 'set_on'] |
| ISSUEAPPROVAL2APPROVER_COLS = ['approval_id', 'approver_id', 'issue_id'] |
| ISSUEAPPROVAL2COMMENT_COLS = ['approval_id', 'comment_id'] |
| |
| CHUNK_SIZE = 1000 |
| |
| |
| class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache): |
| """Class to manage RAM and memcache for Issue IDs.""" |
| |
| def __init__(self, cache_manager, issue_service): |
| super(IssueIDTwoLevelCache, self).__init__( |
| cache_manager, 'issue_id', 'issue_id:', int, |
| max_size=settings.issue_cache_max_size) |
| self.issue_service = issue_service |
| |
| def _MakeCache(self, cache_manager, kind, max_size=None): |
| """Override normal RamCache creation with ValueCentricRamCache.""" |
| return caches.ValueCentricRamCache(cache_manager, kind, max_size=max_size) |
| |
| def _DeserializeIssueIDs(self, project_local_issue_ids): |
| """Convert database rows into a dict {(project_id, local_id): issue_id}.""" |
| return {(project_id, local_id): issue_id |
| for (project_id, local_id, issue_id) in project_local_issue_ids} |
| |
| def FetchItems(self, cnxn, keys): |
| """On RAM and memcache miss, hit the database.""" |
| local_ids_by_pid = collections.defaultdict(list) |
| for project_id, local_id in keys: |
| local_ids_by_pid[project_id].append(local_id) |
| |
| where = [] # We OR per-project pairs of conditions together. |
| for project_id, local_ids_in_project in local_ids_by_pid.items(): |
| term_str = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' % |
| sql.PlaceHolders(local_ids_in_project)) |
| where.append((term_str, [project_id] + local_ids_in_project)) |
| |
| rows = self.issue_service.issue_tbl.Select( |
| cnxn, cols=['project_id', 'local_id', 'id'], |
| where=where, or_where_conds=True) |
| return self._DeserializeIssueIDs(rows) |
| |
| def _KeyToStr(self, key): |
| """This cache uses pairs of ints as keys. Convert them to strings.""" |
| return '%d,%d' % key |
| |
| def _StrToKey(self, key_str): |
| """This cache uses pairs of ints as keys. Convert them from strings.""" |
| project_id_str, local_id_str = key_str.split(',') |
| return int(project_id_str), int(local_id_str) |
| |
| |
| class IssueTwoLevelCache(caches.AbstractTwoLevelCache): |
| """Class to manage RAM and memcache for Issue PBs.""" |
| |
| def __init__( |
| self, cache_manager, issue_service, project_service, config_service): |
| super(IssueTwoLevelCache, self).__init__( |
| cache_manager, 'issue', 'issue:', tracker_pb2.Issue, |
| max_size=settings.issue_cache_max_size) |
| self.issue_service = issue_service |
| self.project_service = project_service |
| self.config_service = config_service |
| |
| def _UnpackIssue(self, cnxn, issue_row): |
| """Partially construct an issue object using info from a DB row.""" |
| ( |
| issue_id, project_id, local_id, status_id, owner_id, reporter_id, |
| opened, closed, modified, owner_modified, status_modified, |
| component_modified, migration_modified, derived_owner_id, |
| derived_status_id, deleted, star_count, attachment_count, |
| is_spam) = issue_row |
| |
| issue = tracker_pb2.Issue() |
| project = self.project_service.GetProject(cnxn, project_id) |
| issue.project_name = project.project_name |
| issue.issue_id = issue_id |
| issue.project_id = project_id |
| issue.local_id = local_id |
| if status_id is not None: |
| status = self.config_service.LookupStatus(cnxn, project_id, status_id) |
| issue.status = status |
| issue.owner_id = owner_id or 0 |
| issue.reporter_id = reporter_id or 0 |
| issue.derived_owner_id = derived_owner_id or 0 |
| if derived_status_id is not None: |
| derived_status = self.config_service.LookupStatus( |
| cnxn, project_id, derived_status_id) |
| issue.derived_status = derived_status |
| issue.deleted = bool(deleted) |
| if opened: |
| issue.opened_timestamp = opened |
| if closed: |
| issue.closed_timestamp = closed |
| if modified: |
| issue.modified_timestamp = modified |
| if owner_modified: |
| issue.owner_modified_timestamp = owner_modified |
| if status_modified: |
| issue.status_modified_timestamp = status_modified |
| if component_modified: |
| issue.component_modified_timestamp = component_modified |
| if migration_modified: |
| issue.migration_modified_timestamp = migration_modified |
| issue.star_count = star_count |
| issue.attachment_count = attachment_count |
| issue.is_spam = bool(is_spam) |
| return issue |
| |
| def _UnpackFieldValue(self, fv_row): |
| """Construct a field value object from a DB row.""" |
| (issue_id, field_id, int_value, str_value, user_id, date_value, url_value, |
| derived, phase_id) = fv_row |
| fv = tracker_bizobj.MakeFieldValue( |
| field_id, int_value, str_value, user_id, date_value, url_value, |
| bool(derived), phase_id=phase_id) |
| return fv, issue_id |
| |
| def _UnpackApprovalValue(self, av_row): |
| """Contruct an ApprovalValue PB from a DB row.""" |
| (approval_id, issue_id, phase_id, status, setter_id, set_on) = av_row |
| if status: |
| status_enum = tracker_pb2.ApprovalStatus(status.upper()) |
| else: |
| status_enum = tracker_pb2.ApprovalStatus.NOT_SET |
| av = tracker_pb2.ApprovalValue( |
| approval_id=approval_id, setter_id=setter_id, set_on=set_on, |
| status=status_enum, phase_id=phase_id) |
| return av, issue_id |
| |
| def _UnpackPhase(self, phase_row): |
| """Construct a Phase PB from a DB row.""" |
| (phase_id, name, rank) = phase_row |
| phase = tracker_pb2.Phase( |
| phase_id=phase_id, name=name, rank=rank) |
| return phase |
| |
| def _DeserializeIssues( |
| self, cnxn, issue_rows, summary_rows, label_rows, component_rows, |
| cc_rows, notify_rows, fieldvalue_rows, relation_rows, |
| dangling_relation_rows, phase_rows, approvalvalue_rows, |
| av_approver_rows): |
| """Convert the given DB rows into a dict of Issue PBs.""" |
| results_dict = {} |
| for issue_row in issue_rows: |
| issue = self._UnpackIssue(cnxn, issue_row) |
| results_dict[issue.issue_id] = issue |
| |
| for issue_id, summary in summary_rows: |
| results_dict[issue_id].summary = summary |
| |
| # TODO(jrobbins): it would be nice to order labels by rank and name. |
| for issue_id, label_id, derived in label_rows: |
| issue = results_dict.get(issue_id) |
| if not issue: |
| logging.info('Got label for an unknown issue: %r %r', |
| label_rows, issue_rows) |
| continue |
| label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id) |
| assert label, ('Label ID %r on IID %r not found in project %r' % |
| (label_id, issue_id, issue.project_id)) |
| if derived: |
| results_dict[issue_id].derived_labels.append(label) |
| else: |
| results_dict[issue_id].labels.append(label) |
| |
| for issue_id, component_id, derived in component_rows: |
| if derived: |
| results_dict[issue_id].derived_component_ids.append(component_id) |
| else: |
| results_dict[issue_id].component_ids.append(component_id) |
| |
| for issue_id, user_id, derived in cc_rows: |
| if derived: |
| results_dict[issue_id].derived_cc_ids.append(user_id) |
| else: |
| results_dict[issue_id].cc_ids.append(user_id) |
| |
| for issue_id, email in notify_rows: |
| results_dict[issue_id].derived_notify_addrs.append(email) |
| |
| for fv_row in fieldvalue_rows: |
| fv, issue_id = self._UnpackFieldValue(fv_row) |
| results_dict[issue_id].field_values.append(fv) |
| |
| phases_by_id = {} |
| for phase_row in phase_rows: |
| phase = self._UnpackPhase(phase_row) |
| phases_by_id[phase.phase_id] = phase |
| |
| approvers_dict = collections.defaultdict(list) |
| for approver_row in av_approver_rows: |
| approval_id, approver_id, issue_id = approver_row |
| approvers_dict[approval_id, issue_id].append(approver_id) |
| |
| for av_row in approvalvalue_rows: |
| av, issue_id = self._UnpackApprovalValue(av_row) |
| av.approver_ids = approvers_dict[av.approval_id, issue_id] |
| results_dict[issue_id].approval_values.append(av) |
| if av.phase_id: |
| phase = phases_by_id[av.phase_id] |
| issue_phases = results_dict[issue_id].phases |
| if phase not in issue_phases: |
| issue_phases.append(phase) |
| # Order issue phases |
| for issue in results_dict.values(): |
| if issue.phases: |
| issue.phases.sort(key=lambda phase: phase.rank) |
| |
| for issue_id, dst_issue_id, kind, rank in relation_rows: |
| src_issue = results_dict.get(issue_id) |
| dst_issue = results_dict.get(dst_issue_id) |
| assert src_issue or dst_issue, ( |
| 'Neither source issue %r nor dest issue %r was found' % |
| (issue_id, dst_issue_id)) |
| if src_issue: |
| if kind == 'blockedon': |
| src_issue.blocked_on_iids.append(dst_issue_id) |
| src_issue.blocked_on_ranks.append(rank) |
| elif kind == 'mergedinto': |
| src_issue.merged_into = dst_issue_id |
| else: |
| logging.info('unknown relation kind %r', kind) |
| continue |
| |
| if dst_issue: |
| if kind == 'blockedon': |
| dst_issue.blocking_iids.append(issue_id) |
| |
| for row in dangling_relation_rows: |
| issue_id, dst_issue_proj, dst_issue_id, ext_id, kind = row |
| src_issue = results_dict.get(issue_id) |
| if kind == 'blockedon': |
| src_issue.dangling_blocked_on_refs.append( |
| tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, |
| dst_issue_id, ext_id)) |
| elif kind == 'blocking': |
| src_issue.dangling_blocking_refs.append( |
| tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id, |
| ext_id)) |
| elif kind == 'mergedinto': |
| src_issue.merged_into_external = ext_id |
| else: |
| logging.warning('unhandled danging relation kind %r', kind) |
| continue |
| |
| return results_dict |
| |
| # Note: sharding is used to here to allow us to load issues from the replicas |
| # without placing load on the primary DB. Writes are not sharded. |
| # pylint: disable=arguments-differ |
| def FetchItems(self, cnxn, issue_ids, shard_id=None): |
| """Retrieve and deserialize issues.""" |
| issue_rows = self.issue_service.issue_tbl.Select( |
| cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id) |
| |
| summary_rows = self.issue_service.issuesummary_tbl.Select( |
| cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids) |
| label_rows = self.issue_service.issue2label_tbl.Select( |
| cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids) |
| component_rows = self.issue_service.issue2component_tbl.Select( |
| cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids) |
| cc_rows = self.issue_service.issue2cc_tbl.Select( |
| cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids) |
| notify_rows = self.issue_service.issue2notify_tbl.Select( |
| cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids) |
| fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select( |
| cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id, |
| issue_id=issue_ids) |
| approvalvalue_rows = self.issue_service.issue2approvalvalue_tbl.Select( |
| cnxn, cols=ISSUE2APPROVALVALUE_COLS, issue_id=issue_ids) |
| phase_ids = [av_row[2] for av_row in approvalvalue_rows] |
| phase_rows = [] |
| if phase_ids: |
| phase_rows = self.issue_service.issuephasedef_tbl.Select( |
| cnxn, cols=ISSUEPHASEDEF_COLS, id=list(set(phase_ids))) |
| av_approver_rows = self.issue_service.issueapproval2approver_tbl.Select( |
| cnxn, cols=ISSUEAPPROVAL2APPROVER_COLS, issue_id=issue_ids) |
| if issue_ids: |
| ph = sql.PlaceHolders(issue_ids) |
| blocked_on_rows = self.issue_service.issuerelation_tbl.Select( |
| cnxn, cols=ISSUERELATION_COLS, issue_id=issue_ids, kind='blockedon', |
| order_by=[('issue_id', []), ('rank DESC', []), ('dst_issue_id', [])]) |
| blocking_rows = self.issue_service.issuerelation_tbl.Select( |
| cnxn, cols=ISSUERELATION_COLS, dst_issue_id=issue_ids, |
| kind='blockedon', order_by=[('issue_id', []), ('dst_issue_id', [])]) |
| unique_blocking = tuple( |
| row for row in blocking_rows if row not in blocked_on_rows) |
| merge_rows = self.issue_service.issuerelation_tbl.Select( |
| cnxn, cols=ISSUERELATION_COLS, |
| where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph), |
| issue_ids + issue_ids), |
| ('kind != %s', ['blockedon'])]) |
| relation_rows = blocked_on_rows + unique_blocking + merge_rows |
| dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select( |
| cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids) |
| else: |
| relation_rows = [] |
| dangling_relation_rows = [] |
| |
| issue_dict = self._DeserializeIssues( |
| cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows, |
| notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows, |
| phase_rows, approvalvalue_rows, av_approver_rows) |
| logging.info('IssueTwoLevelCache.FetchItems returning: %r', issue_dict) |
| return issue_dict |
| |
| |
| class CommentTwoLevelCache(caches.AbstractTwoLevelCache): |
| """Class to manage RAM and memcache for IssueComment PBs.""" |
| |
| def __init__(self, cache_manager, issue_svc): |
| super(CommentTwoLevelCache, self).__init__( |
| cache_manager, 'comment', 'comment:', tracker_pb2.IssueComment, |
| max_size=settings.comment_cache_max_size) |
| self.issue_svc = issue_svc |
| |
| # pylint: disable=arguments-differ |
| def FetchItems(self, cnxn, keys, shard_id=None): |
| comment_rows = self.issue_svc.comment_tbl.Select(cnxn, |
| cols=COMMENT_COLS, id=keys, shard_id=shard_id) |
| |
| if len(comment_rows) < len(keys): |
| self.issue_svc.replication_lag_retries.increment() |
| logging.info('issue3755: expected %d, but got %d rows from shard %d', |
| len(keys), len(comment_rows), shard_id) |
| shard_id = None # Will use Primary DB. |
| comment_rows = self.issue_svc.comment_tbl.Select( |
| cnxn, cols=COMMENT_COLS, id=keys, shard_id=None) |
| logging.info( |
| 'Retry got %d comment rows from the primary DB', len(comment_rows)) |
| |
| cids = [row[0] for row in comment_rows] |
| commentcontent_ids = [row[-1] for row in comment_rows] |
| content_rows = self.issue_svc.commentcontent_tbl.Select( |
| cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids, |
| shard_id=shard_id) |
| approval_rows = self.issue_svc.issueapproval2comment_tbl.Select( |
| cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids) |
| amendment_rows = self.issue_svc.issueupdate_tbl.Select( |
| cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id) |
| attachment_rows = self.issue_svc.attachment_tbl.Select( |
| cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id) |
| importer_rows = self.issue_svc.commentimporter_tbl.Select( |
| cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id) |
| |
| comments = self.issue_svc._DeserializeComments( |
| comment_rows, content_rows, amendment_rows, attachment_rows, |
| approval_rows, importer_rows) |
| |
| comments_dict = {} |
| for comment in comments: |
| comments_dict[comment.id] = comment |
| |
| return comments_dict |
| |
| |
| class IssueService(object): |
| """The persistence layer for Monorail's issues, comments, and attachments.""" |
| spam_labels = ts_mon.CounterMetric( |
| 'monorail/issue_svc/spam_label', |
| 'Issues created, broken down by spam label.', |
| [ts_mon.StringField('type')]) |
| replication_lag_retries = ts_mon.CounterMetric( |
| 'monorail/issue_svc/replication_lag_retries', |
| 'Counts times that loading comments from a replica failed', |
| []) |
| issue_creations = ts_mon.CounterMetric( |
| 'monorail/issue_svc/issue_creations', |
| 'Counts times that issues were created', |
| []) |
| comment_creations = ts_mon.CounterMetric( |
| 'monorail/issue_svc/comment_creations', |
| 'Counts times that comments were created', |
| []) |
| |
| def __init__(self, project_service, config_service, cache_manager, |
| chart_service): |
| """Initialize this object so that it is ready to use. |
| |
| Args: |
| project_service: services object for project info. |
| config_service: services object for tracker configuration info. |
| cache_manager: local cache with distributed invalidation. |
| chart_service (ChartService): An instance of ChartService. |
| """ |
| # Tables that represent issue data. |
| self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME) |
| self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME) |
| self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME) |
| self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME) |
| self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME) |
| self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME) |
| self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME) |
| self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME) |
| self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME) |
| self.issueformerlocations_tbl = sql.SQLTableManager( |
| ISSUEFORMERLOCATIONS_TABLE_NAME) |
| self.issuesnapshot_tbl = sql.SQLTableManager(ISSUESNAPSHOT_TABLE_NAME) |
| self.issuesnapshot2cc_tbl = sql.SQLTableManager( |
| ISSUESNAPSHOT2CC_TABLE_NAME) |
| self.issuesnapshot2component_tbl = sql.SQLTableManager( |
| ISSUESNAPSHOT2COMPONENT_TABLE_NAME) |
| self.issuesnapshot2label_tbl = sql.SQLTableManager( |
| ISSUESNAPSHOT2LABEL_TABLE_NAME) |
| self.issuephasedef_tbl = sql.SQLTableManager(ISSUEPHASEDEF_TABLE_NAME) |
| self.issue2approvalvalue_tbl = sql.SQLTableManager( |
| ISSUE2APPROVALVALUE_TABLE_NAME) |
| self.issueapproval2approver_tbl = sql.SQLTableManager( |
| ISSUEAPPROVAL2APPROVER_TABLE_NAME) |
| self.issueapproval2comment_tbl = sql.SQLTableManager( |
| ISSUEAPPROVAL2COMMENT_TABLE_NAME) |
| |
| # Tables that represent comments. |
| self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME) |
| self.commentcontent_tbl = sql.SQLTableManager(COMMENTCONTENT_TABLE_NAME) |
| self.commentimporter_tbl = sql.SQLTableManager(COMMENTIMPORTER_TABLE_NAME) |
| self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME) |
| self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME) |
| |
| # Tables for cron tasks. |
| self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME) |
| |
| # Tables for generating sequences of local IDs. |
| self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME) |
| |
| # Like a dictionary {(project_id, local_id): issue_id} |
| # Use value centric cache here because we cannot store a tuple in the |
| # Invalidate table. |
| self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self) |
| # Like a dictionary {issue_id: issue} |
| self.issue_2lc = IssueTwoLevelCache( |
| cache_manager, self, project_service, config_service) |
| |
| # Like a dictionary {comment_id: comment) |
| self.comment_2lc = CommentTwoLevelCache( |
| cache_manager, self) |
| |
| self._config_service = config_service |
| self.chart_service = chart_service |
| |
| ### Issue ID lookups |
| |
| def LookupIssueIDsFollowMoves(self, cnxn, project_local_id_pairs): |
| # type: (MonorailConnection, Sequence[Tuple(int, int)]) -> |
| # (Sequence[int], Sequence[Tuple(int, int)]) |
| """Find the global issue IDs given the project ID and local ID of each. |
| |
| If any (project_id, local_id) pairs refer to an issue that has been moved, |
| the issue ID will still be returned. |
| |
| Args: |
| cnxn: Monorail connection. |
| project_local_id_pairs: (project_id, local_id) pairs to look up. |
| |
| Returns: |
| A tuple of two items. |
| 1. A sequence of global issue IDs in the `project_local_id_pairs` order. |
| 2. A sequence of (project_id, local_id) containing each pair provided |
| for which no matching issue is found. |
| """ |
| |
| issue_id_dict, misses = self.issue_id_2lc.GetAll( |
| cnxn, project_local_id_pairs) |
| for miss in misses: |
| project_id, local_id = miss |
| issue_id = int( |
| self.issueformerlocations_tbl.SelectValue( |
| cnxn, |
| 'issue_id', |
| default=0, |
| project_id=project_id, |
| local_id=local_id)) |
| if issue_id: |
| misses.remove(miss) |
| issue_id_dict[miss] = issue_id |
| # Put the Issue IDs in the order specified by project_local_id_pairs |
| issue_ids = [ |
| issue_id_dict[pair] |
| for pair in project_local_id_pairs |
| if pair in issue_id_dict |
| ] |
| |
| return issue_ids, misses |
| |
| def LookupIssueIDs(self, cnxn, project_local_id_pairs): |
| """Find the global issue IDs given the project ID and local ID of each.""" |
| issue_id_dict, misses = self.issue_id_2lc.GetAll( |
| cnxn, project_local_id_pairs) |
| |
| # Put the Issue IDs in the order specified by project_local_id_pairs |
| issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs |
| if pair in issue_id_dict] |
| |
| return issue_ids, misses |
| |
| def LookupIssueID(self, cnxn, project_id, local_id): |
| """Find the global issue ID given the project ID and local ID.""" |
| issue_ids, _misses = self.LookupIssueIDs(cnxn, [(project_id, local_id)]) |
| try: |
| return issue_ids[0] |
| except IndexError: |
| raise exceptions.NoSuchIssueException() |
| |
| def ResolveIssueRefs( |
| self, cnxn, ref_projects, default_project_name, refs): |
| """Look up all the referenced issues and return their issue_ids. |
| |
| Args: |
| cnxn: connection to SQL database. |
| ref_projects: pre-fetched dict {project_name: project} of all projects |
| mentioned in the refs as well as the default project. |
| default_project_name: string name of the current project, this is used |
| when the project_name in a ref is None. |
| refs: list of (project_name, local_id) pairs. These are parsed from |
| textual references in issue descriptions, comments, and the input |
| in the blocked-on field. |
| |
| Returns: |
| A list of issue_ids for all the referenced issues. References to issues |
| in deleted projects and any issues not found are simply ignored. |
| """ |
| if not refs: |
| return [], [] |
| |
| project_local_id_pairs = [] |
| for project_name, local_id in refs: |
| project = ref_projects.get(project_name or default_project_name) |
| if not project or project.state == project_pb2.ProjectState.DELETABLE: |
| continue # ignore any refs to issues in deleted projects |
| project_local_id_pairs.append((project.project_id, local_id)) |
| |
| return self.LookupIssueIDs(cnxn, project_local_id_pairs) # tuple |
| |
| def LookupIssueRefs(self, cnxn, issue_ids): |
| """Return {issue_id: (project_name, local_id)} for each issue_id.""" |
| issue_dict, _misses = self.GetIssuesDict(cnxn, issue_ids) |
| return { |
| issue_id: (issue.project_name, issue.local_id) |
| for issue_id, issue in issue_dict.items()} |
| |
| ### Issue objects |
| |
| def CreateIssue( |
| self, |
| cnxn, |
| services, |
| issue, |
| marked_description, |
| attachments=None, |
| index_now=False, |
| importer_id=None): |
| """Create and store a new issue with all the given information. |
| |
| Args: |
| cnxn: connection to SQL database. |
| services: persistence layer for users, issues, and projects. |
| issue: Issue PB to create. |
| marked_description: issue description with initial HTML markup. |
| attachments: [(filename, contents, mimetype),...] attachments uploaded at |
| the time the comment was made. |
| index_now: True if the issue should be updated in the full text index. |
| importer_id: optional user ID of API client importing issues for users. |
| |
| Returns: |
| A tuple (the newly created Issue PB and Comment PB for the |
| issue description). |
| """ |
| project_id = issue.project_id |
| reporter_id = issue.reporter_id |
| timestamp = issue.opened_timestamp |
| config = self._config_service.GetProjectConfig(cnxn, project_id) |
| |
| iids_to_invalidate = set() |
| if len(issue.blocked_on_iids) != 0: |
| iids_to_invalidate.update(issue.blocked_on_iids) |
| if len(issue.blocking_iids) != 0: |
| iids_to_invalidate.update(issue.blocking_iids) |
| |
| comment = self._MakeIssueComment( |
| project_id, reporter_id, marked_description, |
| attachments=attachments, timestamp=timestamp, |
| is_description=True, importer_id=importer_id) |
| |
| reporter = services.user.GetUser(cnxn, reporter_id) |
| project = services.project.GetProject(cnxn, project_id) |
| reporter_auth = authdata.AuthData.FromUserID(cnxn, reporter_id, services) |
| is_project_member = framework_bizobj.UserIsInProject( |
| project, reporter_auth.effective_ids) |
| classification = services.spam.ClassifyIssue( |
| issue, comment, reporter, is_project_member) |
| |
| if classification['confidence_is_spam'] > settings.classifier_spam_thresh: |
| issue.is_spam = True |
| predicted_label = 'spam' |
| else: |
| predicted_label = 'ham' |
| |
| logging.info('classified new issue as %s' % predicted_label) |
| self.spam_labels.increment({'type': predicted_label}) |
| |
| # Create approval surveys |
| approval_comments = [] |
| if len(issue.approval_values) != 0: |
| approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs} |
| for av in issue.approval_values: |
| ad = approval_defs_by_id.get(av.approval_id) |
| if ad: |
| survey = '' |
| if ad.survey: |
| questions = ad.survey.split('\n') |
| survey = '\n'.join(['<b>' + q + '</b>' for q in questions]) |
| approval_comments.append(self._MakeIssueComment( |
| project_id, reporter_id, survey, timestamp=timestamp, |
| is_description=True, approval_id=ad.approval_id)) |
| else: |
| logging.info('Could not find ApprovalDef with approval_id %r', |
| av.approval_id) |
| |
| issue.local_id = self.AllocateNextLocalID(cnxn, project_id) |
| self.issue_creations.increment() |
| issue_id = self.InsertIssue(cnxn, issue) |
| comment.issue_id = issue_id |
| self.InsertComment(cnxn, comment) |
| for approval_comment in approval_comments: |
| approval_comment.issue_id = issue_id |
| self.InsertComment(cnxn, approval_comment) |
| |
| issue.issue_id = issue_id |
| |
| # ClassifyIssue only returns confidence_is_spam, but |
| # RecordClassifierIssueVerdict records confidence of |
| # ham or spam. Therefore if ham, invert score. |
| confidence = classification['confidence_is_spam'] |
| if not issue.is_spam: |
| confidence = 1.0 - confidence |
| |
| services.spam.RecordClassifierIssueVerdict( |
| cnxn, issue, predicted_label=='spam', |
| confidence, classification['failed_open']) |
| |
| if permissions.HasRestrictions(issue, 'view'): |
| self._config_service.InvalidateMemcache( |
| [issue], key_prefix='nonviewable:') |
| |
| # Add a comment to existing issues saying they are now blocking or |
| # blocked on this issue. |
| blocked_add_issues = self.GetIssues(cnxn, issue.blocked_on_iids) |
| for add_issue in blocked_add_issues: |
| self.CreateIssueComment( |
| cnxn, add_issue, reporter_id, content='', |
| amendments=[tracker_bizobj.MakeBlockingAmendment( |
| [(issue.project_name, issue.local_id)], [], |
| default_project_name=add_issue.project_name)]) |
| blocking_add_issues = self.GetIssues(cnxn, issue.blocking_iids) |
| for add_issue in blocking_add_issues: |
| self.CreateIssueComment( |
| cnxn, add_issue, reporter_id, content='', |
| amendments=[tracker_bizobj.MakeBlockedOnAmendment( |
| [(issue.project_name, issue.local_id)], [], |
| default_project_name=add_issue.project_name)]) |
| |
| self._UpdateIssuesModified( |
| cnxn, iids_to_invalidate, modified_timestamp=timestamp) |
| |
| if index_now: |
| tracker_fulltext.IndexIssues( |
| cnxn, [issue], services.user, self, self._config_service) |
| else: |
| self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id]) |
| |
| return issue, comment |
| |
| def AllocateNewLocalIDs(self, cnxn, issues): |
| # Filter to just the issues that need new local IDs. |
| issues = [issue for issue in issues if issue.local_id < 0] |
| |
| for issue in issues: |
| if issue.local_id < 0: |
| issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id) |
| |
| self.UpdateIssues(cnxn, issues) |
| |
| logging.info("AllocateNewLocalIDs") |
| |
| def GetAllIssuesInProject( |
| self, cnxn, project_id, min_local_id=None, use_cache=True): |
| """Special query to efficiently get ALL issues in a project. |
| |
| This is not done while the user is waiting, only by backround tasks. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: the ID of the project. |
| min_local_id: optional int to start at. |
| use_cache: optional boolean to turn off using the cache. |
| |
| Returns: |
| A list of Issue protocol buffers for all issues. |
| """ |
| all_local_ids = self.GetAllLocalIDsInProject( |
| cnxn, project_id, min_local_id=min_local_id) |
| return self.GetIssuesByLocalIDs( |
| cnxn, project_id, all_local_ids, use_cache=use_cache) |
| |
| def GetAnyOnHandIssue(self, issue_ids, start=None, end=None): |
| """Get any one issue from RAM or memcache, otherwise return None.""" |
| return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end) |
| |
| def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None): |
| # type: (MonorailConnection, Collection[int], Optional[Boolean], |
| # Optional[int]) -> (Dict[int, Issue], Sequence[int]) |
| """Get a dict {iid: issue} from the DB or cache. |
| |
| Returns: |
| A dict {iid: issue} from the DB or cache. |
| A sequence of iid that could not be found. |
| """ |
| issue_dict, missed_iids = self.issue_2lc.GetAll( |
| cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id) |
| if not use_cache: |
| for issue in issue_dict.values(): |
| issue.assume_stale = False |
| return issue_dict, missed_iids |
| |
| def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None): |
| # type: (MonorailConnection, Sequence[int], Optional[Boolean], |
| # Optional[int]) -> (Sequence[int]) |
| """Get a list of Issue PBs from the DB or cache. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue_ids: integer global issue IDs of the issues. |
| use_cache: optional boolean to turn off using the cache. |
| shard_id: optional int shard_id to limit retrieval. |
| |
| Returns: |
| A list of Issue PBs in the same order as the given issue_ids. |
| """ |
| issue_dict, _misses = self.GetIssuesDict( |
| cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id) |
| |
| # Return a list that is ordered the same as the given issue_ids. |
| issue_list = [issue_dict[issue_id] for issue_id in issue_ids |
| if issue_id in issue_dict] |
| |
| return issue_list |
| |
| def GetIssue(self, cnxn, issue_id, use_cache=True): |
| """Get one Issue PB from the DB. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue_id: integer global issue ID of the issue. |
| use_cache: optional boolean to turn off using the cache. |
| |
| Returns: |
| The requested Issue protocol buffer. |
| |
| Raises: |
| NoSuchIssueException: the issue was not found. |
| """ |
| issues = self.GetIssues(cnxn, [issue_id], use_cache=use_cache) |
| try: |
| return issues[0] |
| except IndexError: |
| raise exceptions.NoSuchIssueException() |
| |
| def GetIssuesByLocalIDs( |
| self, cnxn, project_id, local_id_list, use_cache=True, shard_id=None): |
| """Get all the requested issues. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: int ID of the project to which the issues belong. |
| local_id_list: list of integer local IDs for the requested issues. |
| use_cache: optional boolean to turn off using the cache. |
| shard_id: optional int shard_id to choose a replica. |
| |
| Returns: |
| List of Issue PBs for the requested issues. The result Issues |
| will be ordered in the same order as local_id_list. |
| """ |
| issue_ids_to_fetch, _misses = self.LookupIssueIDs( |
| cnxn, [(project_id, local_id) for local_id in local_id_list]) |
| issues = self.GetIssues( |
| cnxn, issue_ids_to_fetch, use_cache=use_cache, shard_id=shard_id) |
| return issues |
| |
| def GetIssueByLocalID(self, cnxn, project_id, local_id, use_cache=True): |
| """Get one Issue PB from the DB. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: the ID of the project to which the issue belongs. |
| local_id: integer local ID of the issue. |
| use_cache: optional boolean to turn off using the cache. |
| |
| Returns: |
| The requested Issue protocol buffer. |
| """ |
| issues = self.GetIssuesByLocalIDs( |
| cnxn, project_id, [local_id], use_cache=use_cache) |
| try: |
| return issues[0] |
| except IndexError: |
| raise exceptions.NoSuchIssueException( |
| 'The issue %s:%d does not exist.' % (project_id, local_id)) |
| |
| def GetOpenAndClosedIssues(self, cnxn, issue_ids): |
| """Return the requested issues in separate open and closed lists. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue_ids: list of int issue issue_ids. |
| |
| Returns: |
| A pair of lists, the first with open issues, second with closed issues. |
| """ |
| if not issue_ids: |
| return [], [] # make one common case efficient |
| |
| issues = self.GetIssues(cnxn, issue_ids) |
| project_ids = {issue.project_id for issue in issues} |
| configs = self._config_service.GetProjectConfigs(cnxn, project_ids) |
| open_issues = [] |
| closed_issues = [] |
| for issue in issues: |
| config = configs[issue.project_id] |
| if tracker_helpers.MeansOpenInProject( |
| tracker_bizobj.GetStatus(issue), config): |
| open_issues.append(issue) |
| else: |
| closed_issues.append(issue) |
| |
| return open_issues, closed_issues |
| |
| # TODO(crbug.com/monorail/7822): Delete this method when V0 API retired. |
| def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id): |
| """Return the current location of a moved issue based on old location.""" |
| issue_id = int(self.issueformerlocations_tbl.SelectValue( |
| cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id)) |
| if not issue_id: |
| return None, None |
| project_id, local_id = self.issue_tbl.SelectRow( |
| cnxn, cols=['project_id', 'local_id'], id=issue_id) |
| return project_id, local_id |
| |
| def GetPreviousLocations(self, cnxn, issue): |
| """Get all the previous locations of an issue.""" |
| location_rows = self.issueformerlocations_tbl.Select( |
| cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id) |
| locations = [(pid, local_id) for (pid, local_id) in location_rows |
| if pid != issue.project_id or local_id != issue.local_id] |
| return locations |
| |
| def GetCommentsByUser(self, cnxn, user_id): |
| """Get all comments created by a user""" |
| comments = self.GetComments(cnxn, commenter_id=user_id, |
| is_description=False, limit=10000) |
| return comments |
| |
| def GetIssueActivity(self, cnxn, num=50, before=None, after=None, |
| project_ids=None, user_ids=None, ascending=False): |
| |
| if project_ids: |
| use_clause = ( |
| 'USE INDEX (project_id) USE INDEX FOR ORDER BY (project_id)') |
| elif user_ids: |
| use_clause = ( |
| 'USE INDEX (commenter_id) USE INDEX FOR ORDER BY (commenter_id)') |
| else: |
| use_clause = '' |
| |
| # TODO(jrobbins): make this into a persist method. |
| # TODO(jrobbins): this really needs permission checking in SQL, which |
| # will be slow. |
| where_conds = [('Issue.id = Comment.issue_id', [])] |
| if project_ids is not None: |
| cond_str = 'Comment.project_id IN (%s)' % sql.PlaceHolders(project_ids) |
| where_conds.append((cond_str, project_ids)) |
| if user_ids is not None: |
| cond_str = 'Comment.commenter_id IN (%s)' % sql.PlaceHolders(user_ids) |
| where_conds.append((cond_str, user_ids)) |
| |
| if before: |
| where_conds.append(('created < %s', [before])) |
| if after: |
| where_conds.append(('created > %s', [after])) |
| if ascending: |
| order_by = [('created', [])] |
| else: |
| order_by = [('created DESC', [])] |
| |
| comments = self.GetComments( |
| cnxn, joins=[('Issue', [])], deleted_by=None, where=where_conds, |
| use_clause=use_clause, order_by=order_by, limit=num + 1) |
| return comments |
| |
| def GetIssueIDsReportedByUser(self, cnxn, user_id): |
| """Get all issue IDs created by a user""" |
| rows = self.issue_tbl.Select(cnxn, cols=['id'], reporter_id=user_id, |
| limit=10000) |
| return [row[0] for row in rows] |
| |
| def InsertIssue(self, cnxn, issue): |
| """Store the given issue in SQL. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue: Issue PB to insert into the database. |
| |
| Returns: |
| The int issue_id of the newly created issue. |
| """ |
| status_id = self._config_service.LookupStatusID( |
| cnxn, issue.project_id, issue.status) |
| row = ( |
| issue.project_id, issue.local_id, status_id, issue.owner_id or |
| None, issue.reporter_id, issue.opened_timestamp, issue.closed_timestamp, |
| issue.modified_timestamp, issue.owner_modified_timestamp, |
| issue.status_modified_timestamp, issue.component_modified_timestamp, |
| issue.migration_modified_timestamp, issue.derived_owner_id or None, |
| self._config_service.LookupStatusID( |
| cnxn, issue.project_id, issue.derived_status), bool(issue.deleted), |
| issue.star_count, issue.attachment_count, issue.is_spam) |
| # ISSUE_COLs[1:] to skip setting the ID |
| # Insert into the Primary DB. |
| generated_ids = self.issue_tbl.InsertRows( |
| cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True) |
| issue_id = generated_ids[0] |
| issue.issue_id = issue_id |
| self.issue_tbl.Update( |
| cnxn, {'shard': issue_id % settings.num_logical_shards}, |
| id=issue.issue_id, commit=False) |
| |
| self._UpdateIssuesSummary(cnxn, [issue], commit=False) |
| self._UpdateIssuesLabels(cnxn, [issue], commit=False) |
| self._UpdateIssuesFields(cnxn, [issue], commit=False) |
| self._UpdateIssuesComponents(cnxn, [issue], commit=False) |
| self._UpdateIssuesCc(cnxn, [issue], commit=False) |
| self._UpdateIssuesNotify(cnxn, [issue], commit=False) |
| self._UpdateIssuesRelation(cnxn, [issue], commit=False) |
| self._UpdateIssuesApprovals(cnxn, issue, commit=False) |
| self.chart_service.StoreIssueSnapshots(cnxn, [issue], commit=False) |
| cnxn.Commit() |
| self._config_service.InvalidateMemcache([issue]) |
| |
| return issue_id |
| |
| def UpdateIssues( |
| self, cnxn, issues, update_cols=None, just_derived=False, commit=True, |
| invalidate=True): |
| """Update the given issues in SQL. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issues: list of issues to update, these must have been loaded with |
| use_cache=False so that issue.assume_stale is False. |
| update_cols: optional list of just the field names to update. |
| just_derived: set to True when only updating derived fields. |
| commit: set to False to skip the DB commit and do it in the caller. |
| invalidate: set to False to leave cache invalidatation to the caller. |
| """ |
| if not issues: |
| return |
| |
| for issue in issues: # slow, but mysql will not allow REPLACE rows. |
| assert not issue.assume_stale, ( |
| 'issue2514: Storing issue that might be stale: %r' % issue) |
| delta = { |
| 'project_id': |
| issue.project_id, |
| 'local_id': |
| issue.local_id, |
| 'owner_id': |
| issue.owner_id or None, |
| 'status_id': |
| self._config_service.LookupStatusID( |
| cnxn, issue.project_id, issue.status) or None, |
| 'opened': |
| issue.opened_timestamp, |
| 'closed': |
| issue.closed_timestamp, |
| 'modified': |
| issue.modified_timestamp, |
| 'owner_modified': |
| issue.owner_modified_timestamp, |
| 'status_modified': |
| issue.status_modified_timestamp, |
| 'component_modified': |
| issue.component_modified_timestamp, |
| 'migration_modified': |
| issue.migration_modified_timestamp, |
| 'derived_owner_id': |
| issue.derived_owner_id or None, |
| 'derived_status_id': |
| self._config_service.LookupStatusID( |
| cnxn, issue.project_id, issue.derived_status) or None, |
| 'deleted': |
| bool(issue.deleted), |
| 'star_count': |
| issue.star_count, |
| 'attachment_count': |
| issue.attachment_count, |
| 'is_spam': |
| issue.is_spam, |
| } |
| if update_cols is not None: |
| delta = {key: val for key, val in delta.items() |
| if key in update_cols} |
| self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False) |
| |
| if not update_cols: |
| self._UpdateIssuesLabels(cnxn, issues, commit=False) |
| self._UpdateIssuesCc(cnxn, issues, commit=False) |
| self._UpdateIssuesFields(cnxn, issues, commit=False) |
| self._UpdateIssuesComponents(cnxn, issues, commit=False) |
| self._UpdateIssuesNotify(cnxn, issues, commit=False) |
| if not just_derived: |
| self._UpdateIssuesSummary(cnxn, issues, commit=False) |
| self._UpdateIssuesRelation(cnxn, issues, commit=False) |
| |
| self.chart_service.StoreIssueSnapshots(cnxn, issues, commit=False) |
| |
| iids_to_invalidate = [issue.issue_id for issue in issues] |
| if just_derived and invalidate: |
| self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate) |
| elif invalidate: |
| self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate) |
| if commit: |
| cnxn.Commit() |
| if invalidate: |
| self._config_service.InvalidateMemcache(issues) |
| |
| def UpdateIssue( |
| self, cnxn, issue, update_cols=None, just_derived=False, commit=True, |
| invalidate=True): |
| """Update the given issue in SQL. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue: the issue to update. |
| update_cols: optional list of just the field names to update. |
| just_derived: set to True when only updating derived fields. |
| commit: set to False to skip the DB commit and do it in the caller. |
| invalidate: set to False to leave cache invalidatation to the caller. |
| """ |
| self.UpdateIssues( |
| cnxn, [issue], update_cols=update_cols, just_derived=just_derived, |
| commit=commit, invalidate=invalidate) |
| |
| def _UpdateIssuesSummary(self, cnxn, issues, commit=True): |
| """Update the IssueSummary table rows for the given issues.""" |
| self.issuesummary_tbl.InsertRows( |
| cnxn, ISSUESUMMARY_COLS, |
| [(issue.issue_id, issue.summary) for issue in issues], |
| replace=True, commit=commit) |
| |
| def _UpdateIssuesLabels(self, cnxn, issues, commit=True): |
| """Update the Issue2Label table rows for the given issues.""" |
| label_rows = [] |
| for issue in issues: |
| issue_shard = issue.issue_id % settings.num_logical_shards |
| # TODO(jrobbins): If the user adds many novel labels in one issue update, |
| # that could be slow. Solution is to add all new labels in a batch first. |
| label_rows.extend( |
| (issue.issue_id, |
| self._config_service.LookupLabelID(cnxn, issue.project_id, label), |
| False, |
| issue_shard) |
| for label in issue.labels) |
| label_rows.extend( |
| (issue.issue_id, |
| self._config_service.LookupLabelID(cnxn, issue.project_id, label), |
| True, |
| issue_shard) |
| for label in issue.derived_labels) |
| |
| self.issue2label_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], |
| commit=False) |
| self.issue2label_tbl.InsertRows( |
| cnxn, ISSUE2LABEL_COLS + ['issue_shard'], |
| label_rows, ignore=True, commit=commit) |
| |
| def _UpdateIssuesFields(self, cnxn, issues, commit=True): |
| """Update the Issue2FieldValue table rows for the given issues.""" |
| fieldvalue_rows = [] |
| for issue in issues: |
| issue_shard = issue.issue_id % settings.num_logical_shards |
| for fv in issue.field_values: |
| fieldvalue_rows.append( |
| (issue.issue_id, fv.field_id, fv.int_value, fv.str_value, |
| fv.user_id or None, fv.date_value, fv.url_value, fv.derived, |
| fv.phase_id or None, issue_shard)) |
| |
| self.issue2fieldvalue_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
| self.issue2fieldvalue_tbl.InsertRows( |
| cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'], |
| fieldvalue_rows, commit=commit) |
| |
| def _UpdateIssuesComponents(self, cnxn, issues, commit=True): |
| """Update the Issue2Component table rows for the given issues.""" |
| issue2component_rows = [] |
| for issue in issues: |
| issue_shard = issue.issue_id % settings.num_logical_shards |
| issue2component_rows.extend( |
| (issue.issue_id, component_id, False, issue_shard) |
| for component_id in issue.component_ids) |
| issue2component_rows.extend( |
| (issue.issue_id, component_id, True, issue_shard) |
| for component_id in issue.derived_component_ids) |
| |
| self.issue2component_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
| self.issue2component_tbl.InsertRows( |
| cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'], |
| issue2component_rows, ignore=True, commit=commit) |
| |
| def _UpdateIssuesCc(self, cnxn, issues, commit=True): |
| """Update the Issue2Cc table rows for the given issues.""" |
| cc_rows = [] |
| for issue in issues: |
| issue_shard = issue.issue_id % settings.num_logical_shards |
| cc_rows.extend( |
| (issue.issue_id, cc_id, False, issue_shard) |
| for cc_id in issue.cc_ids) |
| cc_rows.extend( |
| (issue.issue_id, cc_id, True, issue_shard) |
| for cc_id in issue.derived_cc_ids) |
| |
| self.issue2cc_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
| self.issue2cc_tbl.InsertRows( |
| cnxn, ISSUE2CC_COLS + ['issue_shard'], |
| cc_rows, ignore=True, commit=commit) |
| |
| def _UpdateIssuesNotify(self, cnxn, issues, commit=True): |
| """Update the Issue2Notify table rows for the given issues.""" |
| notify_rows = [] |
| for issue in issues: |
| derived_rows = [[issue.issue_id, email] |
| for email in issue.derived_notify_addrs] |
| notify_rows.extend(derived_rows) |
| |
| self.issue2notify_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
| self.issue2notify_tbl.InsertRows( |
| cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit) |
| |
| def _UpdateIssuesRelation(self, cnxn, issues, commit=True): |
| """Update the IssueRelation table rows for the given issues.""" |
| relation_rows = [] |
| blocking_rows = [] |
| dangling_relation_rows = [] |
| for issue in issues: |
| for i, dst_issue_id in enumerate(issue.blocked_on_iids): |
| rank = issue.blocked_on_ranks[i] |
| relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon', rank)) |
| for dst_issue_id in issue.blocking_iids: |
| blocking_rows.append((dst_issue_id, issue.issue_id, 'blockedon')) |
| for dst_ref in issue.dangling_blocked_on_refs: |
| if dst_ref.ext_issue_identifier: |
| dangling_relation_rows.append(( |
| issue.issue_id, None, None, |
| dst_ref.ext_issue_identifier, 'blockedon')) |
| else: |
| dangling_relation_rows.append(( |
| issue.issue_id, dst_ref.project, dst_ref.issue_id, |
| None, 'blockedon')) |
| for dst_ref in issue.dangling_blocking_refs: |
| if dst_ref.ext_issue_identifier: |
| dangling_relation_rows.append(( |
| issue.issue_id, None, None, |
| dst_ref.ext_issue_identifier, 'blocking')) |
| else: |
| dangling_relation_rows.append(( |
| issue.issue_id, dst_ref.project, dst_ref.issue_id, |
| dst_ref.ext_issue_identifier, 'blocking')) |
| if issue.merged_into: |
| relation_rows.append(( |
| issue.issue_id, issue.merged_into, 'mergedinto', None)) |
| if issue.merged_into_external: |
| dangling_relation_rows.append(( |
| issue.issue_id, None, None, |
| issue.merged_into_external, 'mergedinto')) |
| |
| old_blocking = self.issuerelation_tbl.Select( |
| cnxn, cols=ISSUERELATION_COLS[:-1], |
| dst_issue_id=[issue.issue_id for issue in issues], kind='blockedon') |
| relation_rows.extend([ |
| (row + (0,)) for row in blocking_rows if row not in old_blocking]) |
| delete_rows = [row for row in old_blocking if row not in blocking_rows] |
| |
| for issue_id, dst_issue_id, kind in delete_rows: |
| self.issuerelation_tbl.Delete(cnxn, issue_id=issue_id, |
| dst_issue_id=dst_issue_id, kind=kind, commit=False) |
| self.issuerelation_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
| self.issuerelation_tbl.InsertRows( |
| cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit) |
| self.danglingrelation_tbl.Delete( |
| cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
| self.danglingrelation_tbl.InsertRows( |
| cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True, |
| commit=commit) |
| |
| def _UpdateIssuesModified( |
| self, cnxn, iids, modified_timestamp=None, invalidate=True): |
| """Store a modified timestamp for each of the specified issues.""" |
| if not iids: |
| return |
| delta = {'modified': modified_timestamp or int(time.time())} |
| self.issue_tbl.Update(cnxn, delta, id=iids, commit=False) |
| if invalidate: |
| self.InvalidateIIDs(cnxn, iids) |
| |
| def _UpdateIssuesApprovals(self, cnxn, issue, commit=True): |
| """Update the Issue2ApprovalValue table rows for the given issue.""" |
| self.issue2approvalvalue_tbl.Delete( |
| cnxn, issue_id=issue.issue_id, commit=commit) |
| av_rows = [(av.approval_id, issue.issue_id, av.phase_id, |
| av.status.name.lower(), av.setter_id, av.set_on) for |
| av in issue.approval_values] |
| self.issue2approvalvalue_tbl.InsertRows( |
| cnxn, ISSUE2APPROVALVALUE_COLS, av_rows, commit=commit) |
| |
| approver_rows = [] |
| for av in issue.approval_values: |
| approver_rows.extend([(av.approval_id, approver_id, issue.issue_id) |
| for approver_id in av.approver_ids]) |
| self.issueapproval2approver_tbl.Delete( |
| cnxn, issue_id=issue.issue_id, commit=commit) |
| self.issueapproval2approver_tbl.InsertRows( |
| cnxn, ISSUEAPPROVAL2APPROVER_COLS, approver_rows, commit=commit) |
| |
| def UpdateIssueStructure(self, cnxn, config, issue, template, reporter_id, |
| comment_content, commit=True, invalidate=True): |
| """Converts the phases and approvals structure of the issue into the |
| structure of the given template.""" |
| # TODO(jojwang): Remove Field defs that belong to any removed approvals. |
| approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs} |
| issue_avs_by_id = {av.approval_id: av for av in issue.approval_values} |
| |
| new_approval_surveys = [] |
| new_issue_approvals = [] |
| |
| for template_av in template.approval_values: |
| existing_issue_av = issue_avs_by_id.get(template_av.approval_id) |
| |
| # Update all approval surveys so latest ApprovalDef survey changes |
| # appear in the converted issue's approval values. |
| ad = approval_defs_by_id.get(template_av.approval_id) |
| new_av_approver_ids = [] |
| if ad: |
| new_av_approver_ids = ad.approver_ids |
| new_approval_surveys.append( |
| self._MakeIssueComment( |
| issue.project_id, reporter_id, ad.survey, |
| is_description=True, approval_id=ad.approval_id)) |
| else: |
| logging.info('ApprovalDef not found for approval %r', template_av) |
| |
| # Keep approval values as-is if it exists in issue and template |
| if existing_issue_av: |
| new_av = tracker_bizobj.MakeApprovalValue( |
| existing_issue_av.approval_id, |
| approver_ids=existing_issue_av.approver_ids, |
| status=existing_issue_av.status, |
| setter_id=existing_issue_av.setter_id, |
| set_on=existing_issue_av.set_on, |
| phase_id=template_av.phase_id) |
| new_issue_approvals.append(new_av) |
| else: |
| new_av = tracker_bizobj.MakeApprovalValue( |
| template_av.approval_id, approver_ids=new_av_approver_ids, |
| status=template_av.status, phase_id=template_av.phase_id) |
| new_issue_approvals.append(new_av) |
| |
| template_phase_by_name = { |
| phase.name.lower(): phase for phase in template.phases} |
| issue_phase_by_id = {phase.phase_id: phase for phase in issue.phases} |
| updated_fvs = [] |
| # Trim issue FieldValues or update FieldValue phase_ids |
| for fv in issue.field_values: |
| # If a fv's phase has the same name as a template's phase, update |
| # the fv's phase_id to that of the template phase's. Otherwise, |
| # remove the fv. |
| if fv.phase_id: |
| issue_phase = issue_phase_by_id.get(fv.phase_id) |
| if issue_phase and issue_phase.name: |
| template_phase = template_phase_by_name.get(issue_phase.name.lower()) |
| # TODO(jojwang): monorail:4693, remove this after all 'stable-full' |
| # gates have been renamed to 'stable'. |
| if not template_phase: |
| template_phase = template_phase_by_name.get( |
| FLT_EQUIVALENT_GATES.get(issue_phase.name.lower())) |
| if template_phase: |
| fv.phase_id = template_phase.phase_id |
| updated_fvs.append(fv) |
| # keep all fvs that do not belong to phases. |
| else: |
| updated_fvs.append(fv) |
| |
| fd_names_by_id = {fd.field_id: fd.field_name for fd in config.field_defs} |
| amendment = tracker_bizobj.MakeApprovalStructureAmendment( |
| [fd_names_by_id.get(av.approval_id) for av in new_issue_approvals], |
| [fd_names_by_id.get(av.approval_id) for av in issue.approval_values]) |
| |
| # Update issue structure in RAM. |
| issue.approval_values = new_issue_approvals |
| issue.phases = template.phases |
| issue.field_values = updated_fvs |
| |
| # Update issue structure in DB. |
| for survey in new_approval_surveys: |
| survey.issue_id = issue.issue_id |
| self.InsertComment(cnxn, survey, commit=False) |
| self._UpdateIssuesApprovals(cnxn, issue, commit=False) |
| self._UpdateIssuesFields(cnxn, [issue], commit=False) |
| comment_pb = self.CreateIssueComment( |
| cnxn, issue, reporter_id, comment_content, |
| amendments=[amendment], commit=False) |
| |
| if commit: |
| cnxn.Commit() |
| |
| if invalidate: |
| self.InvalidateIIDs(cnxn, [issue.issue_id]) |
| |
| return comment_pb |
| |
| def DeltaUpdateIssue( |
| self, cnxn, services, reporter_id, project_id, |
| config, issue, delta, index_now=False, comment=None, attachments=None, |
| iids_to_invalidate=None, rules=None, predicate_asts=None, |
| is_description=False, timestamp=None, kept_attachments=None, |
| importer_id=None, inbound_message=None): |
| """Update the issue in the database and return a set of update tuples. |
| |
| Args: |
| cnxn: connection to SQL database. |
| services: connections to persistence layer. |
| reporter_id: user ID of the user making this change. |
| project_id: int ID for the current project. |
| config: ProjectIssueConfig PB for this project. |
| issue: Issue PB of issue to update. |
| delta: IssueDelta object of fields to update. |
| index_now: True if the issue should be updated in the full text index. |
| comment: This should be the content of the comment |
| corresponding to this change. |
| attachments: List [(filename, contents, mimetype),...] of attachments. |
| iids_to_invalidate: optional set of issue IDs that need to be invalidated. |
| If provided, affected issues will be accumulated here and, the caller |
| must call InvalidateIIDs() afterwards. |
| rules: optional list of preloaded FilterRule PBs for this project. |
| predicate_asts: optional list of QueryASTs for the rules. If rules are |
| provided, then predicate_asts should also be provided. |
| is_description: True if the comment is a new description for the issue. |
| timestamp: int timestamp set during testing, otherwise defaults to |
| int(time.time()). |
| kept_attachments: This should be a list of int attachment ids for |
| attachments kept from previous descriptions, if the comment is |
| a change to the issue description |
| importer_id: optional ID of user ID for an API client that is importing |
| issues and attributing them to other users. |
| inbound_message: optional string full text of an email that caused |
| this comment to be added. |
| |
| Returns: |
| A tuple (amendments, comment_pb) with a list of Amendment PBs that |
| describe the set of metadata updates that the user made, and the |
| resulting IssueComment (or None if no comment was created). |
| """ |
| timestamp = timestamp or int(time.time()) |
| old_effective_owner = tracker_bizobj.GetOwnerId(issue) |
| old_effective_status = tracker_bizobj.GetStatus(issue) |
| old_components = set(issue.component_ids) |
| |
| logging.info( |
| 'Bulk edit to project_id %s issue.local_id %s, comment %r', |
| project_id, issue.local_id, comment) |
| if iids_to_invalidate is None: |
| iids_to_invalidate = set([issue.issue_id]) |
| invalidate = True |
| else: |
| iids_to_invalidate.add(issue.issue_id) |
| invalidate = False # Caller will do it. |
| |
| # Store each updated value in the issue PB, and compute Update PBs |
| amendments, impacted_iids = tracker_bizobj.ApplyIssueDelta( |
| cnxn, self, issue, delta, config) |
| iids_to_invalidate.update(impacted_iids) |
| |
| # If this was a no-op with no comment, bail out and don't save, |
| # invalidate, or re-index anything. |
| if (not amendments and (not comment or not comment.strip()) and |
| not attachments): |
| logging.info('No amendments, comment, attachments: this is a no-op.') |
| return [], None |
| |
| # Note: no need to check for collisions when the user is doing a delta. |
| |
| # update the modified_timestamp for any comment added, even if it was |
| # just a text comment with no issue fields changed. |
| issue.modified_timestamp = timestamp |
| issue.migration_modified_timestamp = timestamp |
| |
| # Update the closed timestamp before filter rules so that rules |
| # can test for closed_timestamp, and also after filter rules |
| # so that closed_timestamp will be set if the issue is closed by the rule. |
| tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status) |
| if rules is None: |
| logging.info('Rules were not given') |
| rules = services.features.GetFilterRules(cnxn, config.project_id) |
| predicate_asts = filterrules_helpers.ParsePredicateASTs( |
| rules, config, []) |
| |
| filterrules_helpers.ApplyGivenRules( |
| cnxn, services, issue, config, rules, predicate_asts) |
| tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status) |
| if old_effective_owner != tracker_bizobj.GetOwnerId(issue): |
| issue.owner_modified_timestamp = timestamp |
| if old_effective_status != tracker_bizobj.GetStatus(issue): |
| issue.status_modified_timestamp = timestamp |
| if old_components != set(issue.component_ids): |
| issue.component_modified_timestamp = timestamp |
| |
| # Store the issue in SQL. |
| self.UpdateIssue(cnxn, issue, commit=False, invalidate=False) |
| |
| comment_pb = self.CreateIssueComment( |
| cnxn, issue, reporter_id, comment, amendments=amendments, |
| is_description=is_description, attachments=attachments, commit=False, |
| kept_attachments=kept_attachments, timestamp=timestamp, |
| importer_id=importer_id, inbound_message=inbound_message) |
| self._UpdateIssuesModified( |
| cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp, |
| invalidate=invalidate) |
| |
| # Add a comment to the newly added issues saying they are now blocking |
| # this issue. |
| for add_issue in self.GetIssues(cnxn, delta.blocked_on_add): |
| self.CreateIssueComment( |
| cnxn, add_issue, reporter_id, content='', |
| amendments=[tracker_bizobj.MakeBlockingAmendment( |
| [(issue.project_name, issue.local_id)], [], |
| default_project_name=add_issue.project_name)], |
| timestamp=timestamp, importer_id=importer_id) |
| # Add a comment to the newly removed issues saying they are no longer |
| # blocking this issue. |
| for remove_issue in self.GetIssues(cnxn, delta.blocked_on_remove): |
| self.CreateIssueComment( |
| cnxn, remove_issue, reporter_id, content='', |
| amendments=[tracker_bizobj.MakeBlockingAmendment( |
| [], [(issue.project_name, issue.local_id)], |
| default_project_name=remove_issue.project_name)], |
| timestamp=timestamp, importer_id=importer_id) |
| |
| # Add a comment to the newly added issues saying they are now blocked on |
| # this issue. |
| for add_issue in self.GetIssues(cnxn, delta.blocking_add): |
| self.CreateIssueComment( |
| cnxn, add_issue, reporter_id, content='', |
| amendments=[tracker_bizobj.MakeBlockedOnAmendment( |
| [(issue.project_name, issue.local_id)], [], |
| default_project_name=add_issue.project_name)], |
| timestamp=timestamp, importer_id=importer_id) |
| # Add a comment to the newly removed issues saying they are no longer |
| # blocked on this issue. |
| for remove_issue in self.GetIssues(cnxn, delta.blocking_remove): |
| self.CreateIssueComment( |
| cnxn, remove_issue, reporter_id, content='', |
| amendments=[tracker_bizobj.MakeBlockedOnAmendment( |
| [], [(issue.project_name, issue.local_id)], |
| default_project_name=remove_issue.project_name)], |
| timestamp=timestamp, importer_id=importer_id) |
| |
| if not invalidate: |
| cnxn.Commit() |
| |
| if index_now: |
| tracker_fulltext.IndexIssues( |
| cnxn, [issue], services.user_service, self, self._config_service) |
| else: |
| self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id]) |
| |
| return amendments, comment_pb |
| |
| def InvalidateIIDs(self, cnxn, iids_to_invalidate): |
| """Invalidate the specified issues in the Invalidate table and memcache.""" |
| issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate) |
| self.InvalidateIssues(cnxn, issues_to_invalidate) |
| |
| def InvalidateIssues(self, cnxn, issues): |
| """Invalidate the specified issues in the Invalidate table and memcache.""" |
| iids = [issue.issue_id for issue in issues] |
| self.issue_2lc.InvalidateKeys(cnxn, iids) |
| self._config_service.InvalidateMemcache(issues) |
| |
| def RelateIssues(self, cnxn, issue_relation_dict, commit=True): |
| """Update the IssueRelation table rows for the given relationships. |
| |
| issue_relation_dict is a mapping of 'source' issues to 'destination' issues, |
| paired with the kind of relationship connecting the two. |
| """ |
| relation_rows = [] |
| for src_iid, dests in issue_relation_dict.items(): |
| for dst_iid, kind in dests: |
| if kind == 'blocking': |
| relation_rows.append((dst_iid, src_iid, 'blockedon', 0)) |
| elif kind == 'blockedon': |
| relation_rows.append((src_iid, dst_iid, 'blockedon', 0)) |
| elif kind == 'mergedinto': |
| relation_rows.append((src_iid, dst_iid, 'mergedinto', None)) |
| |
| self.issuerelation_tbl.InsertRows( |
| cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit) |
| |
| def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id): |
| """Copy the given issues into the destination project.""" |
| created_issues = [] |
| iids_to_invalidate = set() |
| |
| for target_issue in issues: |
| assert not target_issue.assume_stale, ( |
| 'issue2514: Copying issue that might be stale: %r' % target_issue) |
| new_issue = tracker_pb2.Issue() |
| new_issue.project_id = dest_project.project_id |
| new_issue.project_name = dest_project.project_name |
| new_issue.summary = target_issue.summary |
| new_issue.labels.extend(target_issue.labels) |
| new_issue.field_values.extend(target_issue.field_values) |
| new_issue.reporter_id = copier_id |
| |
| timestamp = int(time.time()) |
| new_issue.opened_timestamp = timestamp |
| new_issue.modified_timestamp = timestamp |
| |
| target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id) |
| initial_summary_comment = target_comments[0] |
| |
| # Note that blocking and merge_into are not copied. |
| if target_issue.blocked_on_iids: |
| blocked_on = target_issue.blocked_on_iids |
| iids_to_invalidate.update(blocked_on) |
| new_issue.blocked_on_iids = blocked_on |
| |
| # Gather list of attachments from the target issue's summary comment. |
| # MakeIssueComments expects a list of [(filename, contents, mimetype),...] |
| attachments = [] |
| for attachment in initial_summary_comment.attachments: |
| client = storage.Client() |
| bucket = client.get_bucket(app_identity.get_default_gcs_bucket_name) |
| blob = bucket.get_blob(attachment.gcs_object_id) |
| content = blob.download_as_bytes() |
| attachments.append([attachment.filename, content, attachment.mimetype]) |
| |
| if attachments: |
| new_issue.attachment_count = len(attachments) |
| |
| # Create the same summary comment as the target issue. |
| comment = self._MakeIssueComment( |
| dest_project.project_id, copier_id, initial_summary_comment.content, |
| attachments=attachments, timestamp=timestamp, is_description=True) |
| |
| new_issue.local_id = self.AllocateNextLocalID( |
| cnxn, dest_project.project_id) |
| issue_id = self.InsertIssue(cnxn, new_issue) |
| comment.issue_id = issue_id |
| self.InsertComment(cnxn, comment) |
| |
| if permissions.HasRestrictions(new_issue, 'view'): |
| self._config_service.InvalidateMemcache( |
| [new_issue], key_prefix='nonviewable:') |
| |
| tracker_fulltext.IndexIssues( |
| cnxn, [new_issue], user_service, self, self._config_service) |
| created_issues.append(new_issue) |
| |
| # The referenced issues are all modified when the relationship is added. |
| self._UpdateIssuesModified( |
| cnxn, iids_to_invalidate, modified_timestamp=timestamp) |
| |
| return created_issues |
| |
| def MoveIssues(self, cnxn, dest_project, issues, user_service): |
| """Move the given issues into the destination project.""" |
| old_location_rows = [ |
| (issue.issue_id, issue.project_id, issue.local_id) |
| for issue in issues] |
| moved_back_iids = set() |
| |
| former_locations_in_project = self.issueformerlocations_tbl.Select( |
| cnxn, cols=ISSUEFORMERLOCATIONS_COLS, |
| project_id=dest_project.project_id, |
| issue_id=[issue.issue_id for issue in issues]) |
| former_locations = { |
| issue_id: local_id |
| for issue_id, project_id, local_id in former_locations_in_project} |
| |
| # Remove the issue id from issue_id_2lc so that it does not stay |
| # around in cache and memcache. |
| # The Key of IssueIDTwoLevelCache is (project_id, local_id). |
| self.issue_id_2lc.InvalidateKeys( |
| cnxn, [(issue.project_id, issue.local_id) for issue in issues]) |
| self.InvalidateIssues(cnxn, issues) |
| |
| for issue in issues: |
| if issue.issue_id in former_locations: |
| dest_id = former_locations[issue.issue_id] |
| moved_back_iids.add(issue.issue_id) |
| else: |
| dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id) |
| |
| issue.local_id = dest_id |
| issue.project_id = dest_project.project_id |
| issue.project_name = dest_project.project_name |
| |
| # Rewrite each whole issue so that status and label IDs are looked up |
| # in the context of the destination project. |
| self.UpdateIssues(cnxn, issues) |
| |
| # Comments also have the project_id because it is needed for an index. |
| self.comment_tbl.Update( |
| cnxn, {'project_id': dest_project.project_id}, |
| issue_id=[issue.issue_id for issue in issues], commit=False) |
| |
| # Record old locations so that we can offer links if the user looks there. |
| self.issueformerlocations_tbl.InsertRows( |
| cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True, |
| commit=False) |
| cnxn.Commit() |
| |
| tracker_fulltext.IndexIssues( |
| cnxn, issues, user_service, self, self._config_service) |
| |
| return moved_back_iids |
| |
| def ExpungeFormerLocations(self, cnxn, project_id): |
| """Delete history of issues that were in this project but moved out.""" |
| self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id) |
| |
| def ExpungeIssues(self, cnxn, issue_ids): |
| """Completely delete the specified issues from the database.""" |
| logging.info('expunging the issues %r', issue_ids) |
| tracker_fulltext.UnindexIssues(issue_ids) |
| |
| remaining_iids = issue_ids[:] |
| |
| # Note: these are purposely not done in a transaction to allow |
| # incremental progress in what might be a very large change. |
| # We are not concerned about non-atomic deletes because all |
| # this data will be gone eventually anyway. |
| while remaining_iids: |
| iids_in_chunk = remaining_iids[:CHUNK_SIZE] |
| remaining_iids = remaining_iids[CHUNK_SIZE:] |
| self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk) |
| self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
| self.issue_tbl.Delete(cnxn, id=iids_in_chunk) |
| |
| def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service): |
| """Set the deleted boolean on the indicated issue and store it. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: int project ID for the current project. |
| local_id: int local ID of the issue to freeze/unfreeze. |
| deleted: boolean, True to soft-delete, False to undelete. |
| user_service: persistence layer for users, used to lookup user IDs. |
| """ |
| issue = self.GetIssueByLocalID(cnxn, project_id, local_id, use_cache=False) |
| issue.deleted = deleted |
| issue.migration_modified_timestamp = int(time.time()) |
| self.UpdateIssue(cnxn, issue, update_cols=['deleted', 'migration_modified']) |
| tracker_fulltext.IndexIssues( |
| cnxn, [issue], user_service, self, self._config_service) |
| |
| def DeleteComponentReferences(self, cnxn, component_id): |
| """Delete any references to the specified component.""" |
| # TODO(jrobbins): add tasks to re-index any affected issues. |
| # Note: if this call fails, some data could be left |
| # behind, but it would not be displayed, and it could always be |
| # GC'd from the DB later. |
| self.issue2component_tbl.Delete(cnxn, component_id=component_id) |
| |
| ### Local ID generation |
| |
| def InitializeLocalID(self, cnxn, project_id): |
| """Initialize the local ID counter for the specified project to zero. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: int ID of the project. |
| """ |
| self.localidcounter_tbl.InsertRow( |
| cnxn, project_id=project_id, used_local_id=0, used_spam_id=0) |
| |
| def SetUsedLocalID(self, cnxn, project_id): |
| """Set the local ID counter based on existing issues. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: int ID of the project. |
| """ |
| highest_id = self.GetHighestLocalID(cnxn, project_id) |
| self.localidcounter_tbl.InsertRow( |
| cnxn, replace=True, used_local_id=highest_id, project_id=project_id) |
| return highest_id |
| |
| def AllocateNextLocalID(self, cnxn, project_id): |
| """Return the next available issue ID in the specified project. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: int ID of the project. |
| |
| Returns: |
| The next local ID. |
| """ |
| try: |
| next_local_id = self.localidcounter_tbl.IncrementCounterValue( |
| cnxn, 'used_local_id', project_id=project_id) |
| except AssertionError as e: |
| logging.info('exception incrementing local_id counter: %s', e) |
| next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1 |
| return next_local_id |
| |
| def GetHighestLocalID(self, cnxn, project_id): |
| """Return the highest used issue ID in the specified project. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: int ID of the project. |
| |
| Returns: |
| The highest local ID for an active or moved issues. |
| """ |
| highest = self.issue_tbl.SelectValue( |
| cnxn, 'MAX(local_id)', project_id=project_id) |
| highest = highest or 0 # It will be None if the project has no issues. |
| highest_former = self.issueformerlocations_tbl.SelectValue( |
| cnxn, 'MAX(local_id)', project_id=project_id) |
| highest_former = highest_former or 0 |
| return max(highest, highest_former) |
| |
| def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None): |
| """Return the list of local IDs only, not the actual issues. |
| |
| Args: |
| cnxn: connection to SQL database. |
| project_id: the ID of the project to which the issue belongs. |
| min_local_id: point to start at. |
| |
| Returns: |
| A range object of local IDs from 1 to N, or from min_local_id to N. It |
| may be the case that some of those local IDs are no longer used, e.g., |
| if some issues were moved out of this project. |
| """ |
| if not min_local_id: |
| min_local_id = 1 |
| highest_local_id = self.GetHighestLocalID(cnxn, project_id) |
| return list(range(min_local_id, highest_local_id + 1)) |
| |
| def ExpungeLocalIDCounters(self, cnxn, project_id): |
| """Delete history of local ids that were in this project.""" |
| self.localidcounter_tbl.Delete(cnxn, project_id=project_id) |
| |
| ### Comments |
| |
| def _UnpackComment( |
| self, comment_row, content_dict, inbound_message_dict, approval_dict, |
| importer_dict): |
| """Partially construct a Comment PB from a DB row.""" |
| (comment_id, issue_id, created, project_id, commenter_id, |
| deleted_by, is_spam, is_description, commentcontent_id) = comment_row |
| comment = tracker_pb2.IssueComment() |
| comment.id = comment_id |
| comment.issue_id = issue_id |
| comment.timestamp = created |
| comment.project_id = project_id |
| comment.user_id = commenter_id |
| comment.content = content_dict.get(commentcontent_id, '') |
| comment.inbound_message = inbound_message_dict.get(commentcontent_id, '') |
| comment.deleted_by = deleted_by or 0 |
| comment.is_spam = bool(is_spam) |
| comment.is_description = bool(is_description) |
| comment.approval_id = approval_dict.get(comment_id) |
| comment.importer_id = importer_dict.get(comment_id) |
| return comment |
| |
| def _UnpackAmendment(self, amendment_row): |
| """Construct an Amendment PB from a DB row.""" |
| ( |
| _id, _issue_id, comment_id, field_name, old_value, new_value, |
| added_user_id, removed_user_id, custom_field_name, added_component_id, |
| removed_component_id) = amendment_row |
| amendment = tracker_pb2.Amendment() |
| field_enum = tracker_pb2.FieldID(field_name.upper()) |
| amendment.field = field_enum |
| |
| # TODO(jrobbins): display old values in more cases. |
| if new_value is not None: |
| amendment.newvalue = new_value |
| if old_value is not None: |
| amendment.oldvalue = old_value |
| if added_user_id: |
| amendment.added_user_ids.append(added_user_id) |
| if removed_user_id: |
| amendment.removed_user_ids.append(removed_user_id) |
| if custom_field_name: |
| amendment.custom_field_name = custom_field_name |
| if added_component_id: |
| added_component_id = int(added_component_id) |
| amendment.added_component_ids.append(added_component_id) |
| if removed_component_id: |
| removed_component_id = int(removed_component_id) |
| amendment.removed_component_ids.append(removed_component_id) |
| return amendment, comment_id |
| |
| def _ConsolidateAmendments(self, amendments): |
| """Consoliodate amendments of the same field in one comment into one |
| amendment PB.""" |
| |
| fields_dict = {} |
| result = [] |
| |
| for amendment in amendments: |
| key = amendment.field, amendment.custom_field_name |
| fields_dict.setdefault(key, []).append(amendment) |
| for (field, _custom_name), sorted_amendments in sorted(fields_dict.items()): |
| new_amendment = tracker_pb2.Amendment() |
| new_amendment.field = field |
| for amendment in sorted_amendments: |
| if amendment.newvalue is not None: |
| if new_amendment.newvalue is not None: |
| # NOTE: see crbug/monorail/8272. BLOCKEDON and BLOCKING changes |
| # are all stored in newvalue e.g. (newvalue = -b/123 b/124) and |
| # external bugs and monorail bugs are stored in separate amendments. |
| # Without this, the values of external bug amendments and monorail |
| # blocker bug amendments may overwrite each other. |
| new_amendment.newvalue += (' ' + amendment.newvalue) |
| else: |
| new_amendment.newvalue = amendment.newvalue |
| if amendment.oldvalue is not None: |
| new_amendment.oldvalue = amendment.oldvalue |
| if amendment.added_user_ids: |
| new_amendment.added_user_ids.extend(amendment.added_user_ids) |
| if amendment.removed_user_ids: |
| new_amendment.removed_user_ids.extend(amendment.removed_user_ids) |
| if amendment.custom_field_name: |
| new_amendment.custom_field_name = amendment.custom_field_name |
| if amendment.added_component_ids: |
| new_amendment.added_component_ids.extend( |
| amendment.added_component_ids) |
| if amendment.removed_component_ids: |
| new_amendment.removed_component_ids.extend( |
| amendment.removed_component_ids) |
| result.append(new_amendment) |
| return result |
| |
| def _UnpackAttachment(self, attachment_row): |
| """Construct an Attachment PB from a DB row.""" |
| (attachment_id, _issue_id, comment_id, filename, filesize, mimetype, |
| deleted, gcs_object_id) = attachment_row |
| attach = tracker_pb2.Attachment() |
| attach.attachment_id = attachment_id |
| attach.filename = filename |
| attach.filesize = filesize |
| attach.mimetype = mimetype |
| attach.deleted = bool(deleted) |
| attach.gcs_object_id = gcs_object_id |
| return attach, comment_id |
| |
| def _DeserializeComments( |
| self, comment_rows, commentcontent_rows, amendment_rows, attachment_rows, |
| approval_rows, importer_rows): |
| """Turn rows into IssueComment PBs.""" |
| results = [] # keep objects in the same order as the rows |
| results_dict = {} # for fast access when joining. |
| |
| content_dict = dict( |
| (commentcontent_id, content) for |
| commentcontent_id, content, _ in commentcontent_rows) |
| inbound_message_dict = dict( |
| (commentcontent_id, inbound_message) for |
| commentcontent_id, _, inbound_message in commentcontent_rows) |
| approval_dict = dict( |
| (comment_id, approval_id) for approval_id, comment_id in |
| approval_rows) |
| importer_dict = dict(importer_rows) |
| |
| for comment_row in comment_rows: |
| comment = self._UnpackComment( |
| comment_row, content_dict, inbound_message_dict, approval_dict, |
| importer_dict) |
| results.append(comment) |
| results_dict[comment.id] = comment |
| |
| for amendment_row in amendment_rows: |
| amendment, comment_id = self._UnpackAmendment(amendment_row) |
| try: |
| results_dict[comment_id].amendments.extend([amendment]) |
| except KeyError: |
| logging.error('Found amendment for missing comment: %r', comment_id) |
| |
| for attachment_row in attachment_rows: |
| attach, comment_id = self._UnpackAttachment(attachment_row) |
| try: |
| results_dict[comment_id].attachments.append(attach) |
| except KeyError: |
| logging.error('Found attachment for missing comment: %r', comment_id) |
| |
| for c in results: |
| c.amendments = self._ConsolidateAmendments(c.amendments) |
| |
| return results |
| |
| # TODO(jrobbins): make this a private method and expose just the interface |
| # needed by activities.py. |
| def GetComments( |
| self, cnxn, where=None, order_by=None, content_only=False, **kwargs): |
| """Retrieve comments from SQL.""" |
| shard_id = sql.RandomShardID() |
| order_by = order_by or [('created', [])] |
| comment_rows = self.comment_tbl.Select( |
| cnxn, cols=COMMENT_COLS, where=where, |
| order_by=order_by, shard_id=shard_id, **kwargs) |
| cids = [row[0] for row in comment_rows] |
| commentcontent_ids = [row[-1] for row in comment_rows] |
| content_rows = self.commentcontent_tbl.Select( |
| cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids, |
| shard_id=shard_id) |
| approval_rows = self.issueapproval2comment_tbl.Select( |
| cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids) |
| amendment_rows = [] |
| attachment_rows = [] |
| importer_rows = [] |
| if not content_only: |
| amendment_rows = self.issueupdate_tbl.Select( |
| cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id) |
| attachment_rows = self.attachment_tbl.Select( |
| cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id) |
| importer_rows = self.commentimporter_tbl.Select( |
| cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id) |
| |
| comments = self._DeserializeComments( |
| comment_rows, content_rows, amendment_rows, attachment_rows, |
| approval_rows, importer_rows) |
| return comments |
| |
| def GetComment(self, cnxn, comment_id): |
| """Get the requested comment, or raise an exception.""" |
| comments = self.GetComments(cnxn, id=comment_id) |
| try: |
| return comments[0] |
| except IndexError: |
| raise exceptions.NoSuchCommentException() |
| |
| def GetCommentsForIssue(self, cnxn, issue_id): |
| """Return all IssueComment PBs for the specified issue. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue_id: int global ID of the issue. |
| |
| Returns: |
| A list of the IssueComment protocol buffers for the description |
| and comments on this issue. |
| """ |
| comments = self.GetComments(cnxn, issue_id=[issue_id]) |
| for i, comment in enumerate(comments): |
| comment.sequence = i |
| |
| return comments |
| |
| |
| def GetCommentsByID(self, cnxn, comment_ids, sequences, use_cache=True, |
| shard_id=None): |
| """Return all IssueComment PBs by comment ids. |
| |
| Args: |
| cnxn: connection to SQL database. |
| comment_ids: a list of comment ids. |
| sequences: sequence of the comments. |
| use_cache: optional boolean to enable the cache. |
| shard_id: optional int shard_id to limit retrieval. |
| |
| Returns: |
| A list of the IssueComment protocol buffers for comment_ids. |
| """ |
| # Try loading issue comments from a random shard to reduce load on |
| # primary DB. |
| if shard_id is None: |
| shard_id = sql.RandomShardID() |
| |
| comment_dict, _missed_comments = self.comment_2lc.GetAll(cnxn, comment_ids, |
| use_cache=use_cache, shard_id=shard_id) |
| |
| comments = sorted(list(comment_dict.values()), key=lambda x: x.timestamp) |
| |
| for i in range(len(comment_ids)): |
| comments[i].sequence = sequences[i] |
| |
| return comments |
| |
| # TODO(jrobbins): remove this method because it is too slow when an issue |
| # has a huge number of comments. |
| def GetCommentsForIssues(self, cnxn, issue_ids, content_only=False): |
| """Return all IssueComment PBs for each issue ID in the given list. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue_ids: list of integer global issue IDs. |
| content_only: optional boolean, set true for faster loading of |
| comment content without attachments and amendments. |
| |
| Returns: |
| Dict {issue_id: [IssueComment, ...]} with IssueComment protocol |
| buffers for the description and comments on each issue. |
| """ |
| comments = self.GetComments( |
| cnxn, issue_id=issue_ids, content_only=content_only) |
| |
| comments_dict = collections.defaultdict(list) |
| for comment in comments: |
| comment.sequence = len(comments_dict[comment.issue_id]) |
| comments_dict[comment.issue_id].append(comment) |
| |
| return comments_dict |
| |
| def InsertComment(self, cnxn, comment, commit=True): |
| """Store the given issue comment in SQL. |
| |
| Args: |
| cnxn: connection to SQL database. |
| comment: IssueComment PB to insert into the database. |
| commit: set to False to avoid doing the commit for now. |
| """ |
| commentcontent_id = self.commentcontent_tbl.InsertRow( |
| cnxn, content=comment.content, |
| inbound_message=comment.inbound_message, commit=False) |
| comment_id = self.comment_tbl.InsertRow( |
| cnxn, issue_id=comment.issue_id, created=comment.timestamp, |
| project_id=comment.project_id, |
| commenter_id=comment.user_id, |
| deleted_by=comment.deleted_by or None, |
| is_spam=comment.is_spam, is_description=comment.is_description, |
| commentcontent_id=commentcontent_id, |
| commit=False) |
| comment.id = comment_id |
| if comment.importer_id: |
| self.commentimporter_tbl.InsertRow( |
| cnxn, comment_id=comment_id, importer_id=comment.importer_id) |
| |
| amendment_rows = [] |
| for amendment in comment.amendments: |
| field_enum = str(amendment.field).lower() |
| if (amendment.get_assigned_value('newvalue') is not None and |
| not amendment.added_user_ids and not amendment.removed_user_ids): |
| amendment_rows.append( |
| ( |
| comment.issue_id, comment_id, field_enum, amendment.oldvalue, |
| amendment.newvalue, None, None, amendment.custom_field_name, |
| None, None)) |
| for added_user_id in amendment.added_user_ids: |
| amendment_rows.append( |
| ( |
| comment.issue_id, comment_id, field_enum, None, None, |
| added_user_id, None, amendment.custom_field_name, None, None)) |
| for removed_user_id in amendment.removed_user_ids: |
| amendment_rows.append( |
| ( |
| comment.issue_id, comment_id, field_enum, None, None, None, |
| removed_user_id, amendment.custom_field_name, None, None)) |
| for added_component_id in amendment.added_component_ids: |
| amendment_rows.append( |
| ( |
| comment.issue_id, comment_id, field_enum, None, None, None, |
| None, amendment.custom_field_name, added_component_id, None)) |
| for removed_component_id in amendment.removed_component_ids: |
| amendment_rows.append( |
| ( |
| comment.issue_id, comment_id, field_enum, None, None, None, |
| None, amendment.custom_field_name, None, removed_component_id)) |
| # ISSUEUPDATE_COLS[1:] to skip id column. |
| self.issueupdate_tbl.InsertRows( |
| cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=False) |
| |
| attachment_rows = [] |
| for attach in comment.attachments: |
| attachment_rows.append([ |
| comment.issue_id, comment.id, attach.filename, attach.filesize, |
| attach.mimetype, attach.deleted, attach.gcs_object_id]) |
| self.attachment_tbl.InsertRows( |
| cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=False) |
| |
| if comment.approval_id: |
| self.issueapproval2comment_tbl.InsertRows( |
| cnxn, ISSUEAPPROVAL2COMMENT_COLS, |
| [(comment.approval_id, comment_id)], commit=False) |
| |
| if commit: |
| cnxn.Commit() |
| |
| def _UpdateComment(self, cnxn, comment, update_cols=None): |
| """Update the given issue comment in SQL. |
| |
| Args: |
| cnxn: connection to SQL database. |
| comment: IssueComment PB to update in the database. |
| update_cols: optional list of just the field names to update. |
| """ |
| delta = { |
| 'commenter_id': comment.user_id, |
| 'deleted_by': comment.deleted_by or None, |
| 'is_spam': comment.is_spam, |
| } |
| if update_cols is not None: |
| delta = {key: val for key, val in delta.items() |
| if key in update_cols} |
| |
| self.comment_tbl.Update(cnxn, delta, id=comment.id) |
| self.comment_2lc.InvalidateKeys(cnxn, [comment.id]) |
| |
| def _MakeIssueComment( |
| self, project_id, user_id, content, inbound_message=None, |
| amendments=None, attachments=None, kept_attachments=None, timestamp=None, |
| is_spam=False, is_description=False, approval_id=None, importer_id=None): |
| """Create in IssueComment protocol buffer in RAM. |
| |
| Args: |
| project_id: Project with the issue. |
| user_id: the user ID of the user who entered the comment. |
| content: string body of the comment. |
| inbound_message: optional string full text of an email that |
| caused this comment to be added. |
| amendments: list of Amendment PBs describing the |
| metadata changes that the user made along w/ comment. |
| attachments: [(filename, contents, mimetype),...] attachments uploaded at |
| the time the comment was made. |
| kept_attachments: list of Attachment PBs for attachments kept from |
| previous descriptions, if the comment is a description |
| timestamp: time at which the comment was made, defaults to now. |
| is_spam: True if the comment was classified as spam. |
| is_description: True if the comment is a description for the issue. |
| approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment |
| belongs to. |
| importer_id: optional User ID of script that imported the comment on |
| behalf of a user. |
| |
| Returns: |
| The new IssueComment protocol buffer. |
| |
| The content may have some markup done during input processing. |
| |
| Any attachments are immediately stored. |
| """ |
| comment = tracker_pb2.IssueComment() |
| comment.project_id = project_id |
| comment.user_id = user_id |
| comment.content = content or '' |
| comment.is_spam = is_spam |
| comment.is_description = is_description |
| if not timestamp: |
| timestamp = int(time.time()) |
| comment.timestamp = int(timestamp) |
| if inbound_message: |
| comment.inbound_message = inbound_message |
| if amendments: |
| logging.info('amendments is %r', amendments) |
| comment.amendments.extend(amendments) |
| if approval_id: |
| comment.approval_id = approval_id |
| |
| if attachments: |
| for filename, body, mimetype in attachments: |
| gcs_object_id = gcs_helpers.StoreObjectInGCS( |
| body, mimetype, project_id, filename=filename) |
| attach = tracker_pb2.Attachment() |
| # attachment id is determined later by the SQL DB. |
| attach.filename = filename |
| attach.filesize = len(body) |
| attach.mimetype = mimetype |
| attach.gcs_object_id = gcs_object_id |
| comment.attachments.extend([attach]) |
| logging.info("Save attachment with object_id: %s" % gcs_object_id) |
| |
| if kept_attachments: |
| for kept_attach in kept_attachments: |
| (filename, filesize, mimetype, deleted, |
| gcs_object_id) = kept_attach[3:] |
| new_attach = tracker_pb2.Attachment( |
| filename=filename, filesize=filesize, mimetype=mimetype, |
| deleted=bool(deleted), gcs_object_id=gcs_object_id) |
| comment.attachments.append(new_attach) |
| logging.info("Copy attachment with object_id: %s" % gcs_object_id) |
| |
| if importer_id: |
| comment.importer_id = importer_id |
| |
| return comment |
| |
| def CreateIssueComment( |
| self, cnxn, issue, user_id, content, inbound_message=None, |
| amendments=None, attachments=None, kept_attachments=None, timestamp=None, |
| is_spam=False, is_description=False, approval_id=None, commit=True, |
| importer_id=None): |
| """Create and store a new comment on the specified issue. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue: the issue on which to add the comment, must be loaded from |
| database with use_cache=False so that assume_stale == False. |
| user_id: the user ID of the user who entered the comment. |
| content: string body of the comment. |
| inbound_message: optional string full text of an email that caused |
| this comment to be added. |
| amendments: list of Amendment PBs describing the |
| metadata changes that the user made along w/ comment. |
| attachments: [(filename, contents, mimetype),...] attachments uploaded at |
| the time the comment was made. |
| kept_attachments: list of attachment ids for attachments kept from |
| previous descriptions, if the comment is an update to the description |
| timestamp: time at which the comment was made, defaults to now. |
| is_spam: True if the comment is classified as spam. |
| is_description: True if the comment is a description for the issue. |
| approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment |
| belongs to. |
| commit: set to False to not commit to DB yet. |
| importer_id: user ID of an API client that is importing issues. |
| |
| Returns: |
| The new IssueComment protocol buffer. |
| |
| Note that we assume that the content is safe to echo out |
| again. The content may have some markup done during input |
| processing. |
| """ |
| if is_description: |
| kept_attachments = self.GetAttachmentsByID(cnxn, kept_attachments) |
| else: |
| kept_attachments = [] |
| |
| comment = self._MakeIssueComment( |
| issue.project_id, user_id, content, amendments=amendments, |
| inbound_message=inbound_message, attachments=attachments, |
| timestamp=timestamp, is_spam=is_spam, is_description=is_description, |
| kept_attachments=kept_attachments, approval_id=approval_id, |
| importer_id=importer_id) |
| comment.issue_id = issue.issue_id |
| |
| if attachments or kept_attachments: |
| issue.attachment_count = ( |
| issue.attachment_count + len(attachments) + len(kept_attachments)) |
| self.UpdateIssue(cnxn, issue, update_cols=['attachment_count']) |
| |
| self.comment_creations.increment() |
| self.InsertComment(cnxn, comment, commit=commit) |
| |
| return comment |
| |
| def SoftDeleteComment( |
| self, cnxn, issue, issue_comment, deleted_by_user_id, |
| user_service, delete=True, reindex=False, is_spam=False): |
| """Mark comment as un/deleted, which shows/hides it from average users.""" |
| # Update number of attachments |
| attachments = 0 |
| if issue_comment.attachments: |
| for attachment in issue_comment.attachments: |
| if not attachment.deleted: |
| attachments += 1 |
| |
| # Delete only if it's not in deleted state |
| if delete: |
| if not issue_comment.deleted_by: |
| issue_comment.deleted_by = deleted_by_user_id |
| issue.attachment_count = issue.attachment_count - attachments |
| issue.migration_modified_timestamp = int(time.time()) |
| |
| # Undelete only if it's in deleted state |
| elif issue_comment.deleted_by: |
| issue_comment.deleted_by = 0 |
| issue.attachment_count = issue.attachment_count + attachments |
| issue.migration_modified_timestamp = int(time.time()) |
| |
| issue_comment.is_spam = is_spam |
| self._UpdateComment( |
| cnxn, issue_comment, update_cols=['deleted_by', 'is_spam']) |
| self.UpdateIssue( |
| cnxn, issue, update_cols=['attachment_count', 'migration_modified']) |
| |
| # Reindex the issue to take the comment deletion/undeletion into account. |
| if reindex: |
| tracker_fulltext.IndexIssues( |
| cnxn, [issue], user_service, self, self._config_service) |
| else: |
| self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id]) |
| |
| ### Approvals |
| |
| def GetIssueApproval(self, cnxn, issue_id, approval_id, use_cache=True): |
| """Retrieve the specified approval for the specified issue.""" |
| issue = self.GetIssue(cnxn, issue_id, use_cache=use_cache) |
| approval = tracker_bizobj.FindApprovalValueByID( |
| approval_id, issue.approval_values) |
| if approval: |
| return issue, approval |
| raise exceptions.NoSuchIssueApprovalException() |
| |
| def DeltaUpdateIssueApproval( |
| self, cnxn, modifier_id, config, issue, approval, approval_delta, |
| comment_content=None, is_description=False, attachments=None, |
| commit=True, kept_attachments=None): |
| """Update the issue's approval in the database.""" |
| amendments = [] |
| |
| # Update status in RAM and DB and create status amendment. |
| if approval_delta.status: |
| approval.status = approval_delta.status |
| approval.set_on = approval_delta.set_on or int(time.time()) |
| approval.setter_id = modifier_id |
| status_amendment = tracker_bizobj.MakeApprovalStatusAmendment( |
| approval_delta.status) |
| amendments.append(status_amendment) |
| |
| self._UpdateIssueApprovalStatus( |
| cnxn, issue.issue_id, approval.approval_id, approval.status, |
| approval.setter_id, approval.set_on) |
| |
| # Update approver_ids in RAM and DB and create approver amendment. |
| approvers_add = [approver for approver in approval_delta.approver_ids_add |
| if approver not in approval.approver_ids] |
| approvers_remove = [approver for approver in |
| approval_delta.approver_ids_remove |
| if approver in approval.approver_ids] |
| if approvers_add or approvers_remove: |
| approver_ids = [approver for approver in |
| list(approval.approver_ids) + approvers_add |
| if approver not in approvers_remove] |
| approval.approver_ids = approver_ids |
| approvers_amendment = tracker_bizobj.MakeApprovalApproversAmendment( |
| approvers_add, approvers_remove) |
| amendments.append(approvers_amendment) |
| |
| self._UpdateIssueApprovalApprovers( |
| cnxn, issue.issue_id, approval.approval_id, approver_ids) |
| |
| fv_amendments = tracker_bizobj.ApplyFieldValueChanges( |
| issue, config, approval_delta.subfield_vals_add, |
| approval_delta.subfield_vals_remove, approval_delta.subfields_clear) |
| amendments.extend(fv_amendments) |
| if fv_amendments: |
| self._UpdateIssuesFields(cnxn, [issue], commit=False) |
| |
| label_amendment = tracker_bizobj.ApplyLabelChanges( |
| issue, config, approval_delta.labels_add, approval_delta.labels_remove) |
| if label_amendment: |
| amendments.append(label_amendment) |
| self._UpdateIssuesLabels(cnxn, [issue], commit=False) |
| |
| comment_pb = self.CreateIssueComment( |
| cnxn, issue, modifier_id, comment_content, amendments=amendments, |
| approval_id=approval.approval_id, is_description=is_description, |
| attachments=attachments, commit=False, |
| kept_attachments=kept_attachments) |
| |
| if commit: |
| cnxn.Commit() |
| self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id]) |
| |
| return comment_pb |
| |
| def _UpdateIssueApprovalStatus( |
| self, cnxn, issue_id, approval_id, status, setter_id, set_on): |
| """Update the approvalvalue for the given issue_id's issue.""" |
| set_on = set_on or int(time.time()) |
| delta = { |
| 'status': status.name.lower(), |
| 'setter_id': setter_id, |
| 'set_on': set_on, |
| } |
| self.issue2approvalvalue_tbl.Update( |
| cnxn, delta, approval_id=approval_id, issue_id=issue_id, |
| commit=False) |
| |
| def _UpdateIssueApprovalApprovers( |
| self, cnxn, issue_id, approval_id, approver_ids): |
| """Update the list of approvers allowed to approve an issue's approval.""" |
| self.issueapproval2approver_tbl.Delete( |
| cnxn, issue_id=issue_id, approval_id=approval_id, commit=False) |
| self.issueapproval2approver_tbl.InsertRows( |
| cnxn, ISSUEAPPROVAL2APPROVER_COLS, [(approval_id, approver_id, issue_id) |
| for approver_id in approver_ids], |
| commit=False) |
| |
| ### Attachments |
| |
| def GetAttachmentAndContext(self, cnxn, attachment_id): |
| """Load a IssueAttachment from database, and its comment ID and IID. |
| |
| Args: |
| cnxn: connection to SQL database. |
| attachment_id: long integer unique ID of desired issue attachment. |
| |
| Returns: |
| An Attachment protocol buffer that contains metadata about the attached |
| file, or None if it doesn't exist. Also, the comment ID and issue IID |
| of the comment and issue that contain this attachment. |
| |
| Raises: |
| NoSuchAttachmentException: the attachment was not found. |
| """ |
| if attachment_id is None: |
| raise exceptions.NoSuchAttachmentException() |
| |
| attachment_row = self.attachment_tbl.SelectRow( |
| cnxn, cols=ATTACHMENT_COLS, id=attachment_id) |
| if attachment_row: |
| (attach_id, issue_id, comment_id, filename, filesize, mimetype, |
| deleted, gcs_object_id) = attachment_row |
| if not deleted: |
| attachment = tracker_pb2.Attachment( |
| attachment_id=attach_id, filename=filename, filesize=filesize, |
| mimetype=mimetype, deleted=bool(deleted), |
| gcs_object_id=gcs_object_id) |
| return attachment, comment_id, issue_id |
| |
| raise exceptions.NoSuchAttachmentException() |
| |
| def GetAttachmentsByID(self, cnxn, attachment_ids): |
| """Return all Attachment PBs by attachment ids. |
| |
| Args: |
| cnxn: connection to SQL database. |
| attachment_ids: a list of comment ids. |
| |
| Returns: |
| A list of the Attachment protocol buffers for the attachments with |
| these ids. |
| """ |
| attachment_rows = self.attachment_tbl.Select( |
| cnxn, cols=ATTACHMENT_COLS, id=attachment_ids) |
| |
| return attachment_rows |
| |
| def _UpdateAttachment(self, cnxn, comment, attach, update_cols=None): |
| """Update attachment metadata in the DB. |
| |
| Args: |
| cnxn: connection to SQL database. |
| comment: IssueComment PB to invalidate in the cache. |
| attach: IssueAttachment PB to update in the DB. |
| update_cols: optional list of just the field names to update. |
| """ |
| delta = { |
| 'filename': attach.filename, |
| 'filesize': attach.filesize, |
| 'mimetype': attach.mimetype, |
| 'deleted': bool(attach.deleted), |
| } |
| if update_cols is not None: |
| delta = {key: val for key, val in delta.items() |
| if key in update_cols} |
| |
| self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id) |
| self.comment_2lc.InvalidateKeys(cnxn, [comment.id]) |
| |
| def SoftDeleteAttachment( |
| self, cnxn, issue, issue_comment, attach_id, user_service, delete=True, |
| index_now=False): |
| """Mark attachment as un/deleted, which shows/hides it from avg users.""" |
| attachment = None |
| for attach in issue_comment.attachments: |
| if attach.attachment_id == attach_id: |
| attachment = attach |
| |
| if not attachment: |
| logging.warning( |
| 'Tried to (un)delete non-existent attachment #%s in project ' |
| '%s issue %s', attach_id, issue.project_id, issue.local_id) |
| return |
| |
| if not issue_comment.deleted_by: |
| # Decrement attachment count only if it's not in deleted state |
| if delete: |
| if not attachment.deleted: |
| issue.attachment_count = issue.attachment_count - 1 |
| issue.migration_modified_timestamp = int(time.time()) |
| |
| # Increment attachment count only if it's in deleted state |
| elif attachment.deleted: |
| issue.attachment_count = issue.attachment_count + 1 |
| issue.migration_modified_timestamp = int(time.time()) |
| |
| logging.info('attachment.deleted was %s', attachment.deleted) |
| |
| attachment.deleted = delete |
| |
| logging.info('attachment.deleted is %s', attachment.deleted) |
| |
| self._UpdateAttachment( |
| cnxn, issue_comment, attachment, update_cols=['deleted']) |
| self.UpdateIssue( |
| cnxn, issue, update_cols=['attachment_count', 'migration_modified']) |
| |
| if index_now: |
| tracker_fulltext.IndexIssues( |
| cnxn, [issue], user_service, self, self._config_service) |
| else: |
| self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id]) |
| |
| ### Reindex queue |
| |
| def EnqueueIssuesForIndexing(self, cnxn, issue_ids, commit=True): |
| # type: (MonorailConnection, Collection[int], Optional[bool]) -> None |
| """Add the given issue IDs to the ReindexQueue table.""" |
| reindex_rows = [(issue_id,) for issue_id in issue_ids] |
| self.reindexqueue_tbl.InsertRows( |
| cnxn, ['issue_id'], reindex_rows, ignore=True, commit=commit) |
| |
| def ReindexIssues(self, cnxn, num_to_reindex, user_service): |
| """Reindex some issues specified in the IndexQueue table.""" |
| rows = self.reindexqueue_tbl.Select( |
| cnxn, order_by=[('created', [])], limit=num_to_reindex) |
| issue_ids = [row[0] for row in rows] |
| |
| if issue_ids: |
| issues = self.GetIssues(cnxn, issue_ids) |
| tracker_fulltext.IndexIssues( |
| cnxn, issues, user_service, self, self._config_service) |
| self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids) |
| |
| return len(issue_ids) |
| |
| ### Search functions |
| |
| def RunIssueQuery( |
| self, cnxn, left_joins, where, order_by, shard_id=None, limit=None): |
| """Run a SQL query to find matching issue IDs. |
| |
| Args: |
| cnxn: connection to SQL database. |
| left_joins: list of SQL LEFT JOIN clauses. |
| where: list of SQL WHERE clauses. |
| order_by: list of SQL ORDER BY clauses. |
| shard_id: int shard ID to focus the search. |
| limit: int maximum number of results, defaults to |
| settings.search_limit_per_shard. |
| |
| Returns: |
| (issue_ids, capped) where issue_ids is a list of the result issue IDs, |
| and capped is True if the number of results reached the limit. |
| """ |
| limit = limit or settings.search_limit_per_shard |
| where = where + [('Issue.deleted = %s', [False])] |
| rows = self.issue_tbl.Select( |
| cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'], |
| left_joins=left_joins, where=where, order_by=order_by, |
| limit=limit) |
| issue_ids = [row[0] for row in rows] |
| capped = len(issue_ids) >= limit |
| return issue_ids, capped |
| |
| def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id): |
| """Return a list of IIDs for issues with any of the given label IDs.""" |
| if not label_ids: |
| return [] |
| where = [] |
| if shard_id is not None: |
| slice_term = ('shard = %s', [shard_id]) |
| where.append(slice_term) |
| |
| rows = self.issue_tbl.Select( |
| cnxn, shard_id=shard_id, cols=['id'], |
| left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])], |
| label_id=label_ids, project_id=project_id, where=where) |
| return [row[0] for row in rows] |
| |
| def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id): |
| """Return IIDs for issues where any of the given users participate.""" |
| iids = [] |
| where = [] |
| if shard_id is not None: |
| where.append(('shard = %s', [shard_id])) |
| if project_ids: |
| cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids) |
| where.append((cond_str, project_ids)) |
| |
| # TODO(jrobbins): Combine these 3 queries into one with ORs. It currently |
| # is not the bottleneck. |
| rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], reporter_id=user_ids, |
| where=where, shard_id=shard_id) |
| for row in rows: |
| iids.append(row[0]) |
| |
| rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], owner_id=user_ids, |
| where=where, shard_id=shard_id) |
| for row in rows: |
| iids.append(row[0]) |
| |
| rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], derived_owner_id=user_ids, |
| where=where, shard_id=shard_id) |
| for row in rows: |
| iids.append(row[0]) |
| |
| rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], |
| left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])], |
| cc_id=user_ids, |
| where=where + [('cc_id IS NOT NULL', [])], |
| shard_id=shard_id) |
| for row in rows: |
| iids.append(row[0]) |
| |
| rows = self.issue_tbl.Select( |
| cnxn, cols=['Issue.id'], |
| left_joins=[ |
| ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []), |
| ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])], |
| user_id=user_ids, grants_perm='View', |
| where=where + [('user_id IS NOT NULL', [])], |
| shard_id=shard_id) |
| for row in rows: |
| iids.append(row[0]) |
| |
| return iids |
| |
| ### Issue Dependency Rankings |
| |
| def SortBlockedOn(self, cnxn, issue, blocked_on_iids): |
| """Sort blocked_on dependencies by rank and dst_issue_id. |
| |
| Args: |
| cnxn: connection to SQL database. |
| issue: the issue being blocked. |
| blocked_on_iids: the iids of all the issue's blockers |
| |
| Returns: |
| a tuple (ids, ranks), where ids is the sorted list of |
| blocked_on_iids and ranks is the list of corresponding ranks |
| """ |
| rows = self.issuerelation_tbl.Select( |
| cnxn, cols=ISSUERELATION_COLS, issue_id=issue.issue_id, |
| dst_issue_id=blocked_on_iids, kind='blockedon', |
| order_by=[('rank DESC', []), ('dst_issue_id', [])]) |
| ids = [row[1] for row in rows] |
| ids.extend([iid for iid in blocked_on_iids if iid not in ids]) |
| ranks = [row[3] for row in rows] |
| ranks.extend([0] * (len(blocked_on_iids) - len(ranks))) |
| return ids, ranks |
| |
| def ApplyIssueRerank( |
| self, cnxn, parent_id, relations_to_change, commit=True, invalidate=True): |
| """Updates rankings of blocked on issue relations to new values |
| |
| Args: |
| cnxn: connection to SQL database. |
| parent_id: the global ID of the blocked issue to update |
| relations_to_change: This should be a list of |
| [(blocker_id, new_rank),...] of relations that need to be changed |
| commit: set to False to skip the DB commit and do it in the caller. |
| invalidate: set to False to leave cache invalidatation to the caller. |
| """ |
| blocker_ids = [blocker for (blocker, rank) in relations_to_change] |
| self.issuerelation_tbl.Delete( |
| cnxn, issue_id=parent_id, dst_issue_id=blocker_ids, commit=False) |
| insert_rows = [(parent_id, blocker, 'blockedon', rank) |
| for (blocker, rank) in relations_to_change] |
| self.issuerelation_tbl.InsertRows( |
| cnxn, cols=ISSUERELATION_COLS, row_values=insert_rows, commit=commit) |
| if invalidate: |
| self.InvalidateIIDs(cnxn, [parent_id]) |
| |
| # Expunge Users from Issues system. |
| def ExpungeUsersInIssues(self, cnxn, user_ids_by_email, limit=None): |
| """Removes all references to given users from issue DB tables. |
| |
| This method will not commit the operations. This method will |
| not make changes to in-memory data. |
| |
| Args: |
| cnxn: connection to SQL database. |
| user_ids_by_email: dict of {email: user_id} of all users we want |
| to expunge. |
| limit: Optional, the limit for each operation. |
| |
| Returns: |
| A list of issue_ids that need to be reindexed. |
| """ |
| commit = False |
| user_ids = list(user_ids_by_email.values()) |
| user_emails = list(user_ids_by_email.keys()) |
| # Track issue_ids for issues that will have different search documents |
| # and need updates to modification time as a result of removing users. |
| affected_issue_ids = [] |
| |
| timestamp = int(time.time()) |
| |
| # Reassign commenter_id and delete inbound_messages. |
| shard_id = sql.RandomShardID() |
| comment_content_id_rows = self.comment_tbl.Select( |
| cnxn, cols=['Comment.id', 'Comment.issue_id', 'commentcontent_id'], |
| commenter_id=user_ids, shard_id=shard_id, limit=limit) |
| comment_ids = [row[0] for row in comment_content_id_rows] |
| commentcontent_ids = [row[2] for row in comment_content_id_rows] |
| if commentcontent_ids: |
| self.commentcontent_tbl.Update( |
| cnxn, {'inbound_message': None}, id=commentcontent_ids, commit=commit) |
| if comment_ids: |
| self.comment_tbl.Update( |
| cnxn, {'commenter_id': framework_constants.DELETED_USER_ID}, |
| id=comment_ids, |
| commit=commit) |
| affected_issue_ids.extend([row[1] for row in comment_content_id_rows]) |
| |
| # Reassign deleted_by comments deleted_by. |
| self.comment_tbl.Update( |
| cnxn, |
| {'deleted_by': framework_constants.DELETED_USER_ID}, |
| deleted_by=user_ids, |
| commit=commit, limit=limit) |
| |
| # Remove users in field values. |
| fv_issue_id_rows = self.issue2fieldvalue_tbl.Select( |
| cnxn, cols=['issue_id'], user_id=user_ids, limit=limit) |
| fv_issue_ids = [row[0] for row in fv_issue_id_rows] |
| self.issue2fieldvalue_tbl.Delete( |
| cnxn, user_id=user_ids, limit=limit, commit=commit) |
| affected_issue_ids.extend(fv_issue_ids) |
| |
| # Remove users in approval values. |
| self.issueapproval2approver_tbl.Delete( |
| cnxn, approver_id=user_ids, commit=commit, limit=limit) |
| self.issue2approvalvalue_tbl.Update( |
| cnxn, |
| {'setter_id': framework_constants.DELETED_USER_ID}, |
| setter_id=user_ids, |
| commit=commit, limit=limit) |
| |
| # Remove users in issue Ccs. |
| cc_issue_id_rows = self.issue2cc_tbl.Select( |
| cnxn, cols=['issue_id'], cc_id=user_ids, limit=limit) |
| cc_issue_ids = [row[0] for row in cc_issue_id_rows] |
| self.issue2cc_tbl.Delete( |
| cnxn, cc_id=user_ids, limit=limit, commit=commit) |
| affected_issue_ids.extend(cc_issue_ids) |
| |
| # Remove users in issue owners. |
| owner_issue_id_rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], owner_id=user_ids, limit=limit) |
| owner_issue_ids = [row[0] for row in owner_issue_id_rows] |
| if owner_issue_ids: |
| self.issue_tbl.Update( |
| cnxn, {'owner_id': None}, id=owner_issue_ids, commit=commit) |
| affected_issue_ids.extend(owner_issue_ids) |
| derived_owner_issue_id_rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], derived_owner_id=user_ids, limit=limit) |
| derived_owner_issue_ids = [row[0] for row in derived_owner_issue_id_rows] |
| if derived_owner_issue_ids: |
| self.issue_tbl.Update( |
| cnxn, {'derived_owner_id': None}, |
| id=derived_owner_issue_ids, |
| commit=commit) |
| affected_issue_ids.extend(derived_owner_issue_ids) |
| |
| # Remove users in issue reporters. |
| reporter_issue_id_rows = self.issue_tbl.Select( |
| cnxn, cols=['id'], reporter_id=user_ids, limit=limit) |
| reporter_issue_ids = [row[0] for row in reporter_issue_id_rows] |
| if reporter_issue_ids: |
| self.issue_tbl.Update( |
| cnxn, {'reporter_id': framework_constants.DELETED_USER_ID}, |
| id=reporter_issue_ids, |
| commit=commit) |
| affected_issue_ids.extend(reporter_issue_ids) |
| |
| # Note: issueupdate_tbl's and issue2notify's user_id columns do not |
| # reference the User table. So all values need to updated here before |
| # User rows can be deleted safely. No limit will be applied. |
| |
| # Remove users in issue updates. |
| user_added_id_rows = self.issueupdate_tbl.Select( |
| cnxn, |
| cols=['IssueUpdate.issue_id'], |
| added_user_id=user_ids, |
| shard_id=shard_id, |
| limit=limit) |
| user_removed_id_rows = self.issueupdate_tbl.Select( |
| cnxn, |
| cols=['IssueUpdate.issue_id'], |
| removed_user_id=user_ids, |
| shard_id=shard_id, |
| limit=limit) |
| self.issueupdate_tbl.Update( |
| cnxn, |
| {'added_user_id': framework_constants.DELETED_USER_ID}, |
| added_user_id=user_ids, |
| commit=commit) |
| self.issueupdate_tbl.Update( |
| cnxn, |
| {'removed_user_id': framework_constants.DELETED_USER_ID}, |
| removed_user_id=user_ids, |
| commit=commit) |
| affected_issue_ids.extend([row[0] for row in user_added_id_rows]) |
| affected_issue_ids.extend([row[0] for row in user_removed_id_rows]) |
| |
| # Remove users in issue notify. |
| self.issue2notify_tbl.Delete( |
| cnxn, email=user_emails, commit=commit) |
| |
| # Remove users in issue snapshots. |
| self.issuesnapshot_tbl.Update( |
| cnxn, |
| {'owner_id': framework_constants.DELETED_USER_ID}, |
| owner_id=user_ids, |
| commit=commit, limit=limit) |
| self.issuesnapshot_tbl.Update( |
| cnxn, |
| {'reporter_id': framework_constants.DELETED_USER_ID}, |
| reporter_id=user_ids, |
| commit=commit, limit=limit) |
| self.issuesnapshot2cc_tbl.Delete( |
| cnxn, cc_id=user_ids, commit=commit, limit=limit) |
| |
| # Update migration_modified timestamp for affected issues. |
| deduped_issue_ids = list(set(affected_issue_ids)) |
| if deduped_issue_ids: |
| self.issue_tbl.Update( |
| cnxn, {'migration_modified': timestamp}, |
| id=deduped_issue_ids, |
| commit=commit) |
| |
| return deduped_issue_ids |